From 9a4f5f78cb03fa16261059a44d3d95d348121d41 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 9 Jul 2024 14:26:55 +0800 Subject: [PATCH] sub --- go.mod | 2 + go.sum | 2 - internal/msggateway/client.go | 6 +- internal/msggateway/compressor_test.go | 9 +- internal/msggateway/constant.go | 1 + internal/msggateway/hub_server.go | 10 +- internal/msggateway/init.go | 9 +- internal/msggateway/n_ws_server.go | 32 ++- internal/msggateway/subscription.go | 330 +++++++++++++------------ internal/push/push_handler.go | 2 +- pkg/rpccache/online.go | 5 +- 11 files changed, 222 insertions(+), 186 deletions(-) diff --git a/go.mod b/go.mod index 6450f3615..41ab0a322 100644 --- a/go.mod +++ b/go.mod @@ -175,3 +175,5 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol diff --git a/go.sum b/go.sum index 577154a1b..92350d745 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,6 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc= -github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0= github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 62c71d5be..bed454e49 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -72,8 +72,8 @@ type Client struct { closed atomic.Bool closedErr error token string - //subLock sync.Mutex - //subUserIDs map[string]struct{} + subLock sync.Mutex + subUserIDs map[string]struct{} } // ResetClient updates the client's state with new connection and context information. @@ -204,6 +204,8 @@ func (c *Client) handleMessage(message []byte) error { resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq) case WsSetBackgroundStatus: resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq) + case WsSubUserOnlineStatus: + resp, messageErr = c.longConnServer.SubUserOnlineStatus(ctx, c, binaryReq) default: return fmt.Errorf( "ReqIdentifier failed,sendID:%s,msgIncr:%s,reqIdentifier:%d", diff --git a/internal/msggateway/compressor_test.go b/internal/msggateway/compressor_test.go index 173c9bb20..952bd4d95 100644 --- a/internal/msggateway/compressor_test.go +++ b/internal/msggateway/compressor_test.go @@ -16,10 +16,10 @@ package msggateway import ( "crypto/rand" + "github.com/stretchr/testify/assert" "sync" "testing" - - "github.com/stretchr/testify/assert" + "unsafe" ) func mockRandom() []byte { @@ -132,3 +132,8 @@ func BenchmarkDecompressWithSyncPool(b *testing.B) { assert.Equal(b, nil, err) } } + +func TestName(t *testing.T) { + t.Log(unsafe.Sizeof(Client{})) + +} diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 64664ac0a..bc74ed583 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -43,6 +43,7 @@ const ( WSKickOnlineMsg = 2002 WsLogoutMsg = 2003 WsSetBackgroundStatus = 2004 + WsSubUserOnlineStatus = 2005 WSDataError = 3001 ) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 8ff6d1001..3891aa532 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/tools/discovery" @@ -31,6 +32,10 @@ import ( func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { s.LongConnServer.SetDiscoveryRegistry(disCov, config) msggateway.RegisterMsgGatewayServer(server, s) + s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + if s.ready != nil { + return s.ready(s) + } return nil } @@ -50,18 +55,21 @@ type Server struct { LongConnServer LongConnServer config *Config pushTerminal map[int]struct{} + ready func(srv *Server) error + userRcp rpcclient.UserRpcClient } func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config) *Server { +func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { s := &Server{ rpcPort: rpcPort, LongConnServer: longConnServer, pushTerminal: make(map[int]struct{}), config: conf, + ready: ready, } s.pushTerminal[constant.IOSPlatformID] = struct{}{} s.pushTerminal[constant.AndroidPlatformID] = struct{}{} diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 739a71ecb..44e79e412 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -17,6 +17,7 @@ package msggateway import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" "time" @@ -56,11 +57,13 @@ func Start(ctx context.Context, index int, conf *Config) error { WithMessageMaxMsgLength(conf.MsgGateway.LongConnSvr.WebsocketMaxMsgLen), ) - go longServer.ChangeOnlineStatus(4) + hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error { + longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges) + return nil + }) - go longServer.SubscriberUserOnlineStatusChanges(rdb) + go longServer.ChangeOnlineStatus(4) - hubServer := NewServer(rpcPort, longServer, conf) netDone := make(chan error) go func() { err = hubServer.Start(ctx, index, conf) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 8a79151fe..e903084a9 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/tools/mcontext" "net/http" @@ -48,20 +49,22 @@ type LongConnServer interface { KickUserConn(client *Client) error UnRegister(c *Client) SetKickHandlerInfo(i *kickHandler) + SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) Compressor Encoder MessageHandler } type WsServer struct { - msgGatewayConfig *Config - port int - wsMaxConnNum int64 - registerChan chan *Client - unregisterChan chan *Client - kickHandlerChan chan *kickHandler - clients UserMap - //subscription *Subscription + msgGatewayConfig *Config + port int + wsMaxConnNum int64 + registerChan chan *Client + unregisterChan chan *Client + kickHandlerChan chan *kickHandler + clients UserMap + online *rpccache.OnlineCache + subscription *Subscription clientPool sync.Pool onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 @@ -125,6 +128,8 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { for _, o := range opts { o(&config) } + //userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) + v := validator.New() return &WsServer{ msgGatewayConfig: msgGatewayConfig, @@ -142,10 +147,10 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { kickHandlerChan: make(chan *kickHandler, 1000), validate: v, clients: newUserMap(), - //subscription: newSubscription(), - Compressor: NewGzipCompressor(), - Encoder: NewGobEncoder(), - webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), + subscription: newSubscription(), + Compressor: NewGzipCompressor(), + Encoder: NewGobEncoder(), + webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), } } @@ -353,6 +358,9 @@ func (ws *WsServer) unregisterClient(client *Client) { prommetrics.OnlineUserGauge.Dec() } ws.onlineUserConnNum.Add(-1) + client.subLock.Lock() + clear(client.subUserIDs) + client.subLock.Unlock() //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num", diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go index 8df548f54..9460f5dbf 100644 --- a/internal/msggateway/subscription.go +++ b/internal/msggateway/subscription.go @@ -2,174 +2,180 @@ package msggateway import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" + "encoding/json" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" - "github.com/redis/go-redis/v9" - "math/rand" - "strconv" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/idutil" + "google.golang.org/protobuf/proto" + "sync" + "time" ) -func (ws *WsServer) SubscriberUserOnlineStatusChanges(rdb redis.UniversalClient) { - ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) - for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { - userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload) - if err != nil { - log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) +func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) { + if ws.clients.RecvSubChange(userID, platformIDs) { + log.ZDebug(ctx, "gateway receive subscription message and go back online", "userID", userID, "platformIDs", platformIDs) + } else { + log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs) + } + ws.pushUserIDOnlineStatus(ctx, userID, platformIDs) +} + +func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) { + var sub sdkws.SubUserOnlineStatus + if err := proto.Unmarshal(data.Data, &sub); err != nil { + return nil, err + } + ws.subscription.Sub(client, sub.SubscribeUserID, sub.UnsubscribeUserID) + var resp sdkws.SubUserOnlineStatusTips + if len(sub.SubscribeUserID) > 0 { + resp.Subscribers = make([]*sdkws.SubUserOnlineStatusElem, 0, len(sub.SubscribeUserID)) + for _, userID := range sub.SubscribeUserID { + platformIDs, err := ws.online.GetUserOnlinePlatform(ctx, userID) + if err != nil { + return nil, err + } + resp.Subscribers = append(resp.Subscribers, &sdkws.SubUserOnlineStatusElem{ + UserID: userID, + OnlinePlatformIDs: platformIDs, + }) + } + } + return proto.Marshal(&resp) +} + +type subClient struct { + clients map[string]*Client +} + +func newSubscription() *Subscription { + return &Subscription{ + userIDs: make(map[string]*subClient), + } +} + +type Subscription struct { + lock sync.RWMutex + userIDs map[string]*subClient +} + +func (s *Subscription) GetClient(userID string) []*Client { + s.lock.RLock() + defer s.lock.RUnlock() + cs, ok := s.userIDs[userID] + if !ok { + return nil + } + clients := make([]*Client, 0, len(cs.clients)) + for _, client := range cs.clients { + clients = append(clients, client) + } + return clients +} + +func (s *Subscription) DelClient(client *Client) { + client.subLock.Lock() + userIDs := datautil.Keys(client.subUserIDs) + for _, userID := range userIDs { + delete(client.subUserIDs, userID) + } + client.subLock.Unlock() + if len(userIDs) == 0 { + return + } + addr := client.ctx.GetRemoteAddr() + s.lock.Lock() + defer s.lock.Unlock() + for _, userID := range userIDs { + sub, ok := s.userIDs[userID] + if !ok { continue } - if ws.clients.RecvSubChange(userID, platformIDs) { - log.ZDebug(ctx, "gateway receive subscription message and go back online", "userID", userID, "platformIDs", platformIDs) - } else { - log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs) + delete(sub.clients, addr) + if len(sub.clients) == 0 { + delete(s.userIDs, userID) } } } -//import ( -// "context" -// "encoding/json" -// "github.com/openimsdk/protocol/constant" -// "github.com/openimsdk/protocol/sdkws" -// "github.com/openimsdk/tools/log" -// "github.com/openimsdk/tools/utils/datautil" -// "github.com/openimsdk/tools/utils/idutil" -// "sync" -// "time" -//) -// -//type subClient struct { -// clients map[string]*Client -//} -// -//func newSubscription() *Subscription { -// return &Subscription{ -// userIDs: make(map[string]*subClient), -// } -//} -// -//type Subscription struct { -// lock sync.RWMutex -// userIDs map[string]*subClient -//} -// -//func (s *Subscription) GetClient(userID string) []*Client { -// s.lock.RLock() -// defer s.lock.RUnlock() -// cs, ok := s.userIDs[userID] -// if !ok { -// return nil -// } -// clients := make([]*Client, 0, len(cs.clients)) -// for _, client := range cs.clients { -// clients = append(clients, client) -// } -// return clients -//} -// -//func (s *Subscription) DelClient(client *Client) { -// client.subLock.Lock() -// userIDs := datautil.Keys(client.subUserIDs) -// for _, userID := range userIDs { -// delete(client.subUserIDs, userID) -// } -// client.subLock.Unlock() -// if len(userIDs) == 0 { -// return -// } -// addr := client.ctx.GetRemoteAddr() -// s.lock.Lock() -// defer s.lock.Unlock() -// for _, userID := range userIDs { -// sub, ok := s.userIDs[userID] -// if !ok { -// continue -// } -// delete(sub.clients, addr) -// if len(sub.clients) == 0 { -// delete(s.userIDs, userID) -// } -// } -//} -// -//func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { -// if len(addUserIDs)+len(delUserIDs) == 0 { -// return -// } -// var ( -// del = make(map[string]struct{}) -// add = make(map[string]struct{}) -// ) -// client.subLock.Lock() -// for _, userID := range delUserIDs { -// if _, ok := client.subUserIDs[userID]; !ok { -// continue -// } -// del[userID] = struct{}{} -// delete(client.subUserIDs, userID) -// } -// for _, userID := range addUserIDs { -// delete(del, userID) -// if _, ok := client.subUserIDs[userID]; ok { -// continue -// } -// client.subUserIDs[userID] = struct{}{} -// } -// client.subLock.Unlock() -// if len(del)+len(add) == 0 { -// return -// } -// addr := client.ctx.GetRemoteAddr() -// s.lock.Lock() -// defer s.lock.Unlock() -// for userID := range del { -// sub, ok := s.userIDs[userID] -// if !ok { -// continue -// } -// delete(sub.clients, addr) -// if len(sub.clients) == 0 { -// delete(s.userIDs, userID) -// } -// } -// for userID := range add { -// sub, ok := s.userIDs[userID] -// if !ok { -// sub = &subClient{clients: make(map[string]*Client)} -// s.userIDs[userID] = sub -// } -// sub.clients[addr] = client -// } -//} -// -//func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) { -// clients := ws.subscription.GetClient(userID) -// if len(clients) == 0 { -// return -// } -// msgContent, err := json.Marshal(platformIDs) -// if err != nil { -// log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) -// return -// } -// now := time.Now().UnixMilli() -// msgID := idutil.GetMsgIDByMD5(userID) -// msg := &sdkws.MsgData{ -// SendID: userID, -// ClientMsgID: msgID, -// ServerMsgID: msgID, -// SenderPlatformID: constant.AdminPlatformID, -// SessionType: constant.NotificationChatType, -// ContentType: constant.UserSubscribeOnlineStatusNotification, -// Content: msgContent, -// SendTime: now, -// CreateTime: now, -// } -// for _, client := range clients { -// msg.RecvID = client.UserID -// if err := client.PushMessage(ctx, msg); err != nil { -// log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent) -// } -// } -//} +func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { + if len(addUserIDs)+len(delUserIDs) == 0 { + return + } + var ( + del = make(map[string]struct{}) + add = make(map[string]struct{}) + ) + client.subLock.Lock() + for _, userID := range delUserIDs { + if _, ok := client.subUserIDs[userID]; !ok { + continue + } + del[userID] = struct{}{} + delete(client.subUserIDs, userID) + } + for _, userID := range addUserIDs { + delete(del, userID) + if _, ok := client.subUserIDs[userID]; ok { + continue + } + client.subUserIDs[userID] = struct{}{} + } + client.subLock.Unlock() + if len(del)+len(add) == 0 { + return + } + addr := client.ctx.GetRemoteAddr() + s.lock.Lock() + defer s.lock.Unlock() + for userID := range del { + sub, ok := s.userIDs[userID] + if !ok { + continue + } + delete(sub.clients, addr) + if len(sub.clients) == 0 { + delete(s.userIDs, userID) + } + } + for userID := range add { + sub, ok := s.userIDs[userID] + if !ok { + sub = &subClient{clients: make(map[string]*Client)} + s.userIDs[userID] = sub + } + sub.clients[addr] = client + } +} + +func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) { + clients := ws.subscription.GetClient(userID) + if len(clients) == 0 { + return + } + msgContent, err := json.Marshal(platformIDs) + if err != nil { + log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) + return + } + now := time.Now().UnixMilli() + msgID := idutil.GetMsgIDByMD5(userID) + msg := &sdkws.MsgData{ + SendID: userID, + ClientMsgID: msgID, + ServerMsgID: msgID, + SenderPlatformID: constant.AdminPlatformID, + SessionType: constant.NotificationChatType, + ContentType: constant.UserSubscribeOnlineStatusNotification, + Content: msgContent, + SendTime: now, + CreateTime: now, + } + for _, client := range clients { + msg.RecvID = client.UserID + if err := client.PushMessage(ctx, msg); err != nil { + log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent) + } + } +} diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index e942f40ae..ed87b3929 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -75,7 +75,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config - consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb) + consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) return &consumerHandler, nil } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index d986651c7..5db68d198 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -15,7 +15,7 @@ import ( "time" ) -func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient) *OnlineCache { +func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache { x := &OnlineCache{ user: user, group: group, @@ -33,6 +33,9 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re } storageCache := x.setUserOnline(userID, platformIDs) log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache) + if fn != nil { + fn(ctx, userID, platformIDs) + } } }() return x