// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/dtm-labs/rockscache"
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
	"github.com/openimsdk/tools/errs"
	"github.com/openimsdk/tools/log"
	"github.com/openimsdk/tools/mw/specialerror"
	"github.com/openimsdk/tools/utils/datautil"
	"github.com/redis/go-redis/v9"
)

// BatchDeleterRedis is a concrete implementation of the BatchDeleter interface based on Redis and RocksCache.
type BatchDeleterRedis struct {
	redisClient    redis.UniversalClient
	keys           []string
	rocksClient    *rockscache.Client
	redisPubTopics []string
}

// NewBatchDeleterRedis creates a new BatchDeleterRedis instance.
func NewBatchDeleterRedis(redisClient redis.UniversalClient, options *rockscache.Options, redisPubTopics []string) *BatchDeleterRedis {
	return &BatchDeleterRedis{
		redisClient:    redisClient,
		rocksClient:    rockscache.NewClient(redisClient, *options),
		redisPubTopics: redisPubTopics,
	}
}
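
// exampleNewDeleter is an illustrative sketch (not part of the original code) showing how a
// BatchDeleterRedis could be wired up from an existing Redis client; the pub/sub topic name
// "local_cache_topic" is hypothetical.
func exampleNewDeleter(rdb redis.UniversalClient) *BatchDeleterRedis {
	return NewBatchDeleterRedis(rdb, GetRocksCacheOptions(), []string{"local_cache_topic"})
}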

// ExecDelWithKeys deduplicates the given keys, deletes them in batch, and publishes the deleted
// keys so other nodes can invalidate their local caches.
func (c *BatchDeleterRedis) ExecDelWithKeys(ctx context.Context, keys []string) error {
	distinctKeys := datautil.Distinct(keys)
	return c.execDel(ctx, distinctKeys)
}

// ChainExecDel deletes the keys accumulated via AddKeys. It is meant for chained calls; callers
// must obtain their own copy via Clone first to avoid polluting the shared deleter's key list.
func (c *BatchDeleterRedis) ChainExecDel(ctx context.Context) error {
	distinctKeys := datautil.Distinct(c.keys)
	return c.execDel(ctx, distinctKeys)
}

// execDel performs the batch deletion and publishes the deleted keys so that other nodes can
// invalidate their local caches.
func (c *BatchDeleterRedis) execDel(ctx context.Context, keys []string) error {
	if len(keys) > 0 {
		log.ZDebug(ctx, "delete cache", "topic", c.redisPubTopics, "keys", keys)
		slotMapKeys, err := groupKeysBySlot(ctx, c.redisClient, keys)
		if err != nil {
			return err
		}
		// Batch delete keys, one batch per hash slot so multi-key operations remain valid in cluster mode.
		for slot, singleSlotKeys := range slotMapKeys {
			if err := c.rocksClient.TagAsDeletedBatch2(ctx, singleSlotKeys); err != nil {
				log.ZWarn(ctx, "Batch delete cache failed", err, "slot", slot, "keys", singleSlotKeys)
				continue
			}
		}
		// Publish the deleted keys to Redis so that other nodes can invalidate their local caches.
		if len(c.redisPubTopics) > 0 && len(keys) > 0 {
			keysByTopic := localcache.GetPublishKeysByTopic(c.redisPubTopics, keys)
			for topic, keys := range keysByTopic {
				if len(keys) > 0 {
					data, err := json.Marshal(keys)
					if err != nil {
						log.ZWarn(ctx, "keys json marshal failed", err, "topic", topic, "keys", keys)
					} else {
						if err := c.redisClient.Publish(ctx, topic, string(data)).Err(); err != nil {
							log.ZWarn(ctx, "redis publish cache delete error", err, "topic", topic, "keys", keys)
						}
					}
				}
			}
		}
	}
	return nil
}
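
// exampleSubscribeInvalidations is an illustrative sketch (not part of the original code) of how
// another node could consume the invalidation messages published by execDel: each payload is the
// JSON-encoded slice of deleted keys. The topic name "local_cache_topic" is hypothetical.
func exampleSubscribeInvalidations(ctx context.Context, rdb redis.UniversalClient) {
	sub := rdb.Subscribe(ctx, "local_cache_topic")
	defer sub.Close()
	for msg := range sub.Channel() {
		var keys []string
		if err := json.Unmarshal([]byte(msg.Payload), &keys); err != nil {
			log.ZWarn(ctx, "unmarshal cache invalidation payload failed", err, "payload", msg.Payload)
			continue
		}
		// Evict the keys from this node's local cache here.
		_ = keys
	}
}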

// Clone returns a copy of the deleter for chained calls, so that keys added afterwards do not
// pollute the shared instance.
func (c *BatchDeleterRedis) Clone() cache.BatchDeleter {
	return &BatchDeleterRedis{
		redisClient:    c.redisClient,
		keys:           c.keys,
		rocksClient:    c.rocksClient,
		redisPubTopics: c.redisPubTopics,
	}
}

// AddKeys adds keys to be deleted.
func (c *BatchDeleterRedis) AddKeys(keys ...string) {
	c.keys = append(c.keys, keys...)
}
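
// exampleChainDel is an illustrative sketch (not part of the original code) of the intended
// chained-deletion pattern: Clone first so that the appended keys never pollute the shared
// deleter, then AddKeys, then ChainExecDel. The key strings are hypothetical, and the type
// assertion is only for illustration within this file.
func exampleChainDel(ctx context.Context, base *BatchDeleterRedis) error {
	deleter := base.Clone().(*BatchDeleterRedis)
	deleter.AddKeys("EXAMPLE_MSG:conv1", "EXAMPLE_MSG:conv2")
	return deleter.ChainExecDel(ctx)
}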

// GetRocksCacheOptions returns the default configuration options for RocksCache.
func GetRocksCacheOptions() *rockscache.Options {
	opts := rockscache.NewDefaultOptions()
	// Prefer strong consistency: reads never return stale data, at the cost of higher latency on misses.
	opts.StrongConsistency = true
	// Randomize each key's TTL within 20% of the configured expire to avoid synchronized expiration.
	opts.RandomExpireAdjustment = 0.2

	return &opts
}

// groupKeysBySlot groups keys by their Redis cluster hash slot, so that multi-key operations are
// only ever issued against keys that live in the same slot.
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
	slots := make(map[int64][]string)
	clusterClient, isCluster := redisClient.(*redis.ClusterClient)
	if isCluster {
		pipe := clusterClient.Pipeline()
		cmds := make([]*redis.IntCmd, len(keys))
		for i, key := range keys {
			cmds[i] = pipe.ClusterKeySlot(ctx, key)
		}
		_, err := pipe.Exec(ctx)
		if err != nil {
			return nil, errs.WrapMsg(err, "get slot err")
		}

		for i, cmd := range cmds {
			slot, err := cmd.Result()
			if err != nil {
				log.ZWarn(ctx, "some key get slot err", err, "key", keys[i])
				continue
			}
			slots[slot] = append(slots[slot], keys[i])
		}
	} else {
		// Not a cluster client: all keys can safely share a single pseudo-slot (0).
		slots[0] = keys
	}

	return slots, nil
}

// getCache reads a value through RocksCache: on a miss it calls fn to load the value, JSON-encodes
// it, and stores it under key with the given expire time. An empty cached value is treated as
// "record not found".
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
	var t T
	var write bool
	v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
		t, err = fn(ctx)
		if err != nil {
			return "", err
		}
		bs, err := json.Marshal(t)
		if err != nil {
			return "", errs.WrapMsg(err, "marshal failed")
		}
		write = true

		return string(bs), nil
	})
	if err != nil {
		return t, errs.Wrap(err)
	}
	if write {
		// The value was just loaded from the source; no need to unmarshal the cached copy.
		return t, nil
	}
	if v == "" {
		return t, errs.ErrRecordNotFound.WrapMsg("cache is not found")
	}
	err = json.Unmarshal([]byte(v), &t)
	if err != nil {
		errInfo := fmt.Sprintf("cache json.Unmarshal failed, key:%s, value:%s, expire:%s", key, v, expire)
		return t, errs.WrapMsg(err, errInfo)
	}

	return t, nil
}
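
// exampleUserInfo and exampleGetUserInfo form an illustrative sketch (not part of the original
// code) of how getCache is typically wrapped; the key prefix, TTL, and struct are hypothetical.
type exampleUserInfo struct {
	UserID   string `json:"userID"`
	Nickname string `json:"nickname"`
}

func exampleGetUserInfo(ctx context.Context, rcClient *rockscache.Client, userID string) (exampleUserInfo, error) {
	return getCache(ctx, rcClient, "EXAMPLE_USER_INFO:"+userID, 12*time.Hour, func(ctx context.Context) (exampleUserInfo, error) {
		// On a cache miss, load from the authoritative store (stubbed here).
		return exampleUserInfo{UserID: userID, Nickname: "unknown"}, nil
	})
}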

// batchGetCache fetches one cached value per key via getCache. Keys whose cache entry resolves to
// "record not found" are skipped, so the result may contain fewer elements than keys.
func batchGetCache[T any, K comparable](
	ctx context.Context,
	rcClient *rockscache.Client,
	expire time.Duration,
	keys []K,
	keyFn func(key K) string,
	fns func(ctx context.Context, key K) (T, error),
) ([]T, error) {
	if len(keys) == 0 {
		return nil, nil
	}
	res := make([]T, 0, len(keys))
	for _, key := range keys {
		val, err := getCache(ctx, rcClient, keyFn(key), expire, func(ctx context.Context) (T, error) {
			return fns(ctx, key)
		})
		if err != nil {
			if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) {
				continue
			}
			return nil, errs.Wrap(err)
		}
		res = append(res, val)
	}

	return res, nil
}
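
// exampleBatchGetUserInfos is an illustrative sketch (not part of the original code) showing how
// batchGetCache fans out over a key set, reusing the hypothetical exampleUserInfo type above.
func exampleBatchGetUserInfos(ctx context.Context, rcClient *rockscache.Client, userIDs []string) ([]exampleUserInfo, error) {
	return batchGetCache(ctx, rcClient, 12*time.Hour, userIDs,
		func(userID string) string { return "EXAMPLE_USER_INFO:" + userID },
		func(ctx context.Context, userID string) (exampleUserInfo, error) {
			// On a cache miss, load from the authoritative store (stubbed here).
			return exampleUserInfo{UserID: userID, Nickname: "unknown"}, nil
		})
}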