From 0bd7269f1e20691bf1ecbbbacd72179e3d7ed367 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Mon, 25 Mar 2024 21:17:35 +0800 Subject: [PATCH] fix: message gateway update. --- go.mod | 2 +- internal/msggateway/client.go | 4 ++-- internal/msggateway/context.go | 8 +++++--- internal/msggateway/hub_server.go | 4 ++-- internal/msggateway/message_handler.go | 10 +++++----- internal/msggateway/n_ws_server.go | 12 ++++++------ internal/msggateway/user_map.go | 4 ++-- 7 files changed, 23 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index fab33e8d6..59a9b291e 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/protocol v0.0.58-google - github.com/openimsdk/tools v0.0.46-alpha.16.0.20240322040503-5ee151e04e7d + github.com/openimsdk/tools v0.0.46 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 7a6d6832f..babf9567a 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/openimsdk/tools/utils/stringutil" "runtime/debug" "sync" "sync/atomic" @@ -29,7 +30,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils" "google.golang.org/protobuf/proto" ) @@ -91,7 +91,7 @@ type Client struct { func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) { c.w = new(sync.Mutex) c.conn = conn - c.PlatformID = utils.StringToInt(ctx.GetPlatformID()) + c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID()) c.IsCompress = isCompress c.IsBackground = isBackground c.UserID = ctx.GetUserID() diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 9c12ee1f7..7139f9e9c 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -15,13 +15,15 @@ package msggateway import ( + "github.com/openimsdk/tools/utils/encrypt" + "github.com/openimsdk/tools/utils/stringutil" + "github.com/openimsdk/tools/utils/timeutil" "net/http" "net/url" "strconv" "time" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/tools/utils" ) type UserConnContext struct { @@ -54,7 +56,7 @@ func (c *UserConnContext) Value(key any) any { case constant.ConnID: return c.GetConnID() case constant.OpUserPlatform: - return constant.PlatformIDToName(utils.StringToInt(c.GetPlatformID())) + return constant.PlatformIDToName(stringutil.StringToInt(c.GetPlatformID())) case constant.RemoteAddr: return c.RemoteAddr default: @@ -69,7 +71,7 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont Path: req.URL.Path, Method: req.Method, RemoteAddr: req.RemoteAddr, - ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), + ConnID: encrypt.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(timeutil.GetCurrentTimestampByMill()))), } } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index e29fbfc93..92f4bcec4 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -23,14 +23,14 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" - "github.com/openimsdk/tools/discoveryregistry" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "google.golang.org/grpc" ) -func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { +func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis(ctx, &config.Redis) if err != nil { return err diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index d527f2697..0b7df65ba 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -16,6 +16,8 @@ package msggateway import ( "context" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/utils/goassist" "sync" "github.com/go-playground/validator/v10" @@ -24,9 +26,7 @@ import ( "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils" "google.golang.org/protobuf/proto" ) @@ -46,7 +46,7 @@ func (r *Req) String() string { tReq.SendID = r.SendID tReq.OperationID = r.OperationID tReq.MsgIncr = r.MsgIncr - return utils.StructToJsonString(tReq) + return goassist.StructToJsonString(tReq) } var reqPool = sync.Pool{ @@ -86,7 +86,7 @@ func (r *Resp) String() string { tResp.OperationID = r.OperationID tResp.ErrCode = r.ErrCode tResp.ErrMsg = r.ErrMsg - return utils.StructToJsonString(tResp) + return goassist.StructToJsonString(tResp) } type MessageHandler interface { @@ -106,7 +106,7 @@ type GrpcHandler struct { validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { +func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.OpenImMsgName) pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.OpenImPushName) return &GrpcHandler{ diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index cdc75c1e4..770dc4c1f 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -19,6 +19,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/utils/stringutil" "net/http" "strconv" "sync" @@ -34,10 +36,8 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/tools/apiresp" - "github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils" "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" ) @@ -49,7 +49,7 @@ type LongConnServer interface { GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s any) error SetCacheHandler(cache cache.TokenModel) - SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) + SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *config.GlobalConfig) KickUserConn(client *Client) error UnRegister(c *Client) SetKickHandlerInfo(i *kickHandler) @@ -81,7 +81,7 @@ type WsServer struct { validate *validator.Validate cache cache.TokenModel userClient *rpcclient.UserRpcClient - disCov discoveryregistry.SvcDiscoveryRegistry + disCov discovery.SvcDiscoveryRegistry Compressor Encoder MessageHandler @@ -93,7 +93,7 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) { +func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *config.GlobalConfig) { ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.RpcRegisterName) u := rpcclient.NewUserRpcClient(disCov, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) ws.userClient = &u @@ -176,7 +176,7 @@ func (ws *WsServer) Run(done chan error) error { shutdownDone = make(chan struct{}, 1) ) - server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil} + server := http.Server{Addr: ":" + stringutil.IntToString(ws.port), Handler: nil} go func() { for { diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 91a29a5bd..66e566745 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -16,10 +16,10 @@ package msggateway import ( "context" + "github.com/openimsdk/tools/utils/goassist" "sync" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils" ) type UserMap struct { @@ -93,7 +93,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool) } func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) { - m := utils.SliceToMapAny(clients, func(c *Client) (string, struct{}) { + m := goassist.SliceToMapAny(clients, func(c *Client) (string, struct{}) { return c.ctx.GetRemoteAddr(), struct{}{} }) allClients, existed := u.m.Load(key)