@ -11,45 +11,112 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/cacheutil"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/redis/go-redis/v9"
)
func NewOnlineCache ( user rpcclient . UserRpcClient , group * GroupLocalCache , rdb redis . UniversalClient , f n func ( ctx context . Context , userID string , platformIDs [ ] int32 ) ) * OnlineCache {
func NewOnlineCache ( user rpcclient . UserRpcClient , group * GroupLocalCache , rdb redis . UniversalClient , f ullUserCache bool , f n func ( ctx context . Context , userID string , platformIDs [ ] int32 ) ) ( * OnlineCache , error ) {
x := & OnlineCache {
user : user ,
group : group ,
local : lru . NewSlotLRU ( 1024 , localcache . LRUStringHash , func ( ) lru . LRU [ string , [ ] int32 ] {
fullUserCache : fullUserCache ,
}
switch x . fullUserCache {
case true :
x . mapCache = cacheutil . NewCache [ string , [ ] int32 ] ( )
if err := x . initUsersOnlineStatus ( mcontext . SetOperationID ( context . TODO ( ) , strconv . FormatInt ( time . Now ( ) . UnixNano ( ) + int64 ( rand . Uint32 ( ) ) , 10 ) ) ) ; err != nil {
return nil , err
}
case false :
x . lruCache = lru . NewSlotLRU ( 1024 , localcache . LRUStringHash , func ( ) lru . LRU [ string , [ ] int32 ] {
return lru . NewLayLRU [ string , [ ] int32 ] ( 2048 , cachekey . OnlineExpire / 2 , time . Second * 3 , localcache . EmptyTarget { } , func ( key string , value [ ] int32 ) { } )
} ) ,
} )
}
go func ( ) {
ctx := mcontext . SetOperationID ( context . Background ( ) , cachekey . OnlineChannel + strconv . FormatUint ( rand . Uint64 ( ) , 10 ) )
for message := range rdb . Subscribe ( ctx , cachekey . OnlineChannel ) . Channel ( ) {
userID , platformIDs , err := useronline . ParseUserOnlineStatus ( message . Payload )
if err != nil {
log . ZError ( ctx , "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus" , err , "payload" , message . Payload , "channel" , message . Channel )
log . ZError ( ctx , "OnlineCache set Has UserOnline redis subscribe parseUserOnlineStatus", err , "payload" , message . Payload , "channel" , message . Channel )
continue
}
storageCache := x . setUserOnline ( userID , platformIDs )
log . ZDebug ( ctx , "OnlineCache setUserOnline" , "userID" , userID , "platformIDs" , platformIDs , "payload" , message . Payload , "storageCache" , storageCache )
switch x . fullUserCache {
case true :
if len ( platformIDs ) == 0 {
// offline
x . mapCache . Delete ( userID )
} else {
x . mapCache . Store ( userID , platformIDs )
}
case false :
storageCache := x . setHasUserOnline ( userID , platformIDs )
log . ZDebug ( ctx , "OnlineCache setHasUserOnline" , "userID" , userID , "platformIDs" , platformIDs , "payload" , message . Payload , "storageCache" , storageCache )
if fn != nil {
fn ( ctx , userID , platformIDs )
}
}
}
} ( )
return x
return x , nil
}
type OnlineCache struct {
user rpcclient . UserRpcClient
group * GroupLocalCache
local lru . LRU [ string , [ ] int32 ]
// fullUserCache if enabled, caches the online status of all users using mapCache;
// otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache.
fullUserCache bool
lruCache lru . LRU [ string , [ ] int32 ]
mapCache * cacheutil . Cache [ string , [ ] int32 ]
}
func ( o * OnlineCache ) initUsersOnlineStatus ( ctx context . Context ) error {
log . ZDebug ( ctx , "init users online status begin" )
var (
totalSet int
)
defer func ( t time . Time ) {
log . ZWarn ( ctx , "init users online status end" , nil , "cost" , time . Since ( t ) , "totalSet" , totalSet )
} ( time . Now ( ) )
for page := int32 ( 1 ) ; ; page ++ {
resp , err := o . user . GetAllUserID ( ctx , page , constant . ParamMaxLength )
if err != nil {
return err
}
usersStatus , err := o . user . GetUsersOnlinePlatform ( ctx , resp . UserIDs )
if err != nil {
return err
}
for _ , user := range usersStatus {
if user . Status == constant . Online {
o . setUserOnline ( user . UserID , user . PlatformIDs )
}
totalSet ++
}
if len ( resp . UserIDs ) < constant . ParamMaxLength {
break
}
}
return nil
}
func ( o * OnlineCache ) getUserOnlinePlatform ( ctx context . Context , userID string ) ( [ ] int32 , error ) {
platformIDs , err := o . local . Get ( userID , func ( ) ( [ ] int32 , error ) {
platformIDs , err := o . l ruCache . Get ( userID , func ( ) ( [ ] int32 , error ) {
return o . user . GetUserOnlinePlatform ( ctx , userID )
} )
if err != nil {
@ -91,7 +158,7 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
// ----------------------
func ( o * OnlineCache ) getUserOnlinePlatformBatch ( ctx context . Context , userIDs [ ] string ) ( map [ string ] [ ] int32 , error ) {
platformIDsMap , err := o . l ocal . GetBatch ( userIDs , func ( missingUsers [ ] string ) ( map [ string ] [ ] int32 , error ) {
platformIDsMap , err := o . l ruCache . GetBatch ( userIDs , func ( missingUsers [ ] string ) ( map [ string ] [ ] int32 , error ) {
platformIDsMap := make ( map [ string ] [ ] int32 )
usersStatus , err := o . user . GetUsersOnlinePlatform ( ctx , missingUsers )
@ -134,6 +201,19 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
}
}
switch o . fullUserCache {
case true :
for _ , userID := range userIDs {
if _ , ok := o . mapCache . Load ( userID ) ; ok {
onlineUserIDs = append ( onlineUserIDs , userID )
} else {
offlineUserIDs = append ( offlineUserIDs , userID )
}
}
case false :
}
log . ZWarn ( ctx , "get users online" , nil , "online users length" , len ( onlineUserIDs ) , "offline users length" , len ( offlineUserIDs ) )
return onlineUserIDs , offlineUserIDs , nil
}
@ -171,6 +251,15 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s
// return onlineUserIDs, nil
//}
func ( o * OnlineCache ) setUserOnline ( userID string , platformIDs [ ] int32 ) bool {
return o . local . SetHas ( userID , platformIDs )
func ( o * OnlineCache ) setUserOnline ( userID string , platformIDs [ ] int32 ) {
switch o . fullUserCache {
case true :
o . mapCache . Store ( userID , platformIDs )
case false :
o . lruCache . Set ( userID , platformIDs )
}
}
func ( o * OnlineCache ) setHasUserOnline ( userID string , platformIDs [ ] int32 ) bool {
return o . lruCache . SetHas ( userID , platformIDs )
}