diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index 0c74f8343..b4e560c4d 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -156,6 +156,11 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetchBatch func([]K) (map[K]V, error)) } func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)) ([]V, error) { + var ( + err error + once sync.Once + ) + x.lock.Lock() res := make([]V, 0) queries := make([]K, 0) @@ -165,18 +170,28 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error) if ok { x.lock.Unlock() v.lock.Lock() - expires, value, _ := v.expires, v.value, v.err + expires, value, err1 := v.expires, v.value, v.err if expires != 0 && expires > time.Now().UnixMilli() { v.lock.Unlock() x.target.IncrGetHit() res = append(res, value) + if err1 != nil { + once.Do(func() { + err = err1 + }) + } continue } } queries = append(queries, key) x.lock.Unlock() } - values, err := fetch(queries) + values, err1 := fetch(queries) + if err1 != nil { + once.Do(func() { + err = err1 + }) + } for key, val := range values { v := &layLruItem[V]{} v.value = val @@ -198,7 +213,7 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error) return res, err } -func (x *LayLRU[K, V]) SetBatch(data map[K]V) bool { +func (x *LayLRU[K, V]) SetBatch(data map[K]V) { x.lock.Lock() defer x.lock.Unlock() diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 5fe8f97a1..2d83cf2cc 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -155,51 +155,50 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e return len(platformIDs) > 0, nil } -func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { - platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { - platformIDsMap := make(map[string][]int32) - - usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) - if err != nil { - return nil, err - } - - for _, user := range usersStatus { - platformIDsMap[user.UserID] = user.PlatformIDs - } - - return platformIDsMap, nil - }) - if err != nil { - log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) - return nil, err - } - - //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) - return platformIDsMap, nil -} - +//func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { +// platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { +// platformIDsMap := make(map[string][]int32) +// +// usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) +// if err != nil { +// return nil, err +// } +// +// for _, user := range usersStatus { +// platformIDsMap[user.UserID] = user.PlatformIDs +// } +// +// return platformIDsMap, nil +// }) +// if err != nil { +// log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) +// return nil, err +// } +// +// //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) +// return platformIDsMap, nil +//} -func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]string, []string, error) { +func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) { t := time.Now() var ( - onlineUserIDs []string - offlineUserIDs []string + onlineUserIDs = make([]string, 0, len(userIDs)) + offlineUserIDs = make([]string, 0, len(userIDs)) ) - - userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) - if err != nil { - return nil, nil, err - } - - for key, value := range userOnlineMap { - if len(value) > 0 { - onlineUserIDs = append(onlineUserIDs, key) - } else { - offlineUserIDs = append(offlineUserIDs, key) - } - } + // + //userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) + //if err != nil { + // return nil, nil, err + //} + // + //for key, value := range userOnlineMap { + // if len(value) > 0 { + // onlineUserIDs = append(onlineUserIDs, key) + // } else { + // offlineUserIDs = append(offlineUserIDs, key) + // } + //} switch o.fullUserCache { case true: @@ -213,8 +212,8 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s case false: } - log.ZWarn(ctx, "get users online", nil, "online users length", len(onlineUserIDS), "offline users length", len(offlineUserIDs), "cost", time.Since(t)) - return onlineUserIDS, offlineUserIDs, nil + log.ZWarn(ctx, "get users online", nil, "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t)) + return userIDs, offlineUserIDs, nil } //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {