|
|
|
@ -17,6 +17,7 @@ package msggateway
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"github.com/OpenIMSDK/protocol/msggateway"
|
|
|
|
|
"net/http"
|
|
|
|
|
"strconv"
|
|
|
|
|
"sync"
|
|
|
|
@ -52,6 +53,7 @@ type LongConnServer interface {
|
|
|
|
|
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
|
|
|
|
|
KickUserConn(client *Client) error
|
|
|
|
|
UnRegister(c *Client)
|
|
|
|
|
SetKickHandlerInfo(i *kickHandler)
|
|
|
|
|
Compressor
|
|
|
|
|
Encoder
|
|
|
|
|
MessageHandler
|
|
|
|
@ -78,6 +80,7 @@ type WsServer struct {
|
|
|
|
|
validate *validator.Validate
|
|
|
|
|
cache cache.MsgModel
|
|
|
|
|
userClient *rpcclient.UserRpcClient
|
|
|
|
|
disCov discoveryregistry.SvcDiscoveryRegistry
|
|
|
|
|
Compressor
|
|
|
|
|
Encoder
|
|
|
|
|
MessageHandler
|
|
|
|
@ -88,10 +91,11 @@ type kickHandler struct {
|
|
|
|
|
newClient *Client
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) {
|
|
|
|
|
ws.MessageHandler = NewGrpcHandler(ws.validate, client)
|
|
|
|
|
u := rpcclient.NewUserRpcClient(client)
|
|
|
|
|
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) {
|
|
|
|
|
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov)
|
|
|
|
|
u := rpcclient.NewUserRpcClient(disCov)
|
|
|
|
|
ws.userClient = &u
|
|
|
|
|
ws.disCov = disCov
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
|
|
|
|
@ -180,6 +184,31 @@ func (ws *WsServer) Run() error {
|
|
|
|
|
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
|
|
|
|
|
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// Online push message
|
|
|
|
|
for _, v := range conns {
|
|
|
|
|
if v.Target() == ws.disCov.GetSelfConnTarget() {
|
|
|
|
|
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
msgClient := msggateway.NewMsgGatewayClient(v)
|
|
|
|
|
_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID,
|
|
|
|
|
PlatformID: int32(client.PlatformID), Token: client.token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) {
|
|
|
|
|
ws.kickHandlerChan <- i
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
|
var (
|
|
|
|
|
userOK bool
|
|
|
|
@ -211,6 +240,7 @@ func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
|
|
|
|
|
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
|
|
|
|
|
log.ZInfo(
|
|
|
|
|
client.ctx,
|
|
|
|
|