diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6d0ee8c67..849f537c4 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -173,7 +173,20 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st } func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) + var retErr error + for { + select { + case <-ctx.Done(): + return errs.Wrap(retErr, "SetMaxSeq redis retry too many amount") + default: + retErr = c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) + if retErr != nil { + time.Sleep(time.Second * 2) + continue + } + return nil + } + } } func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { @@ -181,7 +194,21 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m } func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMaxSeqKey) + var retErr error + var retData int64 + for { + select { + case <-ctx.Done(): + return -1, errs.Wrap(retErr, "GetMaxSeq redis retry too many amount") + default: + retData, retErr = c.getSeq(ctx, conversationID, c.getMaxSeqKey) + if retErr != nil && errs.Unwrap(retErr) != redis.Nil { + time.Sleep(time.Second * 2) + continue + } + return retData, retErr + } + } } func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index f6f48cb02..cba0a6bbd 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -357,7 +357,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) + cancelCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + currentMaxSeq, err := db.cache.GetMaxSeq(cancelCtx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "db.cache.GetMaxSeq", err) return 0, false, err @@ -386,7 +388,9 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs))) } - err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) + cancelCtx, cancel = context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + err = db.cache.SetMaxSeq(cancelCtx, conversationID, currentMaxSeq) if err != nil { log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc()