diff --git a/go.mod b/go.mod index 01ff50b18..8f18943a7 100644 --- a/go.mod +++ b/go.mod @@ -37,8 +37,8 @@ require ( require github.com/google/uuid v1.3.0 require ( - github.com/OpenIMSDK/protocol v0.0.14 - github.com/OpenIMSDK/tools v0.0.13 + github.com/OpenIMSDK/protocol v0.0.15 + github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.7.1 diff --git a/go.sum b/go.sum index 26e009e28..a361872a3 100644 --- a/go.sum +++ b/go.sum @@ -17,10 +17,10 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ= -github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.13 h1:rcw4HS8S2DPZR9UOBxD8/ol9UBMzXBypzOVEytDRIMo= -github.com/OpenIMSDK/tools v0.0.13/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/protocol v0.0.15 h1:KrrvdHH9kFF/tFYL2FXRPAr2e5F5DctSHfHq6MQjUI4= +github.com/OpenIMSDK/protocol v0.0.15/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= +github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE= github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index 11fdd0298..b2999f385 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -26,7 +26,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" ) -func url() string { +func callBackURL() string { return config.Config.Callback.CallbackUrl } @@ -49,7 +49,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp ConnID: connID, } resp := cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, url(), &req, &resp, config.Config.Callback.CallbackUserOnline) + return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline) } func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { @@ -70,7 +70,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con ConnID: connID, } resp := &cbapi.CallbackUserOfflineResp{} - return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) + return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) } func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { @@ -90,7 +90,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err Seq: time.Now().UnixMilli(), } resp := &cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) + return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) } // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 601c28a34..1d32ff71c 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -16,6 +16,7 @@ package msggateway import ( "net/http" + "net/url" "strconv" "time" @@ -71,6 +72,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), } } +func newTempContext() *UserConnContext { + return &UserConnContext{ + Req: &http.Request{URL: &url.URL{}}, + } +} func (c *UserConnContext) GetRemoteAddr() string { return c.RemoteAddr @@ -116,9 +122,15 @@ func (c *UserConnContext) GetOperationID() string { return c.Req.URL.Query().Get(OperationID) } +func (c *UserConnContext) SetOperationID(operationID string) { + c.Req.URL.Query().Set(OperationID, operationID) +} func (c *UserConnContext) GetToken() string { return c.Req.URL.Query().Get(Token) } +func (c *UserConnContext) SetToken(token string) { + c.Req.URL.RawQuery = Token + "=" + token +} func (c *UserConnContext) GetBackground() bool { b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index d57d41aec..1ebbb5902 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -17,6 +17,8 @@ package msggateway import ( "context" + "github.com/OpenIMSDK/tools/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/tools/errs" @@ -35,13 +37,13 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/startrpc" ) -func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { +func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { return err } msgModel := cache.NewMsgCacheModel(rdb) - s.LongConnServer.SetDiscoveryRegistry(client) + s.LongConnServer.SetDiscoveryRegistry(disCov) s.LongConnServer.SetCacheHandler(msgModel) msggateway.RegisterMsgGatewayServer(server, s) return nil @@ -198,6 +200,20 @@ func (s *Server) MultiTerminalLoginCheck( ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq, ) (*msggateway.MultiTerminalLoginCheckResp, error) { - // TODO implement me - panic("implement me") + if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK { + tempUserCtx := newTempContext() + tempUserCtx.SetToken(req.Token) + tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx)) + client := &Client{} + client.ctx = tempUserCtx + client.UserID = req.UserID + client.PlatformID = int(req.PlatformID) + i := &kickHandler{ + clientOK: clientOK, + oldClients: oldClients, + newClient: client, + } + s.LongConnServer.SetKickHandlerInfo(i) + } + return &msggateway.MultiTerminalLoginCheckResp{}, nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index ee8853af6..124898f7a 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + "github.com/OpenIMSDK/protocol/msggateway" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" @@ -52,6 +54,7 @@ type LongConnServer interface { SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) KickUserConn(client *Client) error UnRegister(c *Client) + SetKickHandlerInfo(i *kickHandler) Compressor Encoder MessageHandler @@ -78,6 +81,7 @@ type WsServer struct { validate *validator.Validate cache cache.MsgModel userClient *rpcclient.UserRpcClient + disCov discoveryregistry.SvcDiscoveryRegistry Compressor Encoder MessageHandler @@ -88,10 +92,11 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { - ws.MessageHandler = NewGrpcHandler(ws.validate, client) - u := rpcclient.NewUserRpcClient(client) +func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) { + ws.MessageHandler = NewGrpcHandler(ws.validate, disCov) + u := rpcclient.NewUserRpcClient(disCov) ws.userClient = &u + ws.disCov = disCov } func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) { @@ -180,6 +185,31 @@ func (ws *WsServer) Run() error { return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening } +func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { + conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) + if err != nil { + return err + } + // Online push user online message to other node + for _, v := range conns { + if v.Target() == ws.disCov.GetSelfConnTarget() { + log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + continue + } + msgClient := msggateway.NewMsgGatewayClient(v) + _, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID, + PlatformID: int32(client.PlatformID), Token: client.token}) + if err != nil { + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + continue + } + } + return nil +} +func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) { + ws.kickHandlerChan <- i +} + func (ws *WsServer) registerClient(client *Client) { var ( userOK bool @@ -211,6 +241,7 @@ func (ws *WsServer) registerClient(client *Client) { atomic.AddInt64(&ws.onlineUserConnNum, 1) } } + ws.sendUserOnlineInfoToOtherNode(client.ctx, client) ws.SetUserOnlineStatus(client.ctx, client, constant.Online) log.ZInfo( client.ctx, @@ -249,7 +280,10 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien fallthrough case constant.AllLoginButSameTermKick: if clientOK { - ws.clients.deleteClients(newClient.UserID, oldClients) + isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) + if isDeleteUser { + atomic.AddInt64(&ws.onlineUserNum, -1) + } for _, c := range oldClients { err := c.KickOnlineMessage() if err != nil { @@ -301,7 +335,8 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien m[k] = constant.KickedToken } } - log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID) + log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", + newClient.UserID, "token", newClient.ctx.GetToken()) err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m) if err != nil { log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 1babb0ca5..c1fa0d6e6 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -134,7 +134,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target()) + log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) } for _, v := range conns { client := msggateway.NewMsgGatewayClient(v) diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index 62011390a..f793582f8 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -49,6 +49,9 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s if group.ApplyMemberFriend != nil { m["apply_member_friend"] = group.ApplyMemberFriend.Value } + if group.Ex != nil { + m["ex"] = group.Ex.Value + } return m } diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index 1f848d1b4..bfe40a594 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -77,15 +77,15 @@ func NewMinio() (s3.Interface, error) { return nil, err } m := &Minio{ - bucket: conf.Bucket, - bucketURL: conf.Endpoint + "/" + conf.Bucket + "/", - core: &minio.Core{Client: client}, - lock: &sync.Mutex{}, - init: false, + bucket: conf.Bucket, + core: &minio.Core{Client: client}, + lock: &sync.Mutex{}, + init: false, } if conf.SignEndpoint == "" || conf.SignEndpoint == conf.Endpoint { m.opts = opts m.sign = m.core.Client + m.bucketURL = conf.Endpoint + "/" + conf.Bucket + "/" } else { su, err := url.Parse(conf.SignEndpoint) if err != nil { @@ -99,6 +99,7 @@ func NewMinio() (s3.Interface, error) { if err != nil { return nil, err } + m.bucketURL = conf.SignEndpoint + "/" + conf.Bucket + "/" } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go index 418b2e7d7..e3a26e009 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/test/data-conversion/kafka-conversation.go @@ -16,6 +16,7 @@ package data_conversion import ( "fmt" + "github.com/Shopify/sarama" ) diff --git a/test/data-conversion/mysql-conversion.go b/test/data-conversion/mysql-conversion.go index 5c24dd6e2..08d7ab789 100644 --- a/test/data-conversion/mysql-conversion.go +++ b/test/data-conversion/mysql-conversion.go @@ -17,11 +17,13 @@ package data_conversion import ( "context" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" + "time" + "github.com/OpenIMSDK/tools/log" "gorm.io/driver/mysql" "gorm.io/gorm" - "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" ) var (