You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/db/cache/rockscache.go

169 lines
3.9 KiB

2 years ago
package cache
2 years ago
import (
"context"
2 years ago
"encoding/json"
2 years ago
"errors"
2 years ago
2 years ago
"time"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2 years ago
"github.com/dtm-labs/rockscache"
2 years ago
)
2 years ago
// scanCount is fixed at 3000. Nothing in this part of the file references it;
// presumably it is the COUNT hint for Redis SCAN calls made elsewhere in the
// package — TODO confirm against its users.
const scanCount = 3000
2 years ago
2 years ago
// errIndex is a sentinel error. Nothing visible in this file returns it;
// presumably keyIndexFn callbacks passed to the batch helpers use it to signal
// that an item matches none of the requested keys — TODO confirm.
var errIndex = errors.New("err index")
// metaCache collects cache keys that need to be invalidated and performs the
// invalidation, either for the whole queued set or for one key right away.
type metaCache interface {
	// ExecDel invalidates every key queued through AddKeys (or supplied at
	// construction time).
	ExecDel(ctx context.Context) error
	// DelKey invalidates a single key immediately, without touching the queue.
	DelKey(ctx context.Context, key string) error
	// AddKeys appends keys to the queue consumed by ExecDel.
	AddKeys(keys ...string)
	// ClearKeys empties the queue without invalidating anything.
	ClearKeys()
	// GetPreDelKeys returns the keys currently queued for deletion.
	GetPreDelKeys() []string
}
2 years ago
func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
return &metaCacheRedis{rcClient: rcClient, keys: keys}
2 years ago
}
// metaCacheRedis is the metaCache implementation built on a rockscache client.
// It keeps a queue of keys that ExecDel later marks as deleted in one batch.
type metaCacheRedis struct {
// rcClient is the rockscache client used for the delete calls.
rcClient *rockscache.Client
// keys is the queue of keys pending deletion.
keys []string
}
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
if len(m.keys) > 0 {
2 years ago
log.ZDebug(ctx, "DelKey", "keys", m.keys)
2 years ago
return m.rcClient.TagAsDeletedBatch2(ctx, m.keys)
}
return nil
}
2 years ago
func (m *metaCacheRedis) DelKey(ctx context.Context, key string) error {
2 years ago
return m.rcClient.TagAsDeleted2(ctx, key)
}
// AddKeys appends the given keys to the pending-deletion queue.
func (m *metaCacheRedis) AddKeys(keys ...string) {
	queued := append(m.keys, keys...)
	m.keys = queued
}
2 years ago
// ClearKeys replaces the pending-deletion queue with a fresh, empty (non-nil)
// slice. Nothing is deleted from the cache.
func (m *metaCacheRedis) ClearKeys() {
	m.keys = make([]string, 0)
}
func (m *metaCacheRedis) GetPreDelKeys() []string {
2 years ago
return m.keys
}
2 years ago
// GetDefaultOpt returns rockscache's default options with two overrides:
// StrongConsistency is switched on and RandomExpireAdjustment is set to 0.2.
func GetDefaultOpt() rockscache.Options {
	o := rockscache.NewDefaultOptions()
	o.RandomExpireAdjustment, o.StrongConsistency = 0.2, true
	return o
}
2 years ago
2 years ago
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
2 years ago
var t T
var write bool
2 years ago
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
2 years ago
t, err = fn(ctx)
if err != nil {
return "", err
}
bs, err := json.Marshal(t)
if err != nil {
return "", utils.Wrap(err, "")
}
write = true
return string(bs), nil
})
if err != nil {
return t, err
}
if write {
return t, nil
}
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return t, utils.Wrap(err, "")
}
return t, nil
}
2 years ago
func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) ([]T, error)) ([]T, error) {
batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) {
values := make(map[int]string)
2 years ago
tArrays, err := fn(ctx)
2 years ago
if err != nil {
return nil, err
}
2 years ago
for _, v := range tArrays {
index, err := keyIndexFn(v, keys)
if err != nil {
continue
}
bs, err := json.Marshal(v)
if err != nil {
return nil, utils.Wrap(err, "marshal failed")
}
values[index] = string(bs)
}
return values, nil
})
if err != nil {
return nil, err
}
2 years ago
var tArrays []T
2 years ago
for _, v := range batchMap {
2 years ago
if v != "" {
var t T
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return nil, utils.Wrap(err, "unmarshal failed")
}
tArrays = append(tArrays, t)
2 years ago
}
2 years ago
}
2 years ago
return tArrays, nil
2 years ago
}
2 years ago
// batchGetCacheMap fetches the values for keys in one FetchBatch2 call and
// returns the decoded, non-empty entries as a map keyed by the cache key.
//
// When the loader runs, fn is called once; each value of the map it returns is
// passed to keyIndexFn to find the index of its key within keys and is cached
// as JSON under that index. Values for which keyIndexFn reports an error are
// skipped. Errors from fn and FetchBatch2 are returned unchanged; JSON errors
// are wrapped with utils.Wrap.
//
// NOTE(review): originKeys is accepted but never read; the result is keyed by
// keys[i]. Confirm whether callers expect it to be keyed by originKeys[i].
func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, originKeys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) {
	loader := func(idxs []int) (map[int]string, error) {
		encoded := make(map[int]string)
		loaded, err := fn(ctx)
		if err != nil {
			return nil, err
		}
		for _, item := range loaded {
			pos, err := keyIndexFn(item, keys)
			if err != nil {
				// Items that match none of the requested keys are dropped.
				continue
			}
			raw, err := json.Marshal(item)
			if err != nil {
				return nil, utils.Wrap(err, "marshal failed")
			}
			encoded[pos] = string(raw)
		}
		return encoded, nil
	}

	fetched, err := rcClient.FetchBatch2(ctx, keys, expire, loader)
	if err != nil {
		return nil, err
	}

	result := make(map[string]T)
	for pos, raw := range fetched {
		if raw == "" {
			continue
		}
		var item T
		if err = json.Unmarshal([]byte(raw), &item); err != nil {
			return nil, utils.Wrap(err, "unmarshal failed")
		}
		result[keys[pos]] = item
	}
	return result, nil
}