|
|
@ -23,6 +23,8 @@ import (
|
|
|
|
"sync/atomic"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/protocol/msggateway"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
|
|
|
|
|
|
|
|
|
|
@ -52,6 +54,7 @@ type LongConnServer interface {
|
|
|
|
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
|
|
|
|
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
|
|
|
|
KickUserConn(client *Client) error
|
|
|
|
KickUserConn(client *Client) error
|
|
|
|
UnRegister(c *Client)
|
|
|
|
UnRegister(c *Client)
|
|
|
|
|
|
|
|
SetKickHandlerInfo(i *kickHandler)
|
|
|
|
Compressor
|
|
|
|
Compressor
|
|
|
|
Encoder
|
|
|
|
Encoder
|
|
|
|
MessageHandler
|
|
|
|
MessageHandler
|
|
|
@ -78,6 +81,7 @@ type WsServer struct {
|
|
|
|
validate *validator.Validate
|
|
|
|
validate *validator.Validate
|
|
|
|
cache cache.MsgModel
|
|
|
|
cache cache.MsgModel
|
|
|
|
userClient *rpcclient.UserRpcClient
|
|
|
|
userClient *rpcclient.UserRpcClient
|
|
|
|
|
|
|
|
disCov discoveryregistry.SvcDiscoveryRegistry
|
|
|
|
Compressor
|
|
|
|
Compressor
|
|
|
|
Encoder
|
|
|
|
Encoder
|
|
|
|
MessageHandler
|
|
|
|
MessageHandler
|
|
|
@ -88,10 +92,11 @@ type kickHandler struct {
|
|
|
|
newClient *Client
|
|
|
|
newClient *Client
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) {
|
|
|
|
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) {
|
|
|
|
ws.MessageHandler = NewGrpcHandler(ws.validate, client)
|
|
|
|
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov)
|
|
|
|
u := rpcclient.NewUserRpcClient(client)
|
|
|
|
u := rpcclient.NewUserRpcClient(disCov)
|
|
|
|
ws.userClient = &u
|
|
|
|
ws.userClient = &u
|
|
|
|
|
|
|
|
ws.disCov = disCov
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
|
|
|
|
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
|
|
|
@ -180,6 +185,31 @@ func (ws *WsServer) Run() error {
|
|
|
|
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
|
|
|
|
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
|
|
|
|
|
|
|
|
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// Online push user online message to other node
|
|
|
|
|
|
|
|
for _, v := range conns {
|
|
|
|
|
|
|
|
if v.Target() == ws.disCov.GetSelfConnTarget() {
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
msgClient := msggateway.NewMsgGatewayClient(v)
|
|
|
|
|
|
|
|
_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID,
|
|
|
|
|
|
|
|
PlatformID: int32(client.PlatformID), Token: client.token})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) {
|
|
|
|
|
|
|
|
ws.kickHandlerChan <- i
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
var (
|
|
|
|
var (
|
|
|
|
userOK bool
|
|
|
|
userOK bool
|
|
|
@ -211,6 +241,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
|
|
|
|
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
|
|
|
|
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
|
|
|
|
log.ZInfo(
|
|
|
|
log.ZInfo(
|
|
|
|
client.ctx,
|
|
|
|
client.ctx,
|
|
|
@ -249,7 +280,10 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|
|
|
fallthrough
|
|
|
|
fallthrough
|
|
|
|
case constant.AllLoginButSameTermKick:
|
|
|
|
case constant.AllLoginButSameTermKick:
|
|
|
|
if clientOK {
|
|
|
|
if clientOK {
|
|
|
|
ws.clients.deleteClients(newClient.UserID, oldClients)
|
|
|
|
isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
|
|
|
|
|
|
|
|
if isDeleteUser {
|
|
|
|
|
|
|
|
atomic.AddInt64(&ws.onlineUserNum, -1)
|
|
|
|
|
|
|
|
}
|
|
|
|
for _, c := range oldClients {
|
|
|
|
for _, c := range oldClients {
|
|
|
|
err := c.KickOnlineMessage()
|
|
|
|
err := c.KickOnlineMessage()
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
@ -301,7 +335,8 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|
|
|
m[k] = constant.KickedToken
|
|
|
|
m[k] = constant.KickedToken
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID)
|
|
|
|
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID",
|
|
|
|
|
|
|
|
newClient.UserID, "token", newClient.ctx.GetToken())
|
|
|
|
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
|
|
|
|
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
|
|
|
|
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
|
|
|
|