|
|
|
@ -207,7 +207,7 @@ func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) strin
|
|
|
|
|
return cachekey.GetMallocSeqKey(conversationID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) {
|
|
|
|
|
func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) (int64, error) {
|
|
|
|
|
if lastSeq < currSeq {
|
|
|
|
|
return 0, errs.New("lastSeq must be greater than currSeq")
|
|
|
|
|
}
|
|
|
|
@ -220,8 +220,9 @@ local lockValue = ARGV[1]
|
|
|
|
|
local dataSecond = ARGV[2]
|
|
|
|
|
local curr_seq = tonumber(ARGV[3])
|
|
|
|
|
local last_seq = tonumber(ARGV[4])
|
|
|
|
|
local mallocTime = ARGV[5]
|
|
|
|
|
if redis.call("EXISTS", key) == 0 then
|
|
|
|
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
|
|
|
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
|
|
|
|
|
redis.call("EXPIRE", key, dataSecond)
|
|
|
|
|
return 1
|
|
|
|
|
end
|
|
|
|
@ -229,11 +230,11 @@ if redis.call("HGET", key, "LOCK") ~= lockValue then
|
|
|
|
|
return 2
|
|
|
|
|
end
|
|
|
|
|
redis.call("HDEL", key, "LOCK")
|
|
|
|
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq)
|
|
|
|
|
redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime)
|
|
|
|
|
redis.call("EXPIRE", key, dataSecond)
|
|
|
|
|
return 0
|
|
|
|
|
`
|
|
|
|
|
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64()
|
|
|
|
|
result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, errs.Wrap(err)
|
|
|
|
|
}
|
|
|
|
@ -259,6 +260,7 @@ if redis.call("EXISTS", key) == 0 then
|
|
|
|
|
redis.call("EXPIRE", key, lockSecond)
|
|
|
|
|
table.insert(result, 1)
|
|
|
|
|
table.insert(result, lockValue)
|
|
|
|
|
table.insert(result, mallocTime)
|
|
|
|
|
return result
|
|
|
|
|
end
|
|
|
|
|
if redis.call("HEXISTS", key, "LOCK") == 1 then
|
|
|
|
@ -321,9 +323,9 @@ func (s *seqConversationCacheRedis) wait(ctx context.Context) error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) {
|
|
|
|
|
func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) {
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq)
|
|
|
|
|
state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq, mill)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1)
|
|
|
|
|
if err := s.wait(ctx); err != nil {
|
|
|
|
@ -383,7 +385,7 @@ func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversation
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, 0, err
|
|
|
|
|
}
|
|
|
|
|
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize)
|
|
|
|
|
s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize, states[2])
|
|
|
|
|
return seq, 0, nil
|
|
|
|
|
case 2: // locked
|
|
|
|
|
if err := s.wait(ctx); err != nil {
|
|
|
|
@ -393,18 +395,19 @@ func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversation
|
|
|
|
|
case 3: // exceeded cache max value
|
|
|
|
|
currSeq := states[1]
|
|
|
|
|
lastSeq := states[2]
|
|
|
|
|
mill := states[4]
|
|
|
|
|
mallocSize := s.getMallocSize(conversationID, size)
|
|
|
|
|
seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, 0, err
|
|
|
|
|
}
|
|
|
|
|
if lastSeq == seq {
|
|
|
|
|
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize)
|
|
|
|
|
s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize, mill)
|
|
|
|
|
return currSeq, states[4], nil
|
|
|
|
|
} else {
|
|
|
|
|
log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq)
|
|
|
|
|
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize)
|
|
|
|
|
return seq, states[4], nil
|
|
|
|
|
s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize, mill)
|
|
|
|
|
return seq, mill, nil
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)
|
|
|
|
|