From c1967a63caf1af009c3312c407f667c87e093c5a Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 5 Jul 2024 18:47:48 +0800 Subject: [PATCH] online status --- internal/msggateway/init.go | 8 ++++++++ internal/msggateway/subscription.go | 25 ++++++++++++++++++++++++ internal/msggateway/user_map.go | 30 +++++++++++++++++++++++++++++ pkg/common/cmd/msg_gateway.go | 1 + pkg/rpccache/online.go | 24 ++--------------------- pkg/util/useronline/split.go | 27 ++++++++++++++++++++++++++ 6 files changed, 93 insertions(+), 22 deletions(-) create mode 100644 pkg/util/useronline/split.go diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 815ec8ca6..739a71ecb 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -17,6 +17,7 @@ package msggateway import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" "time" @@ -26,6 +27,7 @@ import ( type Config struct { MsgGateway config.MsgGateway Share config.Share + RedisConfig config.Redis WebhooksConfig config.Webhooks Discovery config.Discovery } @@ -42,6 +44,10 @@ func Start(ctx context.Context, index int, conf *Config) error { if err != nil { return err } + rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) + if err != nil { + return err + } longServer := NewWsServer( conf, WithPort(wsPort), @@ -52,6 +58,8 @@ func Start(ctx context.Context, index int, conf *Config) error { go longServer.ChangeOnlineStatus(4) + go longServer.SubscriberUserOnlineStatusChanges(rdb) + hubServer := NewServer(rpcPort, longServer, conf) netDone := make(chan error) go func() { diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go index d7e606037..98cf41366 100644 --- a/internal/msggateway/subscription.go +++ b/internal/msggateway/subscription.go @@ -1,5 +1,30 @@ package msggateway +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "github.com/redis/go-redis/v9" + "math/rand" + "strconv" +) + +func (ws *WsServer) SubscriberUserOnlineStatusChanges(rdb redis.UniversalClient) { + ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) + for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { + userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) + if err != nil { + log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) + continue + } + if ws.clients.RecvSubChange(userID, platformIDs) { + log.ZDebug(ctx, "receive subscription message and go back online", "userID", userID) + } + } +} + //import ( // "context" // "encoding/json" diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 57fa78087..bd1f19728 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -13,6 +13,7 @@ type UserMap interface { DeleteClients(userID string, clients []*Client) (isDeleteUser bool) UserState() <-chan UserState GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState + RecvSubChange(userID string, platformIDs []int32) bool } type UserState struct { @@ -37,6 +38,17 @@ func (u *UserPlatform) PlatformIDs() []int32 { return platformIDs } +func (u *UserPlatform) PlatformIDSet() map[int32]struct{} { + if len(u.Clients) == 0 { + return nil + } + platformIDs := make(map[int32]struct{}) + for _, client := range u.Clients { + platformIDs[int32(client.PlatformID)] = struct{}{} + } + return platformIDs +} + func newUserMap() UserMap { return &userMap{ data: make(map[string]*UserPlatform), @@ -50,6 +62,24 @@ type userMap struct { ch chan UserState } +func (u *userMap) RecvSubChange(userID string, platformIDs []int32) bool { + u.lock.RLock() + defer u.lock.RUnlock() + result, ok := u.data[userID] + if !ok { + return false + } + localPlatformIDs := result.PlatformIDSet() + for _, platformID := range platformIDs { + delete(localPlatformIDs, platformID) + } + if len(localPlatformIDs) == 0 { + return false + } + u.push(userID, result, nil) + return true +} + func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool { select { case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 78004094c..29d3fba33 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -37,6 +37,7 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { ret.configMap = map[string]any{ OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, ShareFileName: &msgGatewayConfig.Share, + RedisConfigFileName: &msgGatewayConfig.RedisConfig, WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index eac2f43af..d986651c7 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -2,17 +2,16 @@ package rpccache import ( "context" - "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/redis/go-redis/v9" "math/rand" "strconv" - "strings" "time" ) @@ -25,28 +24,9 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re }), } go func() { - parseUserOnlineStatus := func(payload string) (string, []int32, error) { - arr := strings.Split(payload, ":") - if len(arr) == 0 { - return "", nil, errors.New("invalid data") - } - userID := arr[len(arr)-1] - if userID == "" { - return "", nil, errors.New("userID is empty") - } - platformIDs := make([]int32, len(arr)-1) - for i := range platformIDs { - platformID, err := strconv.Atoi(arr[i]) - if err != nil { - return "", nil, err - } - platformIDs[i] = int32(platformID) - } - return userID, platformIDs, nil - } ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { - userID, platformIDs, err := parseUserOnlineStatus(message.Payload) + userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) if err != nil { log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) continue diff --git a/pkg/util/useronline/split.go b/pkg/util/useronline/split.go new file mode 100644 index 000000000..c39d31d15 --- /dev/null +++ b/pkg/util/useronline/split.go @@ -0,0 +1,27 @@ +package useronline + +import ( + "errors" + "strconv" + "strings" +) + +func ParseUserOnlineStatus(payload string) (string, []int32, error) { + arr := strings.Split(payload, ":") + if len(arr) == 0 { + return "", nil, errors.New("invalid data") + } + userID := arr[len(arr)-1] + if userID == "" { + return "", nil, errors.New("userID is empty") + } + platformIDs := make([]int32, len(arr)-1) + for i := range platformIDs { + platformID, err := strconv.Atoi(arr[i]) + if err != nil { + return "", nil, err + } + platformIDs[i] = int32(platformID) + } + return userID, platformIDs, nil +}