From 34ed032af13e9743c8bf2b97695ba1d0d0692a54 Mon Sep 17 00:00:00 2001 From: xuexihuang <1339326187@qq.com> Date: Mon, 25 Dec 2023 15:32:07 +0800 Subject: [PATCH] K8s environment supports multiple msggateway by consistent hash (#1600) * feature:support multi msggateway * feature:support multi msggateway * feature:support multi msggateway by hash method * fix:fix log * change to consistent hash * change go.mod * fix:fix go routine values error * fix:fix push filter logic * fix:fix push filter logic --------- Co-authored-by: lin.huang --- go.mod | 8 +- go.sum | 12 +- internal/msggateway/n_ws_server.go | 13 +- internal/push/push_to_client.go | 139 +++++++++++++++--- .../kubernetes/kubernetes.go | 90 ++++++------ 5 files changed, 178 insertions(+), 84 deletions(-) diff --git a/go.mod b/go.mod index c8dd96689..9a9cde30e 100644 --- a/go.mod +++ b/go.mod @@ -34,10 +34,11 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 github.com/OpenIMSDK/protocol v0.0.36 - github.com/OpenIMSDK/tools v0.0.20 + github.com/OpenIMSDK/tools v0.0.21 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/redis/go-redis/v9 v9.2.1 + github.com/stathat/consistent v1.0.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.4.0 @@ -141,6 +142,7 @@ require ( gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gorm.io/gorm v1.23.8 // indirect + stathat.com/c/consistent v1.0.0 // indirect ) require ( @@ -154,7 +156,3 @@ require ( golang.org/x/crypto v0.14.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) -//replace ( -// github.com/OpenIMSDK/protocol v0.0.34 => github.com/AndrewZuo01/protocol v0.0.0-20231218034338-b8d838e0b182 -//) - diff --git a/go.sum b/go.sum index 30e4b3cb4..61323c57b 100644 --- a/go.sum +++ b/go.sum @@ -18,10 +18,10 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE= -github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k= -github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/protocol v0.0.36 h1:UJnFsr1A4RrNeHMNDVS/1nvXWFoGM43dcXpZeJiIZ+0= +github.com/OpenIMSDK/protocol v0.0.36/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48= +github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= @@ -308,6 +308,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -536,3 +538,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 737d5db14..70c2f8fe0 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -288,12 +288,13 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - }() - + if config.Config.Envs.Discovery == "zookeeper" { + wg.Add(1) + go func() { + defer wg.Done() + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) + }() + } wg.Add(1) go func() { defer wg.Done() diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 2ee8c457f..12b78ea2d 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "google.golang.org/grpc" "sync" "golang.org/x/sync/errgroup" @@ -142,6 +143,47 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error { return json.Unmarshal([]byte(notification.Detail), t) } +/* +k8s deployment,offline push group messages function +*/ +func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error { + + var needOfflinePushUserIDs []string + for _, v := range wsResults { + if !v.OnlinePush { + needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) + } + } + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) + if err != nil { + return err + } + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + } + } + + } + return nil +} func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string @@ -205,7 +247,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush { + if isOfflinePush && config.Config.Envs.Discovery == "k8s" { + return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) + } + if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" { var ( onlineSuccessUserIDs = []string{msg.SendID} webAndPcBackgroundUserIDs []string @@ -239,14 +284,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws } needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) - if msg.ContentType != constant.SignalingNotification { - notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) - if err != nil { - return err - } - needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs) - } // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string @@ -258,30 +296,89 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } - resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( - ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, - ) - if err != nil { - return err - } - if len(resp.UserIDs) > 0 { - err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if msg.ContentType != constant.SignalingNotification { + resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( + ctx, + &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, + ) if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) return err } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) - return err + if len(resp.UserIDs) > 0 { + err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) + return err + } } } + } } return nil } +func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + var usersHost = make(map[string][]string) + for _, v := range pushToUserIDs { + tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v) + if err != nil { + log.ZError(ctx, "get msggateway hash error", err) + return nil, err + } + tUsers, tbl := usersHost[tHost] + if tbl { + tUsers = append(tUsers, v) + usersHost[tHost] = tUsers + } else { + usersHost[tHost] = []string{v} + } + } + log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) + var usersConns = make(map[*grpc.ClientConn][]string) + for host, userIds := range usersHost { + tconn, _ := p.discov.GetConn(ctx, host) + usersConns[tconn] = userIds + } + var ( + mu sync.Mutex + wg = errgroup.Group{} + maxWorkers = config.Config.Push.MaxConcurrentWorkers + ) + if maxWorkers < 3 { + maxWorkers = 3 + } + wg.SetLimit(maxWorkers) + for conn, userIds := range usersConns { + tcon := conn + tuserIds := userIds + wg.Go(func() error { + input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} + msgClient := msggateway.NewMsgGatewayClient(tcon) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + return nil + }) + } + _ = wg.Wait() + return wsResults, nil +} func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + if config.Config.Envs.Discovery == "k8s" { + return p.k8sOnlinePush(ctx, msg, pushToUserIDs) + } conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go index cd5fb0a36..2ff1539e6 100644 --- a/pkg/common/discoveryregister/kubernetes/kubernetes.go +++ b/pkg/common/discoveryregister/kubernetes/kubernetes.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/stathat/consistent" "os" "strconv" "strings" @@ -31,51 +32,54 @@ import ( // K8sDR represents the Kubernetes service discovery and registration client. type K8sDR struct { - options []grpc.DialOption - rpcRegisterAddr string + options []grpc.DialOption + rpcRegisterAddr string + gatewayHostConsistent *consistent.Consistent } -// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration. func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { - - return &K8sDR{}, nil + gatewayConsistent := consistent.New() + gatewayHosts := getMsgGatewayHost(context.Background()) + for _, v := range gatewayHosts { + gatewayConsistent.Add(v) + } + return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil } -// Register registers a service with Kubernetes. func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { cli.rpcRegisterAddr = serviceName } else { - cli.rpcRegisterAddr = cli.getSelfHost(context.Background()) + cli.rpcRegisterAddr = getSelfHost(context.Background()) } return nil } - -// UnRegister removes a service registration from Kubernetes. func (cli *K8sDR) UnRegister() error { return nil } - -// CreateRpcRootNodes creates root nodes for RPC in Kubernetes. func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { return nil } - -// RegisterConf2Registry registers a configuration to the registry. func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { return nil } -// GetConfFromRegistry retrieves a configuration from the registry. func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { + return nil, nil } - -func (cli *K8sDR) getSelfHost(ctx context.Context) string { +func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { + host, err := cli.gatewayHostConsistent.Get(userId) + if err != nil { + log.ZError(ctx, "GetUserIdHashGatewayHost error", err) + } + return host, err +} +func getSelfHost(ctx context.Context) string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -95,26 +99,8 @@ func (cli *K8sDR) getSelfHost(ctx context.Context) string { return host } -// GetConns returns a list of gRPC client connections for a given service. -func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { - conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) - return []*grpc.ClientConn{conn}, err - } - var ret []*grpc.ClientConn - gatewayHosts := cli.getMsgGatewayHost(ctx) - for _, host := range gatewayHosts { - conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) - if err != nil { - return nil, err - } - ret = append(ret, conn) - } - return ret, nil -} - // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 -func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { +func getMsgGatewayHost(ctx context.Context) []string { port := 88 instance := "openimserver" selfPodName := os.Getenv("MY_POD_NAME") @@ -135,40 +121,48 @@ func (cli *K8sDR) getMsgGatewayHost(ctx context.Context) []string { ret = append(ret, host) } log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret) - return ret } +func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { -// GetConn returns a single gRPC client connection for a given service. + if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { + conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) + return []*grpc.ClientConn{conn}, err + } else { + var ret []*grpc.ClientConn + gatewayHosts := getMsgGatewayHost(ctx) + for _, host := range gatewayHosts { + conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...) + if err != nil { + return nil, err + } else { + ret = append(ret, conn) + } + } + return ret, nil + } +} func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) } -// GetSelfConnTarget returns the connection target of the client itself. func (cli *K8sDR) GetSelfConnTarget() string { + return cli.rpcRegisterAddr } - -// AddOption adds gRPC dial options to the client. func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { cli.options = append(cli.options, opts...) } - -// CloseConn closes a given gRPC client connection. func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { conn.Close() } -// do not use this method for call rpc. +// do not use this method for call rpc func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") - return nil } - -// Close closes the K8sDR client. func (cli *K8sDR) Close() { - - // Close any open resources here (if applicable) return }