fix: redis support acquisition time

pull/2664/head
withchao 1 year ago
parent 6add09d476
commit 4cb56a1326

@ -169,6 +169,7 @@ local key = KEYS[1]
local size = tonumber(ARGV[1]) local size = tonumber(ARGV[1])
local lockSecond = ARGV[2] local lockSecond = ARGV[2]
local dataSecond = ARGV[3] local dataSecond = ARGV[3]
local mallocTime = ARGV[4]
local result = {} local result = {}
if redis.call("EXISTS", key) == 0 then if redis.call("EXISTS", key) == 0 then
local lockValue = math.random(0, 999999999) local lockValue = math.random(0, 999999999)
@ -189,6 +190,12 @@ if size == 0 then
table.insert(result, 0) table.insert(result, 0)
table.insert(result, curr_seq) table.insert(result, curr_seq)
table.insert(result, last_seq) table.insert(result, last_seq)
local setTime = redis.call("HGET", key, "TIME")
if setTime then
table.insert(result, setTime)
else
table.insert(result, 0)
end
return result return result
end end
local max_seq = curr_seq + size local max_seq = curr_seq + size
@ -196,21 +203,25 @@ if max_seq > last_seq then
local lockValue = math.random(0, 999999999) local lockValue = math.random(0, 999999999)
redis.call("HSET", key, "LOCK", lockValue) redis.call("HSET", key, "LOCK", lockValue)
redis.call("HSET", key, "CURR", last_seq) redis.call("HSET", key, "CURR", last_seq)
redis.call("HSET", key, "TIME", mallocTime)
redis.call("EXPIRE", key, lockSecond) redis.call("EXPIRE", key, lockSecond)
table.insert(result, 3) table.insert(result, 3)
table.insert(result, curr_seq) table.insert(result, curr_seq)
table.insert(result, last_seq) table.insert(result, last_seq)
table.insert(result, lockValue) table.insert(result, lockValue)
table.insert(result, mallocTime)
return result return result
end end
redis.call("HSET", key, "CURR", max_seq) redis.call("HSET", key, "CURR", max_seq)
redis.call("HSET", key, "TIME", ARGV[4])
redis.call("EXPIRE", key, dataSecond) redis.call("EXPIRE", key, dataSecond)
table.insert(result, 0) table.insert(result, 0)
table.insert(result, curr_seq) table.insert(result, curr_seq)
table.insert(result, last_seq) table.insert(result, last_seq)
table.insert(result, mallocTime)
return result return result
` `
result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice() result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice()
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
@ -267,29 +278,34 @@ func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size in
} }
func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) {
seq, _, err := s.mallocTime(ctx, conversationID, size)
return seq, err
}
func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversationID string, size int64) (int64, int64, error) {
if size < 0 { if size < 0 {
return 0, errs.New("size must be greater than 0") return 0, 0, errs.New("size must be greater than 0")
} }
key := s.getSeqMallocKey(conversationID) key := s.getSeqMallocKey(conversationID)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
states, err := s.malloc(ctx, key, size) states, err := s.malloc(ctx, key, size)
if err != nil { if err != nil {
return 0, err return 0, 0, err
} }
switch states[0] { switch states[0] {
case 0: // success case 0: // success
return states[1], nil return states[1], states[3], nil
case 1: // not found case 1: // not found
mallocSize := s.getMallocSize(conversationID, size) mallocSize := s.getMallocSize(conversationID, size)
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
if err != nil { if err != nil {
return 0, err return 0, 0, err
} }
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
return seq, nil return seq, 0, nil
case 2: // locked case 2: // locked
if err := s.wait(ctx); err != nil { if err := s.wait(ctx); err != nil {
return 0, err return 0, 0, err
} }
continue continue
case 3: // exceeded cache max value case 3: // exceeded cache max value
@ -298,23 +314,23 @@ func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID s
mallocSize := s.getMallocSize(conversationID, size) mallocSize := s.getMallocSize(conversationID, size)
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
if err != nil { if err != nil {
return 0, err return 0, 0, err
} }
if lastSeq == seq { if lastSeq == seq {
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize) s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize)
return currSeq, nil return currSeq, 0, nil
} else { } else {
log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq) log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq)
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize) s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize)
return seq, nil return seq, 0, nil
} }
default: default:
log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size) log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)
return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) return 0, 0, errs.New(fmt.Sprintf("unknown state: %d", states[0]))
} }
} }
log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size) log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size)
return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) return 0, 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size)
} }
func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
@ -331,3 +347,7 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str
} }
return DeleteCacheBySlot(ctx, s.rocks, keys) return DeleteCacheBySlot(ctx, s.rocks, keys)
} }
func (s *seqConversationCacheRedis) GetMaxSeqWithTime(ctx context.Context, conversationID string) (int64, int64, error) {
return s.mallocTime(ctx, conversationID, 0)
}

@ -14,7 +14,7 @@ import (
) )
func newTestSeq() *seqConversationCacheRedis { func newTestSeq() *seqConversationCacheRedis {
mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@127.0.0.1:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -23,7 +23,7 @@ func newTestSeq() *seqConversationCacheRedis {
panic(err) panic(err)
} }
opt := &redis.Options{ opt := &redis.Options{
Addr: "172.16.8.48:16379", Addr: "127.0.0.1:16379",
Password: "openIM123", Password: "openIM123",
DB: 1, DB: 1,
} }
@ -107,3 +107,13 @@ func TestMinSeq(t *testing.T) {
ts := newTestSeq() ts := newTestSeq()
t.Log(ts.GetMinSeq(context.Background(), "10000000")) t.Log(ts.GetMinSeq(context.Background(), "10000000"))
} }
func TestMalloc(t *testing.T) {
ts := newTestSeq()
t.Log(ts.Malloc(context.Background(), "10000000", 100))
}
func TestGetMaxSeqWithTime(t *testing.T) {
ts := newTestSeq()
t.Log(ts.GetMaxSeqWithTime(context.Background(), "10000000"))
}

Loading…
Cancel
Save