Merge branch 'main' of github.com:openimsdk/open-im-server into refactor/parse-token

pull/2463/head
Monet Lee 1 year ago
commit e74fa5c7ae

@ -198,6 +198,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
offlineUserIDs = append(offlineUserIDs, userID)
}
}
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 {
var err error

@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"time"
@ -82,8 +83,11 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil {
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
return nil
}

@ -28,7 +28,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue
}
storageCache := x.setUserOnline(userID, platformIDs)
@ -48,9 +48,15 @@ type OnlineCache struct {
}
func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
return o.local.Get(userID, func() ([]int32, error) {
platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
return o.user.GetUserOnlinePlatform(ctx, userID)
})
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
return nil, err
}
log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
return platformIDs, nil
}
func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
@ -61,39 +67,39 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil
}
func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
onlineUserIDs := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
return onlineUserIDs, nil
}
func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
var onlineUserIDs []string
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
return onlineUserIDs, nil
}
//func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
// onlineUserIDs := make([]string, 0, len(userIDs))
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
// return onlineUserIDs, nil
//}
//
//func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
// userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// var onlineUserIDs []string
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
// return onlineUserIDs, nil
//}
func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
return o.local.SetHas(userID, platformIDs)

Loading…
Cancel
Save