From c667795285e541b0963cda931f6905b3e28e0eea Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 5 Sep 2024 18:47:54 +0800 Subject: [PATCH] feat: implement Getbatch in rpcCache. --- internal/rpc/user/online.go | 1 + pkg/localcache/lru/lru.go | 1 + pkg/localcache/lru/lru_lazy.go | 76 ++++++++++++++++++++++++++++++---- pkg/rpccache/online.go | 65 +++++++++++++++++++++++++---- 4 files changed, 128 insertions(+), 15 deletions(-) diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index 99b272006..5ac4085a9 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -2,6 +2,7 @@ package user import ( "context" + "github.com/openimsdk/protocol/constant" pbuser "github.com/openimsdk/protocol/user" ) diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go index 2fedffc48..5182bed5f 100644 --- a/pkg/localcache/lru/lru.go +++ b/pkg/localcache/lru/lru.go @@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V] type LRU[K comparable, V any] interface { Get(key K, fetch func() (V, error)) (V, error) + GetBatch(key []K, fetchBatch func([]K) (map[string]V, error)) (map[string]V, error) SetHas(key K, value V) bool Del(key K) bool Stop() diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index 430778b87..ce3f7cff6 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -15,10 +15,14 @@ package lru import ( + "context" "sync" "time" "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" ) type layLruItem[V any] struct { @@ -88,20 +92,76 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } -func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func() ([]V, error)) ([]V, error) { - return nil, nil +func (x *LayLRU[K, V]) GetBatch(keys []K, fetchBatch func([]K) (map[K]V, error)) (map[K]V, error) { + ctx := context.Background() + resultMap := make(map[K]V) + // errorMap := make(map[K]error) + missingKeys := []K{} + lazyLruItemMap := make(map[K]*layLruItem[V]) + + for _, key := range keys { + x.lock.Lock() + v, ok := x.core.Get(key) + lazyLruItemMap[key] = v + if ok { + x.lock.Unlock() + v.lock.Lock() + expires, value, err := v.expires, v.value, v.err + if expires != 0 && expires > time.Now().UnixMilli() { + v.lock.Unlock() + resultMap[key] = value + x.target.IncrGetHit() + } else { + missingKeys = append(missingKeys, key) + v.lock.Unlock() + } + if err != nil { + log.ZWarn(ctx, "Get Local LRU is failed.", errs.Wrap(err)) + } + continue + } else { + // initialize and insert new lazyLruItem + v = &layLruItem[V]{} + lazyLruItemMap[key] = v + x.core.Add(key, v) + v.lock.Lock() + missingKeys = append(missingKeys, key) + x.lock.Unlock() + } + defer v.lock.Unlock() + } + + x.lock.Unlock() + + // Fetch missing Key + if len(missingKeys) > 0 { + failedKey := missingKeys + fetchMap, err := fetchBatch(missingKeys) + if err != nil { + log.ZWarn(ctx, "fetch Key is failed.", errs.Wrap(err)) + } + + for key, value := range fetchMap { + resultMap[key] = value + lazyLruItemMap[key].expires = time.Now().Add(x.successTTL).UnixMilli() + x.target.IncrGetSuccess() + failedKey = datautil.DeleteElems(failedKey, key) + } + for _, key := range failedKey { + lazyLruItemMap[key].expires = time.Now().Add(x.failedTTL).UnixMilli() + x.target.IncrGetFailed() + } + } + return resultMap, nil } -func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) bool { +func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) { x.lock.Lock() defer x.lock.Unlock() + for key, value := range data { - if x.core.Contains(key) { - x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) - return true - } + x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) } - return false } //func (x *LayLRU[K, V]) Set(key K, value V) { diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 9598299d9..e767b9a92 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -2,6 +2,10 @@ package rpccache import ( "context" + "math/rand" + "strconv" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" @@ -10,9 +14,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/redis/go-redis/v9" - "math/rand" - "strconv" - "time" ) func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache { @@ -69,6 +70,16 @@ func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) return platformIDs, nil } +// func (o *OnlineCache) GetUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string]int32, error) { +// platformIDs, err := o.getUserOnlinePlatform(ctx, userIDs) +// if err != nil { +// return nil, err +// } +// tmp := make([]int32, len(platformIDs)) +// copy(tmp, platformIDs) +// return platformIDs, nil +// } + func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { platformIDs, err := o.getUserOnlinePlatform(ctx, userID) if err != nil { @@ -77,19 +88,59 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e return len(platformIDs) > 0, nil } -func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]string, []string, error) { +// ---------------------- + +func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { + platformIDsMap, err := o.local.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { + platformIDsMap := make(map[string][]int32) + + usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) + if err != nil { + return nil, err + } + + for _, user := range usersStatus { + platformIDsMap[user.UserID] = user.PlatformIDs + } + + return platformIDsMap, nil + }) + if err != nil { + log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) + return nil, err + } + + //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) + return platformIDsMap, nil +} + +// Finalllllllllllllllllllllllllll +func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) { var ( - onlineUserIDS []string + onlineUserIDs []string offlineUserIDs []string ) - return onlineUserIDS, offlineUserIDs, nil + userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) + if err != nil { + return nil, nil, err + } + + for key, value := range userOnlineMap { + if len(value) > 0 { + onlineUserIDs = append(onlineUserIDs, key) + } else { + offlineUserIDs = append(offlineUserIDs, key) + } + } + + return onlineUserIDs, offlineUserIDs, nil } //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { // onlineUserIDs := make([]string, 0, len(userIDs)) // for _, userID := range userIDs { -// online, err := o.GetUserOnline(ctx, userID) +// online, err := o.GetUserOnline(ctx, userID) // if err != nil { // return nil, err // }