pull/2427/head
withchao 1 year ago
parent b6ab4a5f84
commit bfca6ced2a

@ -75,8 +75,8 @@ type Client struct {
token string token string
hbCtx context.Context hbCtx context.Context
hbCancel context.CancelFunc hbCancel context.CancelFunc
subLock sync.Mutex subLock *sync.Mutex
subUserIDs map[string]struct{} subUserIDs map[string]struct{} // client conn subscription list
} }
// ResetClient updates the client's state with new connection and context information. // ResetClient updates the client's state with new connection and context information.
@ -94,11 +94,11 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closedErr = nil c.closedErr = nil
c.token = ctx.GetToken() c.token = ctx.GetToken()
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
if c.subUserIDs == nil { c.subLock = new(sync.Mutex)
c.subUserIDs = make(map[string]struct{}) if c.subUserIDs != nil {
} else {
clear(c.subUserIDs) clear(c.subUserIDs)
} }
c.subUserIDs = make(map[string]struct{})
} }
func (c *Client) pingHandler(_ string) error { func (c *Client) pingHandler(_ string) error {
@ -249,13 +249,11 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
} }
func (c *Client) close() { func (c *Client) close() {
c.w.Lock()
defer c.w.Unlock()
if c.closed.Load() { if c.closed.Load() {
return return
} }
c.w.Lock()
defer c.w.Unlock()
c.closed.Store(true) c.closed.Store(true)
c.conn.Close() c.conn.Close()
c.hbCancel() // Close server-initiated heartbeat. c.hbCancel() // Close server-initiated heartbeat.
@ -316,6 +314,14 @@ func (c *Client) KickOnlineMessage() error {
return err return err
} }
func (c *Client) PushUserOnlineStatus(data []byte) error {
resp := Resp{
ReqIdentifier: WsSubUserOnlineStatus,
Data: data,
}
return c.writeBinaryMsg(resp)
}
func (c *Client) writeBinaryMsg(resp Resp) error { func (c *Client) writeBinaryMsg(resp Resp) error {
if c.closed.Load() { if c.closed.Load() {
return nil return nil

@ -2,15 +2,11 @@ package msggateway
import ( import (
"context" "context"
"encoding/json"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"sync" "sync"
"time"
) )
func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) { func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
@ -45,33 +41,19 @@ func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, dat
return proto.Marshal(&resp) return proto.Marshal(&resp)
} }
type subClient struct {
clients map[string]*Client
}
func newSubscription() *Subscription { func newSubscription() *Subscription {
return &Subscription{ return &Subscription{
userIDs: make(map[string]*subClient), userIDs: make(map[string]*subClient),
} }
} }
type Subscription struct { type subClient struct {
lock sync.RWMutex clients map[string]*Client
userIDs map[string]*subClient
} }
func (s *Subscription) GetClient(userID string) []*Client { type Subscription struct {
s.lock.RLock() lock sync.RWMutex
defer s.lock.RUnlock() userIDs map[string]*subClient // subscribe to the user's client connection
cs, ok := s.userIDs[userID]
if !ok {
return nil
}
clients := make([]*Client, 0, len(cs.clients))
for _, client := range cs.clients {
clients = append(clients, client)
}
return clients
} }
func (s *Subscription) DelClient(client *Client) { func (s *Subscription) DelClient(client *Client) {
@ -99,6 +81,20 @@ func (s *Subscription) DelClient(client *Client) {
} }
} }
func (s *Subscription) GetClient(userID string) []*Client {
s.lock.RLock()
defer s.lock.RUnlock()
cs, ok := s.userIDs[userID]
if !ok {
return nil
}
clients := make([]*Client, 0, len(cs.clients))
for _, client := range cs.clients {
clients = append(clients, client)
}
return clients
}
func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
if len(addUserIDs)+len(delUserIDs) == 0 { if len(addUserIDs)+len(delUserIDs) == 0 {
return return
@ -121,6 +117,7 @@ func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
continue continue
} }
client.subUserIDs[userID] = struct{}{} client.subUserIDs[userID] = struct{}{}
add[userID] = struct{}{}
} }
client.subLock.Unlock() client.subLock.Unlock()
if len(del)+len(add) == 0 { if len(del)+len(add) == 0 {
@ -154,28 +151,16 @@ func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, p
if len(clients) == 0 { if len(clients) == 0 {
return return
} }
msgContent, err := json.Marshal(platformIDs) onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{
Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}},
})
if err != nil { if err != nil {
log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
return return
} }
now := time.Now().UnixMilli()
msgID := idutil.GetMsgIDByMD5(userID)
msg := &sdkws.MsgData{
SendID: userID,
ClientMsgID: msgID,
ServerMsgID: msgID,
SenderPlatformID: constant.AdminPlatformID,
SessionType: constant.NotificationChatType,
ContentType: constant.UserSubscribeOnlineStatusNotification,
Content: msgContent,
SendTime: now,
CreateTime: now,
}
for _, client := range clients { for _, client := range clients {
msg.RecvID = client.UserID if err := client.PushUserOnlineStatus(onlineStatus); err != nil {
if err := client.PushMessage(ctx, msg); err != nil { log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs)
log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
} }
} }
} }

@ -358,9 +358,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
prommetrics.OnlineUserGauge.Dec() prommetrics.OnlineUserGauge.Dec()
} }
ws.onlineUserConnNum.Add(-1) ws.onlineUserConnNum.Add(-1)
client.subLock.Lock() ws.subscription.DelClient(client)
clear(client.subUserIDs)
client.subLock.Unlock()
//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) //ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserNum.Load(), "online user conn Num",

Loading…
Cancel
Save