fix: message gateway update.

pull/2100/head
Gordon 2 years ago
parent 05c8d3dac3
commit 0bd7269f1e

@ -16,7 +16,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/localcache v0.0.1
github.com/openimsdk/protocol v0.0.58-google github.com/openimsdk/protocol v0.0.58-google
github.com/openimsdk/tools v0.0.46-alpha.16.0.20240322040503-5ee151e04e7d github.com/openimsdk/tools v0.0.46
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.9.3 // indirect github.com/sirupsen/logrus v1.9.3 // indirect

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/openimsdk/tools/utils/stringutil"
"runtime/debug" "runtime/debug"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -29,7 +30,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -91,7 +91,7 @@ type Client struct {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) { func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isBackground, isCompress bool, longConnServer LongConnServer, token string) {
c.w = new(sync.Mutex) c.w = new(sync.Mutex)
c.conn = conn c.conn = conn
c.PlatformID = utils.StringToInt(ctx.GetPlatformID()) c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
c.IsCompress = isCompress c.IsCompress = isCompress
c.IsBackground = isBackground c.IsBackground = isBackground
c.UserID = ctx.GetUserID() c.UserID = ctx.GetUserID()

@ -15,13 +15,15 @@
package msggateway package msggateway
import ( import (
"github.com/openimsdk/tools/utils/encrypt"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/openimsdk/tools/utils/timeutil"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils"
) )
type UserConnContext struct { type UserConnContext struct {
@ -54,7 +56,7 @@ func (c *UserConnContext) Value(key any) any {
case constant.ConnID: case constant.ConnID:
return c.GetConnID() return c.GetConnID()
case constant.OpUserPlatform: case constant.OpUserPlatform:
return constant.PlatformIDToName(utils.StringToInt(c.GetPlatformID())) return constant.PlatformIDToName(stringutil.StringToInt(c.GetPlatformID()))
case constant.RemoteAddr: case constant.RemoteAddr:
return c.RemoteAddr return c.RemoteAddr
default: default:
@ -69,7 +71,7 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
Path: req.URL.Path, Path: req.URL.Path,
Method: req.Method, Method: req.Method,
RemoteAddr: req.RemoteAddr, RemoteAddr: req.RemoteAddr,
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), ConnID: encrypt.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(timeutil.GetCurrentTimestampByMill()))),
} }
} }

@ -23,14 +23,14 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis) rdb, err := cache.NewRedis(ctx, &config.Redis)
if err != nil { if err != nil {
return err return err

@ -16,6 +16,8 @@ package msggateway
import ( import (
"context" "context"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/goassist"
"sync" "sync"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
@ -24,9 +26,7 @@ import (
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@ -46,7 +46,7 @@ func (r *Req) String() string {
tReq.SendID = r.SendID tReq.SendID = r.SendID
tReq.OperationID = r.OperationID tReq.OperationID = r.OperationID
tReq.MsgIncr = r.MsgIncr tReq.MsgIncr = r.MsgIncr
return utils.StructToJsonString(tReq) return goassist.StructToJsonString(tReq)
} }
var reqPool = sync.Pool{ var reqPool = sync.Pool{
@ -86,7 +86,7 @@ func (r *Resp) String() string {
tResp.OperationID = r.OperationID tResp.OperationID = r.OperationID
tResp.ErrCode = r.ErrCode tResp.ErrCode = r.ErrCode
tResp.ErrMsg = r.ErrMsg tResp.ErrMsg = r.ErrMsg
return utils.StructToJsonString(tResp) return goassist.StructToJsonString(tResp)
} }
type MessageHandler interface { type MessageHandler interface {
@ -106,7 +106,7 @@ type GrpcHandler struct {
validate *validator.Validate validate *validator.Validate
} }
func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler { func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler {
msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.OpenImMsgName) msgRpcClient := rpcclient.NewMessageRpcClient(client, rpcRegisterName.OpenImMsgName)
pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.OpenImPushName) pushRpcClient := rpcclient.NewPushRpcClient(client, rpcRegisterName.OpenImPushName)
return &GrpcHandler{ return &GrpcHandler{

@ -19,6 +19,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/stringutil"
"net/http" "net/http"
"strconv" "strconv"
"sync" "sync"
@ -34,10 +36,8 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -49,7 +49,7 @@ type LongConnServer interface {
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error Validate(s any) error
SetCacheHandler(cache cache.TokenModel) SetCacheHandler(cache cache.TokenModel)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *config.GlobalConfig)
KickUserConn(client *Client) error KickUserConn(client *Client) error
UnRegister(c *Client) UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler) SetKickHandlerInfo(i *kickHandler)
@ -81,7 +81,7 @@ type WsServer struct {
validate *validator.Validate validate *validator.Validate
cache cache.TokenModel cache cache.TokenModel
userClient *rpcclient.UserRpcClient userClient *rpcclient.UserRpcClient
disCov discoveryregistry.SvcDiscoveryRegistry disCov discovery.SvcDiscoveryRegistry
Compressor Compressor
Encoder Encoder
MessageHandler MessageHandler
@ -93,7 +93,7 @@ type kickHandler struct {
newClient *Client newClient *Client
} }
func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) { func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *config.GlobalConfig) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.RpcRegisterName) ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.RpcRegisterName)
u := rpcclient.NewUserRpcClient(disCov, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) u := rpcclient.NewUserRpcClient(disCov, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
ws.userClient = &u ws.userClient = &u
@ -176,7 +176,7 @@ func (ws *WsServer) Run(done chan error) error {
shutdownDone = make(chan struct{}, 1) shutdownDone = make(chan struct{}, 1)
) )
server := http.Server{Addr: ":" + utils.IntToString(ws.port), Handler: nil} server := http.Server{Addr: ":" + stringutil.IntToString(ws.port), Handler: nil}
go func() { go func() {
for { for {

@ -16,10 +16,10 @@ package msggateway
import ( import (
"context" "context"
"github.com/openimsdk/tools/utils/goassist"
"sync" "sync"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils"
) )
type UserMap struct { type UserMap struct {
@ -93,7 +93,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool)
} }
func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) { func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) {
m := utils.SliceToMapAny(clients, func(c *Client) (string, struct{}) { m := goassist.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
return c.ctx.GetRemoteAddr(), struct{}{} return c.ctx.GetRemoteAddr(), struct{}{}
}) })
allClients, existed := u.m.Load(key) allClients, existed := u.m.Load(key)

Loading…
Cancel
Save