From bfca6ced2a7fe432ed7b0e856cf43dc027a1b651 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 17 Jul 2024 18:03:10 +0800 Subject: [PATCH] sub --- internal/msggateway/client.go | 24 +++++++---- internal/msggateway/subscription.go | 65 +++++++++++------------------ internal/msggateway/ws_server.go | 4 +- 3 files changed, 41 insertions(+), 52 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index d5368edf4..289d4e94d 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -75,8 +75,8 @@ type Client struct { token string hbCtx context.Context hbCancel context.CancelFunc - subLock sync.Mutex - subUserIDs map[string]struct{} + subLock *sync.Mutex + subUserIDs map[string]struct{} // client conn subscription list } // ResetClient updates the client's state with new connection and context information. @@ -94,11 +94,11 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closedErr = nil c.token = ctx.GetToken() c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) - if c.subUserIDs == nil { - c.subUserIDs = make(map[string]struct{}) - } else { + c.subLock = new(sync.Mutex) + if c.subUserIDs != nil { clear(c.subUserIDs) } + c.subUserIDs = make(map[string]struct{}) } func (c *Client) pingHandler(_ string) error { @@ -249,13 +249,11 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte, } func (c *Client) close() { + c.w.Lock() + defer c.w.Unlock() if c.closed.Load() { return } - - c.w.Lock() - defer c.w.Unlock() - c.closed.Store(true) c.conn.Close() c.hbCancel() // Close server-initiated heartbeat. @@ -316,6 +314,14 @@ func (c *Client) KickOnlineMessage() error { return err } +func (c *Client) PushUserOnlineStatus(data []byte) error { + resp := Resp{ + ReqIdentifier: WsSubUserOnlineStatus, + Data: data, + } + return c.writeBinaryMsg(resp) +} + func (c *Client) writeBinaryMsg(resp Resp) error { if c.closed.Load() { return nil diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go index 9460f5dbf..9bb41e0df 100644 --- a/internal/msggateway/subscription.go +++ b/internal/msggateway/subscription.go @@ -2,15 +2,11 @@ package msggateway import ( "context" - "encoding/json" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/idutil" "google.golang.org/protobuf/proto" "sync" - "time" ) func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) { @@ -45,33 +41,19 @@ func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, dat return proto.Marshal(&resp) } -type subClient struct { - clients map[string]*Client -} - func newSubscription() *Subscription { return &Subscription{ userIDs: make(map[string]*subClient), } } -type Subscription struct { - lock sync.RWMutex - userIDs map[string]*subClient +type subClient struct { + clients map[string]*Client } -func (s *Subscription) GetClient(userID string) []*Client { - s.lock.RLock() - defer s.lock.RUnlock() - cs, ok := s.userIDs[userID] - if !ok { - return nil - } - clients := make([]*Client, 0, len(cs.clients)) - for _, client := range cs.clients { - clients = append(clients, client) - } - return clients +type Subscription struct { + lock sync.RWMutex + userIDs map[string]*subClient // subscribe to the user's client connection } func (s *Subscription) DelClient(client *Client) { @@ -99,6 +81,20 @@ func (s *Subscription) DelClient(client *Client) { } } +func (s *Subscription) GetClient(userID string) []*Client { + s.lock.RLock() + defer s.lock.RUnlock() + cs, ok := s.userIDs[userID] + if !ok { + return nil + } + clients := make([]*Client, 0, len(cs.clients)) + for _, client := range cs.clients { + clients = append(clients, client) + } + return clients +} + func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { if len(addUserIDs)+len(delUserIDs) == 0 { return @@ -121,6 +117,7 @@ func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { continue } client.subUserIDs[userID] = struct{}{} + add[userID] = struct{}{} } client.subLock.Unlock() if len(del)+len(add) == 0 { @@ -154,28 +151,16 @@ func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, p if len(clients) == 0 { return } - msgContent, err := json.Marshal(platformIDs) + onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{ + Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}}, + }) if err != nil { log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) return } - now := time.Now().UnixMilli() - msgID := idutil.GetMsgIDByMD5(userID) - msg := &sdkws.MsgData{ - SendID: userID, - ClientMsgID: msgID, - ServerMsgID: msgID, - SenderPlatformID: constant.AdminPlatformID, - SessionType: constant.NotificationChatType, - ContentType: constant.UserSubscribeOnlineStatusNotification, - Content: msgContent, - SendTime: now, - CreateTime: now, - } for _, client := range clients { - msg.RecvID = client.UserID - if err := client.PushMessage(ctx, msg); err != nil { - log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent) + if err := client.PushUserOnlineStatus(onlineStatus); err != nil { + log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs) } } } diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index e903084a9..537b8c5f0 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -358,9 +358,7 @@ func (ws *WsServer) unregisterClient(client *Client) { prommetrics.OnlineUserGauge.Dec() } ws.onlineUserConnNum.Add(-1) - client.subLock.Lock() - clear(client.subUserIDs) - client.subLock.Unlock() + ws.subscription.DelClient(client) //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num",