diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6d0ee8c67..282d1d1c1 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -645,19 +645,35 @@ func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { - vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(conversationID)).Result() - if errors.Is(err, redis.Nil) { - return nil - } - if err != nil { - return errs.Wrap(err) - } - for _, v := range vals { - if err := c.rdb.Del(ctx, v).Err(); err != nil { + var ( + cursor uint64 + keys []string + err error + + key = c.allMessageCacheKey(conversationID) + ) + + for { + // scan up to 10000 at a time, the count (10000) param refers to the number of scans on redis server. + // if the count is too small, needs to be run scan on redis frequently. + var limit int64 = 10000 + keys, cursor, err = c.rdb.Scan(ctx, cursor, key, limit).Result() + if err != nil { return errs.Wrap(err) } + + for _, key := range keys { + err := c.rdb.Del(ctx, key).Err() + if err != nil { + return errs.Wrap(err) + } + } + + // scan end + if cursor == 0 { + return nil + } } - return nil } func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { diff --git a/pkg/common/db/cache/msg_test.go b/pkg/common/db/cache/msg_test.go index 3fddf5965..a5be018ed 100644 --- a/pkg/common/db/cache/msg_test.go +++ b/pkg/common/db/cache/msg_test.go @@ -385,3 +385,50 @@ func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, input assert.EqualValues(t, 1, val) // exists } } + +func TestCleanUpOneConversationAllMsg(t *testing.T) { + rdb := redis.NewClient(&redis.Options{}) + defer rdb.Close() + + cacher := msgCache{rdb: rdb} + count := 1000 + prefix := fmt.Sprintf("%v", rand.Int63()) + + ids := []string{} + for i := 0; i < count; i++ { + id := fmt.Sprintf("%v-cid-%v", prefix, rand.Int63()) + ids = append(ids, id) + + key := cacher.allMessageCacheKey(id) + rdb.Set(context.Background(), key, "openim", 0) + } + + // delete 100 keys with scan. + for i := 0; i < 100; i++ { + pickedKey := ids[i] + err := cacher.CleanUpOneConversationAllMsg(context.Background(), pickedKey) + assert.Nil(t, err) + + ls, err := rdb.Keys(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) + + rcode, err := rdb.Exists(context.Background(), pickedKey).Result() + assert.Nil(t, err) + assert.EqualValues(t, 0, rcode) // non-exists + } + + sid := fmt.Sprintf("%v-cid-*", prefix) + ls, err := rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, count-100, len(ls)) + + // delete fuzzy matching keys. + err = cacher.CleanUpOneConversationAllMsg(context.Background(), sid) + assert.Nil(t, err) + + // don't contains keys matched `{prefix}-cid-{random}` on redis + ls, err = rdb.Keys(context.Background(), cacher.allMessageCacheKey(sid)).Result() + assert.Nil(t, err) + assert.Equal(t, 0, len(ls)) +}