feat: batch get user online

pull/2608/head
icey-yu 1 year ago
parent 16fd28f39f
commit a141eefc5e

@ -197,7 +197,6 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
} }
func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) {
onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs) onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs)
if err != nil { if err != nil {
return nil, err return nil, err

@ -22,6 +22,7 @@ type LRU[K comparable, V any] interface {
Get(key K, fetch func() (V, error)) (V, error) Get(key K, fetch func() (V, error)) (V, error)
Set(key K, value V) Set(key K, value V)
SetHas(key K, value V) bool SetHas(key K, value V) bool
GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error)
Del(key K) bool Del(key K) bool
Stop() Stop()
} }

@ -51,6 +51,11 @@ type ExpirationLRU[K comparable, V any] struct {
target Target target Target
} }
func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
//TODO implement me
panic("implement me")
}
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock() x.lock.Lock()
v, ok := x.core.Get(key) v, ok := x.core.Get(key)

@ -15,14 +15,10 @@
package lru package lru
import ( import (
"context"
"sync" "sync"
"time" "time"
"github.com/hashicorp/golang-lru/v2/simplelru" "github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
) )
type layLruItem[V any] struct { type layLruItem[V any] struct {
@ -92,77 +88,14 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return v.value, v.err return v.value, v.err
} }
func (x *LayLRU[K, V]) GetBatch(keys []K, fetchBatch func([]K) (map[K]V, error)) (map[K]V, error) { func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
ctx := context.Background()
resultMap := make(map[K]V)
// errorMap := make(map[K]error)
missingKeys := []K{}
lazyLruItemMap := make(map[K]*layLruItem[V])
for _, key := range keys {
x.lock.Lock()
v, ok := x.core.Get(key)
lazyLruItemMap[key] = v
if ok {
x.lock.Unlock()
v.lock.Lock()
expires, value, err := v.expires, v.value, v.err
if expires != 0 && expires > time.Now().UnixMilli() {
v.lock.Unlock()
resultMap[key] = value
x.target.IncrGetHit()
} else {
missingKeys = append(missingKeys, key)
v.lock.Unlock()
}
if err != nil {
log.ZWarn(ctx, "Get Local LRU is failed.", errs.Wrap(err))
}
continue
} else {
// initialize and insert new lazyLruItem
v = &layLruItem[V]{}
lazyLruItemMap[key] = v
x.core.Add(key, v)
v.lock.Lock()
missingKeys = append(missingKeys, key)
x.lock.Unlock()
}
defer v.lock.Unlock()
}
x.lock.Unlock()
// Fetch missing Key
if len(missingKeys) > 0 {
failedKey := missingKeys
fetchMap, err := fetchBatch(missingKeys)
if err != nil {
log.ZWarn(ctx, "fetch Key is failed.", errs.Wrap(err))
}
for key, value := range fetchMap {
resultMap[key] = value
lazyLruItemMap[key].expires = time.Now().Add(x.successTTL).UnixMilli()
x.target.IncrGetSuccess()
failedKey = datautil.DeleteElems(failedKey, key)
}
for _, key := range failedKey {
lazyLruItemMap[key].expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed()
}
}
return resultMap, nil
}
func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)) ([]V, error) {
var ( var (
err error err error
once sync.Once once sync.Once
) )
x.lock.Lock() x.lock.Lock()
res := make([]V, 0) res := make(map[K]V)
queries := make([]K, 0) queries := make([]K, 0)
setVs := make(map[K]*layLruItem[V]) setVs := make(map[K]*layLruItem[V])
for _, key := range keys { for _, key := range keys {
@ -174,7 +107,7 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)
if expires != 0 && expires > time.Now().UnixMilli() { if expires != 0 && expires > time.Now().UnixMilli() {
v.lock.Unlock() v.lock.Unlock()
x.target.IncrGetHit() x.target.IncrGetHit()
res = append(res, value) res[key] = value
if err1 != nil { if err1 != nil {
once.Do(func() { once.Do(func() {
err = err1 err = err1
@ -207,27 +140,12 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)
x.lock.Lock() x.lock.Lock()
x.core.Add(key, v) x.core.Add(key, v)
x.lock.Unlock() x.lock.Unlock()
res = append(res, val) res[key] = val
} }
return res, err return res, err
} }
func (x *LayLRU[K, V]) SetBatch(data map[K]V) {
x.lock.Lock()
defer x.lock.Unlock()
for key, value := range data {
x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
}
}
//func (x *LayLRU[K, V]) Set(key K, value V) {
// x.lock.Lock()
// x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
// x.lock.Unlock()
//}
//
//func (x *LayLRU[K, V]) Has(key K) bool { //func (x *LayLRU[K, V]) Has(key K) bool {
// x.lock.Lock() // x.lock.Lock()
// defer x.lock.Unlock() // defer x.lock.Unlock()

@ -32,6 +32,29 @@ type slotLRU[K comparable, V any] struct {
hash func(k K) uint64 hash func(k K) uint64
} }
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var (
slotKeys = make(map[uint64][]K)
vs = make(map[K]V)
)
for _, k := range keys {
index := x.getIndex(k)
slotKeys[index] = append(slotKeys[index], k)
}
for k, v := range slotKeys {
batches, err := x.slots[k].GetBatch(v, fetch)
if err != nil {
return nil, err
}
for key, value := range batches {
vs[key] = value
}
}
return vs, nil
}
func (x *slotLRU[K, V]) getIndex(k K) uint64 { func (x *slotLRU[K, V]) getIndex(k K) uint64 {
return x.hash(k) % x.n return x.hash(k) % x.n
} }

@ -205,29 +205,27 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil return len(platformIDs) > 0, nil
} }
//func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) {
// platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) {
// platformIDsMap := make(map[string][]int32) platformIDsMap := make(map[string][]int32)
//
// usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers)
// if err != nil { if err != nil {
// return nil, err return nil, err
// } }
//
// for _, user := range usersStatus { for _, u := range usersStatus {
// platformIDsMap[user.UserID] = user.PlatformIDs platformIDsMap[u.UserID] = u.PlatformIDs
// } }
//
// return platformIDsMap, nil return platformIDsMap, nil
// }) })
// if err != nil { if err != nil {
// log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs)
// return nil, err return nil, err
// } }
// return platformIDsMap, nil
// //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) }
// return platformIDsMap, nil
//}
func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) { func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) {
t := time.Now() t := time.Now()
@ -237,19 +235,6 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
offlineUserIDs = make([]string, 0, len(userIDs)) offlineUserIDs = make([]string, 0, len(userIDs))
) )
//userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs)
//if err != nil {
// return nil, nil, err
//}
//
//for key, value := range userOnlineMap {
// if len(value) > 0 {
// onlineUserIDs = append(onlineUserIDs, key)
// } else {
// offlineUserIDs = append(offlineUserIDs, key)
// }
//}
switch o.fullUserCache { switch o.fullUserCache {
case true: case true:
for _, userID := range userIDs { for _, userID := range userIDs {
@ -260,6 +245,18 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
} }
} }
case false: case false:
userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs)
if err != nil {
return nil, nil, err
}
for key, value := range userOnlineMap {
if len(value) > 0 {
onlineUserIDs = append(onlineUserIDs, key)
} else {
offlineUserIDs = append(offlineUserIDs, key)
}
}
} }
log.ZWarn(ctx, "get users online", nil, "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t)) log.ZWarn(ctx, "get users online", nil, "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t))

Loading…
Cancel
Save