diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 07a83fb5c..83e297502 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -74,8 +74,8 @@ type WsServer struct { kickHandlerChan chan *kickHandler clients *UserMap clientPool sync.Pool - onlineUserNum int64 - onlineUserConnNum int64 + onlineUserNum atomic.Int64 + onlineUserConnNum atomic.Int64 handshakeTimeout time.Duration hubServer *Server validate *validator.Validate @@ -220,8 +220,8 @@ func (ws *WsServer) registerClient(client *Client) { if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) - atomic.AddInt64(&ws.onlineUserNum, 1) - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserNum.Add(1) + ws.onlineUserConnNum.Add(1) } else { i := &kickHandler{ clientOK: clientOK, @@ -234,22 +234,35 @@ func (ws *WsServer) registerClient(client *Client) { ws.clients.Set(client.UserID, client) // 已经有同平台的连接存在 log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients)) - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserConnNum.Add(1) } else { ws.clients.Set(client.UserID, client) - - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserConnNum.Add(1) } } - ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - ws.SetUserOnlineStatus(client.ctx, client, constant.Online) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) + }() + + wg.Add(1) + go func() { + defer wg.Done() + ws.SetUserOnlineStatus(client.ctx, client, constant.Online) + }() + + wg.Wait() + log.ZInfo( client.ctx, "user online", "online user Num", - ws.onlineUserNum, + ws.onlineUserNum.Load(), "online user conn Num", - ws.onlineUserConnNum, + ws.onlineUserConnNum.Load(), ) } @@ -282,7 +295,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien if clientOK { isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) if isDeleteUser { - atomic.AddInt64(&ws.onlineUserNum, -1) + ws.onlineUserNum.Add(-1) } for _, c := range oldClients { err := c.KickOnlineMessage() @@ -350,18 +363,18 @@ func (ws *WsServer) unregisterClient(client *Client) { defer ws.clientPool.Put(client) isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) if isDeleteUser { - atomic.AddInt64(&ws.onlineUserNum, -1) + ws.onlineUserNum.Add(-1) } - atomic.AddInt64(&ws.onlineUserConnNum, -1) + ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num", - ws.onlineUserConnNum, + ws.onlineUserConnNum.Load(), ) } func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { connContext := newContext(w, r) - if ws.onlineUserConnNum >= ws.wsMaxConnNum { + if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { httpError(connContext, errs.ErrConnOverMaxNumLimit) return }