From c4cdf4de103df0b9019bf9f65d11a4dea20dc4f9 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 23 Feb 2024 18:30:10 +0800 Subject: [PATCH] fix: replace global config with dependency injection --- internal/api/msg.go | 2 - internal/api/route.go | 31 ++++----- internal/api/user.go | 5 +- internal/msggateway/message_handler.go | 5 +- internal/msggateway/n_ws_server.go | 10 +-- internal/rpc/auth/auth.go | 12 ++-- internal/rpc/conversation/conversaion.go | 12 ++-- internal/rpc/friend/callback.go | 60 ++++++++-------- internal/rpc/friend/friend.go | 32 ++++----- internal/rpc/group/callback.go | 89 ++++++++++++------------ internal/rpc/group/group.go | 52 +++++++------- internal/rpc/msg/as_read.go | 4 +- internal/rpc/msg/callback.go | 52 +++++++------- internal/rpc/msg/message_interceptor.go | 8 +-- internal/rpc/msg/revoke.go | 11 ++- internal/rpc/msg/send.go | 14 ++-- internal/rpc/msg/server.go | 18 ++--- internal/rpc/msg/utils.go | 6 +- internal/rpc/msg/verify.go | 12 ++-- internal/rpc/third/third.go | 10 +-- internal/rpc/user/callback.go | 36 +++++----- internal/rpc/user/user.go | 34 ++++----- internal/tools/cron_task.go | 4 +- internal/tools/cron_task_test.go | 6 +- internal/tools/msg.go | 35 +++++----- pkg/authverify/token.go | 40 +++++------ pkg/common/cmd/msg_gateway.go | 7 +- pkg/common/cmd/msg_transfer.go | 1 - pkg/common/cmd/msg_utils.go | 4 +- pkg/common/config/parse_test.go | 3 +- pkg/common/db/cache/init_redis.go | 12 ++-- pkg/common/db/cache/meta_cache.go | 16 ++--- pkg/common/db/cache/msg.go | 23 +++--- pkg/common/db/controller/msg.go | 14 ++-- pkg/common/db/s3/cos/cos.go | 8 ++- pkg/common/db/s3/minio/minio.go | 28 ++++---- pkg/common/db/unrelation/mongo.go | 24 +++---- pkg/rpcclient/auth.go | 7 +- pkg/rpcclient/conversation.go | 11 +-- pkg/rpcclient/friend.go | 11 +-- pkg/rpcclient/group.go | 11 +-- pkg/rpcclient/msg.go | 27 +++---- pkg/rpcclient/notification/group.go | 20 +----- pkg/rpcclient/third.go | 7 +- pkg/rpcclient/user.go | 11 +-- 45 files changed, 416 insertions(+), 429 deletions(-) diff --git a/internal/api/msg.go b/internal/api/msg.go index 9348596ac..93995d41e 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -209,8 +209,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) { // Prepare the message request with additional required data. sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg) if err != nil { - // Log and respond with an error if preparation fails. - log.ZError(c, "decodeData failed", err) apiresp.GinError(c, err) return } diff --git a/internal/api/route.go b/internal/api/route.go index 88004e802..6c001f900 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -58,7 +58,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) return errs.Wrap(fmt.Errorf(err)) } - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } @@ -82,7 +82,7 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { netDone = make(chan struct{}, 1) netErr error ) - router := newGinRouter(client, rdb) + router := newGinRouter(client, rdb, config) if config.Prometheus.Enable { go func() { p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) @@ -132,27 +132,26 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { return nil } -func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient) *gin.Engine { +func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) // 默认RPC中间件 gin.SetMode(gin.ReleaseMode) r := gin.New() if v, ok := binding.Validator.Engine().(*validator.Validate); ok { _ = v.RegisterValidation("required_if", RequiredIf) } - log.ZInfo(context.Background(), "load config", "config", config.Config) r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) // init rpc client here - userRpc := rpcclient.NewUser(disCov) - groupRpc := rpcclient.NewGroup(disCov) - friendRpc := rpcclient.NewFriend(disCov) - messageRpc := rpcclient.NewMessage(disCov) - conversationRpc := rpcclient.NewConversation(disCov) - authRpc := rpcclient.NewAuth(disCov) - thirdRpc := rpcclient.NewThird(disCov) + userRpc := rpcclient.NewUser(disCov, config) + groupRpc := rpcclient.NewGroup(disCov, config) + friendRpc := rpcclient.NewFriend(disCov, config) + messageRpc := rpcclient.NewMessage(disCov, config) + conversationRpc := rpcclient.NewConversation(disCov, config) + authRpc := rpcclient.NewAuth(disCov, config) + thirdRpc := rpcclient.NewThird(disCov, config) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc) - ParseToken := GinParseToken(rdb) + ParseToken := GinParseToken(rdb, config) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) @@ -314,11 +313,11 @@ func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive return r } -func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc { +func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc { dataBase := controller.NewAuthDatabase( - cache.NewMsgCacheModel(rdb), - config.Config.Secret, - config.Config.TokenPolicy.Expire, + cache.NewMsgCacheModel(rdb, config), + config.Secret, + config.TokenPolicy.Expire, ) return func(c *gin.Context) { switch c.Request.Method { diff --git a/internal/api/user.go b/internal/api/user.go index e7bbd4bfb..a62da6e25 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -24,7 +24,6 @@ import ( "github.com/OpenIMSDK/tools/log" "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -71,7 +70,7 @@ func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) return } - conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) if err != nil { apiresp.GinError(c, err) return @@ -135,7 +134,7 @@ func (u *UserApi) GetUsersOnlineTokenDetail(c *gin.Context) { apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) return } - conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := u.Discov.GetConns(c, u.Config.RpcRegisterName.OpenImMessageGatewayName) if err != nil { apiresp.GinError(c, err) return diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index dd5e00f18..1082de7bd 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "sync" "github.com/OpenIMSDK/protocol/push" @@ -107,8 +108,8 @@ type GrpcHandler struct { validate *validator.Validate } -func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry) *GrpcHandler { - msgRpcClient := rpcclient.NewMessageRpcClient(client) +func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *GrpcHandler { + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) pushRpcClient := rpcclient.NewPushRpcClient(client) return &GrpcHandler{ msgRpcClient: &msgRpcClient, diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index ffefb6e5d..75b27fc68 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -52,7 +52,7 @@ type LongConnServer interface { GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) Validate(s any) error SetCacheHandler(cache cache.MsgModel) - SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) + SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) KickUserConn(client *Client) error UnRegister(c *Client) SetKickHandlerInfo(i *kickHandler) @@ -94,9 +94,9 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) { - ws.MessageHandler = NewGrpcHandler(ws.validate, disCov) - u := rpcclient.NewUserRpcClient(disCov) +func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) { + ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, config) + u := rpcclient.NewUserRpcClient(disCov, config) ws.userClient = &u ws.disCov = disCov } @@ -442,7 +442,7 @@ func (ws *WsServer) ParseWSArgs(r *http.Request) (args *WSArgs, err error) { return nil, errs.ErrConnArgsErr.Wrap("platformID is not int") } v.PlatformID = platformID - if err = authverify.WsVerifyToken(v.Token, v.UserID, platformID); err != nil { + if err = authverify.WsVerifyToken(v.Token, v.UserID, ws.globalConfig.Secret, platformID); err != nil { return nil, err } if query.Get(Compression) == GzipCompressionProtocol { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 301204e2e..a5bca825e 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -46,11 +46,11 @@ type authServer struct { } func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client) + userRpcClient := rpcclient.NewUserRpcClient(client, config) pbauth.RegisterAuthServer(server, &authServer{ userRpcClient: &userRpcClient, RegisterCenter: client, @@ -66,7 +66,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*pbauth.UserTokenResp, error) { resp := pbauth.UserTokenResp{} - if req.Secret != config.Config.Secret { + if req.Secret != s.config.Secret { return nil, errs.ErrNoPermission.Wrap("secret invalid") } if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil { @@ -78,7 +78,7 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (* } prommetrics.UserLoginCounter.Inc() resp.Token = token - resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 + resp.ExpireTimeSeconds = s.config.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil } @@ -100,7 +100,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR return nil, err } resp.Token = token - resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 + resp.ExpireTimeSeconds = s.config.TokenPolicy.Expire * 24 * 60 * 60 return &resp, nil } @@ -155,7 +155,7 @@ func (s *authServer) ForceLogout(ctx context.Context, req *pbauth.ForceLogoutReq } func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { - conns, err := s.RegisterCenter.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := s.RegisterCenter.GetConns(ctx, s.config.RpcRegisterName.OpenImMessageGatewayName) if err != nil { return err } diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 311e82d4b..43582bb32 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -62,21 +62,21 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs( } func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } - conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase()) + conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - groupRpcClient := rpcclient.NewGroupRpcClient(client) - msgRpcClient := rpcclient.NewMessageRpcClient(client) - userRpcClient := rpcclient.NewUserRpcClient(client) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) + userRpcClient := rpcclient.NewUserRpcClient(client, config) pbconversation.RegisterConversationServer(server, &conversationServer{ msgRpcClient: &msgRpcClient, user: &userRpcClient, diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go index e5054d9a9..8db5c5141 100644 --- a/internal/rpc/friend/callback.go +++ b/internal/rpc/friend/callback.go @@ -25,8 +25,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error { - if !config.Config.Callback.CallbackBeforeAddFriend.Enable { +func CallbackBeforeAddFriend(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.ApplyToAddFriendReq) error { + if !globalConfig.Callback.CallbackBeforeAddFriend.Enable { return nil } cbReq := &cbapi.CallbackBeforeAddFriendReq{ @@ -37,14 +37,14 @@ func CallbackBeforeAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriend Ex: req.Ex, } resp := &cbapi.CallbackBeforeAddFriendResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeAddFriend); err != nil { return err } return nil } -func CallbackBeforeSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error { - if !config.Config.Callback.CallbackBeforeSetFriendRemark.Enable { +func CallbackBeforeSetFriendRemark(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.SetFriendRemarkReq) error { + if !globalConfig.Callback.CallbackBeforeSetFriendRemark.Enable { return nil } cbReq := &cbapi.CallbackBeforeSetFriendRemarkReq{ @@ -54,15 +54,15 @@ func CallbackBeforeSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendR Remark: req.Remark, } resp := &cbapi.CallbackBeforeSetFriendRemarkResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeAddFriend); err != nil { return err } utils.NotNilReplace(&req.Remark, &resp.Remark) return nil } -func CallbackAfterSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) error { - if !config.Config.Callback.CallbackAfterSetFriendRemark.Enable { +func CallbackAfterSetFriendRemark(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.SetFriendRemarkReq) error { + if !globalConfig.Callback.CallbackAfterSetFriendRemark.Enable { return nil } cbReq := &cbapi.CallbackAfterSetFriendRemarkReq{ @@ -72,13 +72,13 @@ func CallbackAfterSetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRe Remark: req.Remark, } resp := &cbapi.CallbackAfterSetFriendRemarkResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriend); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeAddFriend); err != nil { return err } return nil } -func CallbackBeforeAddBlack(ctx context.Context, req *pbfriend.AddBlackReq) error { - if !config.Config.Callback.CallbackBeforeAddBlack.Enable { +func CallbackBeforeAddBlack(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.AddBlackReq) error { + if !globalConfig.Callback.CallbackBeforeAddBlack.Enable { return nil } cbReq := &cbapi.CallbackBeforeAddBlackReq{ @@ -87,13 +87,13 @@ func CallbackBeforeAddBlack(ctx context.Context, req *pbfriend.AddBlackReq) erro BlackUserID: req.BlackUserID, } resp := &cbapi.CallbackBeforeAddBlackResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddBlack); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeAddBlack); err != nil { return err } return nil } -func CallbackAfterAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendReq) error { - if !config.Config.Callback.CallbackAfterAddFriend.Enable { +func CallbackAfterAddFriend(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.ApplyToAddFriendReq) error { + if !globalConfig.Callback.CallbackAfterAddFriend.Enable { return nil } cbReq := &cbapi.CallbackAfterAddFriendReq{ @@ -103,14 +103,14 @@ func CallbackAfterAddFriend(ctx context.Context, req *pbfriend.ApplyToAddFriendR ReqMsg: req.ReqMsg, } resp := &cbapi.CallbackAfterAddFriendResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterAddFriend); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterAddFriend); err != nil { return err } return nil } -func CallbackBeforeAddFriendAgree(ctx context.Context, req *pbfriend.RespondFriendApplyReq) error { - if !config.Config.Callback.CallbackBeforeAddFriendAgree.Enable { +func CallbackBeforeAddFriendAgree(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.RespondFriendApplyReq) error { + if !globalConfig.Callback.CallbackBeforeAddFriendAgree.Enable { return nil } cbReq := &cbapi.CallbackBeforeAddFriendAgreeReq{ @@ -121,13 +121,13 @@ func CallbackBeforeAddFriendAgree(ctx context.Context, req *pbfriend.RespondFrie HandleResult: req.HandleResult, } resp := &cbapi.CallbackBeforeAddFriendAgreeResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeAddFriendAgree); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeAddFriendAgree); err != nil { return err } return nil } -func CallbackAfterDeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendReq) error { - if !config.Config.Callback.CallbackAfterDeleteFriend.Enable { +func CallbackAfterDeleteFriend(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.DeleteFriendReq) error { + if !globalConfig.Callback.CallbackAfterDeleteFriend.Enable { return nil } cbReq := &cbapi.CallbackAfterDeleteFriendReq{ @@ -136,13 +136,13 @@ func CallbackAfterDeleteFriend(ctx context.Context, req *pbfriend.DeleteFriendRe FriendUserID: req.FriendUserID, } resp := &cbapi.CallbackAfterDeleteFriendResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterDeleteFriend); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterDeleteFriend); err != nil { return err } return nil } -func CallbackBeforeImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) error { - if !config.Config.Callback.CallbackBeforeImportFriends.Enable { +func CallbackBeforeImportFriends(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.ImportFriendReq) error { + if !globalConfig.Callback.CallbackBeforeImportFriends.Enable { return nil } cbReq := &cbapi.CallbackBeforeImportFriendsReq{ @@ -151,7 +151,7 @@ func CallbackBeforeImportFriends(ctx context.Context, req *pbfriend.ImportFriend FriendUserIDs: req.FriendUserIDs, } resp := &cbapi.CallbackBeforeImportFriendsResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeImportFriends); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeImportFriends); err != nil { return err } if len(resp.FriendUserIDs) != 0 { @@ -159,8 +159,8 @@ func CallbackBeforeImportFriends(ctx context.Context, req *pbfriend.ImportFriend } return nil } -func CallbackAfterImportFriends(ctx context.Context, req *pbfriend.ImportFriendReq) error { - if !config.Config.Callback.CallbackAfterImportFriends.Enable { +func CallbackAfterImportFriends(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.ImportFriendReq) error { + if !globalConfig.Callback.CallbackAfterImportFriends.Enable { return nil } cbReq := &cbapi.CallbackAfterImportFriendsReq{ @@ -169,14 +169,14 @@ func CallbackAfterImportFriends(ctx context.Context, req *pbfriend.ImportFriendR FriendUserIDs: req.FriendUserIDs, } resp := &cbapi.CallbackAfterImportFriendsResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterImportFriends); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterImportFriends); err != nil { return err } return nil } -func CallbackAfterRemoveBlack(ctx context.Context, req *pbfriend.RemoveBlackReq) error { - if !config.Config.Callback.CallbackAfterRemoveBlack.Enable { +func CallbackAfterRemoveBlack(ctx context.Context, globalConfig *config.GlobalConfig, req *pbfriend.RemoveBlackReq) error { + if !globalConfig.Callback.CallbackAfterRemoveBlack.Enable { return nil } cbReq := &cbapi.CallbackAfterRemoveBlackReq{ @@ -185,7 +185,7 @@ func CallbackAfterRemoveBlack(ctx context.Context, req *pbfriend.RemoveBlackReq) BlackUserID: req.BlackUserID, } resp := &cbapi.CallbackAfterRemoveBlackResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterRemoveBlack); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterRemoveBlack); err != nil { return err } return nil diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index eacb5b921..bc5ada260 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -57,35 +57,35 @@ type friendServer struct { func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { // Initialize MongoDB - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } // Initialize Redis - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase()) + friendMongoDB, err := mgo.NewFriendMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase()) + friendRequestMongoDB, err := mgo.NewFriendRequestMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase()) + blackMongoDB, err := mgo.NewBlackMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } // Initialize RPC clients - userRpcClient := rpcclient.NewUserRpcClient(client) - msgRpcClient := rpcclient.NewMessageRpcClient(client) + userRpcClient := rpcclient.NewUserRpcClient(client, config) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) // Initialize notification sender notificationSender := notification.NewFriendNotificationSender( @@ -107,7 +107,7 @@ func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, se userRpcClient: &userRpcClient, notificationSender: notificationSender, RegisterCenter: client, - conversationRpcClient: rpcclient.NewConversationRpcClient(client), + conversationRpcClient: rpcclient.NewConversationRpcClient(client, config), config: config, }) @@ -123,7 +123,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply if req.ToUserID == req.FromUserID { return nil, errs.ErrCanNotAddYourself.Wrap("req.ToUserID", req.ToUserID) } - if err = CallbackBeforeAddFriend(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err = CallbackBeforeAddFriend(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue { return nil, err } if _, err := s.userRpcClient.GetUsersInfoMap(ctx, []string{req.ToUserID, req.FromUserID}); err != nil { @@ -140,7 +140,7 @@ func (s *friendServer) ApplyToAddFriend(ctx context.Context, req *pbfriend.Apply return nil, err } s.notificationSender.FriendApplicationAddNotification(ctx, req) - if err = CallbackAfterAddFriend(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err = CallbackAfterAddFriend(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue { return nil, err } return resp, nil @@ -161,7 +161,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr if utils.Duplicate(req.FriendUserIDs) { return nil, errs.ErrArgs.Wrap("friend userID repeated") } - if err := CallbackBeforeImportFriends(ctx, req); err != nil { + if err := CallbackBeforeImportFriends(ctx, s.config, req); err != nil { return nil, err } @@ -175,7 +175,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbfriend.ImportFr HandleResult: constant.FriendResponseAgree, }) } - if err := CallbackAfterImportFriends(ctx, req); err != nil { + if err := CallbackAfterImportFriends(ctx, s.config, req); err != nil { return nil, err } return &pbfriend.ImportFriendResp{}, nil @@ -196,7 +196,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res HandleResult: req.HandleResult, } if req.HandleResult == constant.FriendResponseAgree { - if err := CallbackBeforeAddFriendAgree(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackBeforeAddFriendAgree(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue { return nil, err } err := s.friendDatabase.AgreeFriendRequest(ctx, &friendRequest) @@ -232,7 +232,7 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFri return nil, err } s.notificationSender.FriendDeletedNotification(ctx, req) - if err := CallbackAfterDeleteFriend(ctx, req); err != nil { + if err := CallbackAfterDeleteFriend(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -242,7 +242,7 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbfriend.DeleteFri func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFriendRemarkReq) (resp *pbfriend.SetFriendRemarkResp, err error) { defer log.ZInfo(ctx, utils.GetFuncName()+" Return") - if err = CallbackBeforeSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err = CallbackBeforeSetFriendRemark(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue { return nil, err } resp = &pbfriend.SetFriendRemarkResp{} @@ -256,7 +256,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri if err := s.friendDatabase.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil { return nil, err } - if err := CallbackAfterSetFriendRemark(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := CallbackAfterSetFriendRemark(ctx, s.config, req); err != nil && err != errs.ErrCallbackContinue { return nil, err } s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID) diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index d891f4d1e..18b484a3e 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -35,8 +35,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) { - if !config.Config.Callback.CallbackBeforeCreateGroup.Enable { +func CallbackBeforeCreateGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *group.CreateGroupReq) (err error) { + if !globalConfig.Callback.CallbackBeforeCreateGroup.Enable { return nil } cbReq := &callbackstruct.CallbackBeforeCreateGroupReq{ @@ -61,7 +61,7 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( }) } resp := &callbackstruct.CallbackBeforeCreateGroupResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeCreateGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeCreateGroup); err != nil { return err } utils.NotNilReplace(&req.GroupInfo.GroupID, resp.GroupID) @@ -79,8 +79,8 @@ func CallbackBeforeCreateGroup(ctx context.Context, req *group.CreateGroupReq) ( return nil } -func CallbackAfterCreateGroup(ctx context.Context, req *group.CreateGroupReq) (err error) { - if !config.Config.Callback.CallbackAfterCreateGroup.Enable { +func CallbackAfterCreateGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *group.CreateGroupReq) (err error) { + if !globalConfig.Callback.CallbackAfterCreateGroup.Enable { return nil } cbReq := &callbackstruct.CallbackAfterCreateGroupReq{ @@ -104,7 +104,7 @@ func CallbackAfterCreateGroup(ctx context.Context, req *group.CreateGroupReq) (e }) } resp := &callbackstruct.CallbackAfterCreateGroupResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterCreateGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterCreateGroup); err != nil { return err } return nil @@ -112,10 +112,11 @@ func CallbackAfterCreateGroup(ctx context.Context, req *group.CreateGroupReq) (e func CallbackBeforeMemberJoinGroup( ctx context.Context, + globalConfig *config.GlobalConfig, groupMember *relation.GroupMemberModel, groupEx string, ) (err error) { - if !config.Config.Callback.CallbackBeforeMemberJoinGroup.Enable { + if !globalConfig.Callback.CallbackBeforeMemberJoinGroup.Enable { return nil } callbackReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ @@ -128,10 +129,10 @@ func CallbackBeforeMemberJoinGroup( resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{} err = http.CallBackPostReturn( ctx, - config.Config.Callback.CallbackUrl, + globalConfig.Callback.CallbackUrl, callbackReq, resp, - config.Config.Callback.CallbackBeforeMemberJoinGroup, + globalConfig.Callback.CallbackBeforeMemberJoinGroup, ) if err != nil { return err @@ -146,8 +147,8 @@ func CallbackBeforeMemberJoinGroup( return nil } -func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMemberInfo) (err error) { - if !config.Config.Callback.CallbackBeforeSetGroupMemberInfo.Enable { +func CallbackBeforeSetGroupMemberInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *group.SetGroupMemberInfo) (err error) { + if !globalConfig.Callback.CallbackBeforeSetGroupMemberInfo.Enable { return nil } callbackReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ @@ -170,10 +171,10 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe resp := &callbackstruct.CallbackBeforeSetGroupMemberInfoResp{} err = http.CallBackPostReturn( ctx, - config.Config.Callback.CallbackUrl, + globalConfig.Callback.CallbackUrl, callbackReq, resp, - config.Config.Callback.CallbackBeforeSetGroupMemberInfo, + globalConfig.Callback.CallbackBeforeSetGroupMemberInfo, ) if err != nil { return err @@ -192,8 +193,8 @@ func CallbackBeforeSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMe } return nil } -func CallbackAfterSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMemberInfo) (err error) { - if !config.Config.Callback.CallbackBeforeSetGroupMemberInfo.Enable { +func CallbackAfterSetGroupMemberInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *group.SetGroupMemberInfo) (err error) { + if !globalConfig.Callback.CallbackBeforeSetGroupMemberInfo.Enable { return nil } callbackReq := callbackstruct.CallbackAfterSetGroupMemberInfoReq{ @@ -214,14 +215,14 @@ func CallbackAfterSetGroupMemberInfo(ctx context.Context, req *group.SetGroupMem callbackReq.Ex = &req.Ex.Value } resp := &callbackstruct.CallbackAfterSetGroupMemberInfoResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupMemberInfo); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterSetGroupMemberInfo); err != nil { return err } return nil } -func CallbackQuitGroup(ctx context.Context, req *group.QuitGroupReq) (err error) { - if !config.Config.Callback.CallbackQuitGroup.Enable { +func CallbackQuitGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *group.QuitGroupReq) (err error) { + if !globalConfig.Callback.CallbackQuitGroup.Enable { return nil } cbReq := &callbackstruct.CallbackQuitGroupReq{ @@ -230,14 +231,14 @@ func CallbackQuitGroup(ctx context.Context, req *group.QuitGroupReq) (err error) UserID: req.UserID, } resp := &callbackstruct.CallbackQuitGroupResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackQuitGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackQuitGroup); err != nil { return err } return nil } -func CallbackKillGroupMember(ctx context.Context, req *pbgroup.KickGroupMemberReq) (err error) { - if !config.Config.Callback.CallbackKillGroupMember.Enable { +func CallbackKillGroupMember(ctx context.Context, globalConfig *config.GlobalConfig, req *pbgroup.KickGroupMemberReq) (err error) { + if !globalConfig.Callback.CallbackKillGroupMember.Enable { return nil } cbReq := &callbackstruct.CallbackKillGroupMemberReq{ @@ -246,41 +247,41 @@ func CallbackKillGroupMember(ctx context.Context, req *pbgroup.KickGroupMemberRe KickedUserIDs: req.KickedUserIDs, } resp := &callbackstruct.CallbackKillGroupMemberResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackQuitGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackQuitGroup); err != nil { return err } return nil } -func CallbackDismissGroup(ctx context.Context, req *callbackstruct.CallbackDisMissGroupReq) (err error) { - if !config.Config.Callback.CallbackDismissGroup.Enable { +func CallbackDismissGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *callbackstruct.CallbackDisMissGroupReq) (err error) { + if !globalConfig.Callback.CallbackDismissGroup.Enable { return nil } req.CallbackCommand = callbackstruct.CallbackDisMissGroupCommand resp := &callbackstruct.CallbackDisMissGroupResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackQuitGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackQuitGroup); err != nil { return err } return nil } -func CallbackApplyJoinGroupBefore(ctx context.Context, req *callbackstruct.CallbackJoinGroupReq) (err error) { - if !config.Config.Callback.CallbackBeforeJoinGroup.Enable { +func CallbackApplyJoinGroupBefore(ctx context.Context, globalConfig *config.GlobalConfig, req *callbackstruct.CallbackJoinGroupReq) (err error) { + if !globalConfig.Callback.CallbackBeforeJoinGroup.Enable { return nil } req.CallbackCommand = callbackstruct.CallbackBeforeJoinGroupCommand resp := &callbackstruct.CallbackJoinGroupResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeJoinGroup); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeJoinGroup); err != nil { return err } return nil } -func CallbackAfterTransferGroupOwner(ctx context.Context, req *pbgroup.TransferGroupOwnerReq) (err error) { - if !config.Config.Callback.CallbackAfterTransferGroupOwner.Enable { +func CallbackAfterTransferGroupOwner(ctx context.Context, globalConfig *config.GlobalConfig, req *pbgroup.TransferGroupOwnerReq) (err error) { + if !globalConfig.Callback.CallbackAfterTransferGroupOwner.Enable { return nil } @@ -292,13 +293,13 @@ func CallbackAfterTransferGroupOwner(ctx context.Context, req *pbgroup.TransferG } resp := &callbackstruct.CallbackTransferGroupOwnerResp{} - if err = http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterTransferGroupOwner); err != nil { + if err = http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterTransferGroupOwner); err != nil { return err } return nil } -func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserToGroupReq) (err error) { - if !config.Config.Callback.CallbackBeforeInviteUserToGroup.Enable { +func CallbackBeforeInviteUserToGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *group.InviteUserToGroupReq) (err error) { + if !globalConfig.Callback.CallbackBeforeInviteUserToGroup.Enable { return nil } @@ -313,10 +314,10 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserT resp := &callbackstruct.CallbackBeforeInviteUserToGroupResp{} err = http.CallBackPostReturn( ctx, - config.Config.Callback.CallbackUrl, + globalConfig.Callback.CallbackUrl, callbackReq, resp, - config.Config.Callback.CallbackBeforeInviteUserToGroup, + globalConfig.Callback.CallbackBeforeInviteUserToGroup, ) if err != nil { @@ -330,8 +331,8 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserT return nil } -func CallbackAfterJoinGroup(ctx context.Context, req *group.JoinGroupReq) error { - if !config.Config.Callback.CallbackAfterJoinGroup.Enable { +func CallbackAfterJoinGroup(ctx context.Context, globalConfig *config.GlobalConfig, req *group.JoinGroupReq) error { + if !globalConfig.Callback.CallbackAfterJoinGroup.Enable { return nil } callbackReq := &callbackstruct.CallbackAfterJoinGroupReq{ @@ -343,14 +344,14 @@ func CallbackAfterJoinGroup(ctx context.Context, req *group.JoinGroupReq) error InviterUserID: req.InviterUserID, } resp := &callbackstruct.CallbackAfterJoinGroupResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterJoinGroup); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterJoinGroup); err != nil { return err } return nil } -func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { - if !config.Config.Callback.CallbackBeforeSetGroupInfo.Enable { +func CallbackBeforeSetGroupInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *group.SetGroupInfoReq) error { + if !globalConfig.Callback.CallbackBeforeSetGroupInfo.Enable { return nil } callbackReq := &callbackstruct.CallbackBeforeSetGroupInfoReq{ @@ -377,7 +378,7 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) } resp := &callbackstruct.CallbackBeforeSetGroupInfoResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackBeforeSetGroupInfo); err != nil { return err } @@ -399,8 +400,8 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) utils.NotNilReplace(&req.GroupInfoForSet.Introduction, &resp.Introduction) return nil } -func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { - if !config.Config.Callback.CallbackAfterSetGroupInfo.Enable { +func CallbackAfterSetGroupInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *group.SetGroupInfoReq) error { + if !globalConfig.Callback.CallbackAfterSetGroupInfo.Enable { return nil } callbackReq := &callbackstruct.CallbackAfterSetGroupInfoReq{ @@ -424,7 +425,7 @@ func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) callbackReq.ApplyMemberFriend = &req.GroupInfoForSet.ApplyMemberFriend.Value } resp := &callbackstruct.CallbackAfterSetGroupInfoResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterSetGroupInfo); err != nil { return err } return nil diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 07a8ed9dd..77d20aeeb 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -61,29 +61,29 @@ import ( ) func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase()) + groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase()) + groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase()) + groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client) - msgRpcClient := rpcclient.NewMessageRpcClient(client) - conversationRpcClient := rpcclient.NewConversationRpcClient(client) + userRpcClient := rpcclient.NewUserRpcClient(client, config) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config) var gs groupServer database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database @@ -117,7 +117,6 @@ func (s *groupServer) GetJoinedGroupIDs(ctx context.Context, req *pbgroup.GetJoi } func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgroup.NotificationUserInfoUpdateReq) (*pbgroup.NotificationUserInfoUpdateResp, error) { - defer log.ZDebug(ctx, "NotificationUserInfoUpdate return") members, err := s.db.FindGroupMemberUser(ctx, nil, req.UserID) if err != nil { return nil, err @@ -129,7 +128,6 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro } groupIDs = append(groupIDs, member.GroupID) } - log.ZInfo(ctx, "NotificationUserInfoUpdate", "joinGroupNum", len(members), "updateNum", len(groupIDs), "updateGroupIDs", groupIDs) for _, groupID := range groupIDs { if err := s.Notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID); err != nil { log.ZError(ctx, "NotificationUserInfoUpdate setGroupMemberInfo notification failed", err, "groupID", groupID) @@ -227,7 +225,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, errs.ErrUserIDNotFound.Wrap("user not found") } // Callback Before create Group - if err := CallbackBeforeCreateGroup(ctx, req); err != nil { + if err := CallbackBeforeCreateGroup(ctx, s.config, req); err != nil { return nil, err } var groupMembers []*relationtb.GroupMemberModel @@ -246,7 +244,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR JoinTime: time.Now(), MuteEndTime: time.UnixMilli(0), } - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, s.config, groupMember, group.Ex); err != nil { return err } groupMembers = append(groupMembers, groupMember) @@ -314,7 +312,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR AdminUserIDs: req.AdminUserIDs, } - if err := CallbackAfterCreateGroup(ctx, reqCallBackAfter); err != nil { + if err := CallbackAfterCreateGroup(ctx, s.config, reqCallBackAfter); err != nil { return nil, err } @@ -405,7 +403,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite } } - if err := CallbackBeforeInviteUserToGroup(ctx, req); err != nil { + if err := CallbackBeforeInviteUserToGroup(ctx, s.config, req); err != nil { return nil, err } if group.NeedVerification == constant.AllNeedVerification { @@ -449,7 +447,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite JoinTime: time.Now(), MuteEndTime: time.UnixMilli(0), } - if err := CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, s.config, member, group.Ex); err != nil { return nil, err } groupMembers = append(groupMembers, member) @@ -621,7 +619,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou return nil, err } - if err := CallbackKillGroupMember(ctx, req); err != nil { + if err := CallbackKillGroupMember(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -791,7 +789,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup OperatorUserID: mcontext.GetOpUserID(ctx), Ex: groupRequest.Ex, } - if err = CallbackBeforeMemberJoinGroup(ctx, member, group.Ex); err != nil { + if err = CallbackBeforeMemberJoinGroup(ctx, s.config, member, group.Ex); err != nil { return nil, err } } @@ -839,7 +837,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) Ex: req.Ex, } - if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil { + if err = CallbackApplyJoinGroupBefore(ctx, s.config, reqCall); err != nil { return nil, err } _, err = s.db.TakeGroupMember(ctx, req.GroupID, req.InviterUserID) @@ -860,7 +858,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) JoinTime: time.Now(), MuteEndTime: time.UnixMilli(0), } - if err := CallbackBeforeMemberJoinGroup(ctx, groupMember, group.Ex); err != nil { + if err := CallbackBeforeMemberJoinGroup(ctx, s.config, groupMember, group.Ex); err != nil { return nil, err } if err := s.db.CreateGroup(ctx, nil, []*relationtb.GroupMemberModel{groupMember}); err != nil { @@ -871,7 +869,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) return nil, err } s.Notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID) - if err = CallbackAfterJoinGroup(ctx, req); err != nil { + if err = CallbackAfterJoinGroup(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -921,7 +919,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) } // callback - if err := CallbackQuitGroup(ctx, req); err != nil { + if err := CallbackQuitGroup(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -951,7 +949,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf return nil, err } } - if err := CallbackBeforeSetGroupInfo(ctx, req); err != nil { + if err := CallbackBeforeSetGroupInfo(ctx, s.config, req); err != nil { return nil, err } group, err := s.db.TakeGroup(ctx, req.GroupInfoForSet.GroupID) @@ -1020,7 +1018,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf if num > 0 { _ = s.Notification.GroupInfoSetNotification(ctx, tips) } - if err := CallbackAfterSetGroupInfo(ctx, req); err != nil { + if err := CallbackAfterSetGroupInfo(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -1066,7 +1064,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans return nil, err } - if err := CallbackAfterTransferGroupOwner(ctx, req); err != nil { + if err := CallbackAfterTransferGroupOwner(ctx, s.config, req); err != nil { return nil, err } s.Notification.GroupOwnerTransferredNotification(ctx, req) @@ -1240,7 +1238,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou MembersID: membersID, GroupType: string(group.GroupType), } - if err := CallbackDismissGroup(ctx, reqCall); err != nil { + if err := CallbackDismissGroup(ctx, s.config, reqCall); err != nil { return nil, err } @@ -1432,7 +1430,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr } } for i := 0; i < len(req.Members); i++ { - if err := CallbackBeforeSetGroupMemberInfo(ctx, req.Members[i]); err != nil { + if err := CallbackBeforeSetGroupMemberInfo(ctx, s.config, req.Members[i]); err != nil { return nil, err } } @@ -1459,7 +1457,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr } } for i := 0; i < len(req.Members); i++ { - if err := CallbackAfterSetGroupMemberInfo(ctx, req.Members[i]); err != nil { + if err := CallbackAfterSetGroupMemberInfo(ctx, s.config, req.Members[i]); err != nil { return nil, err } } diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index cb292421e..8b4e196af 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -132,7 +132,7 @@ func (m *msgServer) MarkMsgsAsRead( Seqs: req.Seqs, ContentType: conversation.ConversationType, } - if err = CallbackSingleMsgRead(ctx, req_callback); err != nil { + if err = CallbackSingleMsgRead(ctx, m.config, req_callback); err != nil { return nil, err } @@ -209,7 +209,7 @@ func (m *msgServer) MarkConversationAsRead( UnreadMsgNum: req.HasReadSeq, ContentType: int64(conversation.ConversationType), } - if err := CallbackGroupMsgRead(ctx, reqCall); err != nil { + if err := CallbackGroupMsgRead(ctx, m.config, reqCall); err != nil { return nil, err } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index f98318bba..d5e197ca2 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -32,10 +32,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func cbURL() string { - return config.Config.Callback.CallbackUrl -} - func toCommonCallback(ctx context.Context, msg *pbchat.SendMsgReq, command string) cbapi.CommonCallbackReq { return cbapi.CommonCallbackReq{ SendID: msg.MsgData.SendID, @@ -69,8 +65,8 @@ func GetContent(msg *sdkws.MsgData) string { } } -func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) error { - if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackBeforeSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { + if !globalConfig.Callback.CallbackBeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackBeforeSendSingleMsgReq{ @@ -78,14 +74,14 @@ func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) er RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackBeforeSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendSingleMsg); err != nil { return err } return nil } -func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) error { - if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackAfterSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { + if !globalConfig.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackAfterSendSingleMsgReq{ @@ -93,14 +89,14 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) err RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackAfterSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendSingleMsg); err != nil { return err } return nil } -func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) error { - if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackBeforeSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { + if !globalConfig.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackBeforeSendGroupMsgReq{ @@ -108,14 +104,14 @@ func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) err GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendGroupMsg); err != nil { return err } return nil } -func callbackAfterSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) error { - if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackAfterSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { + if !globalConfig.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackAfterSendGroupMsgReq{ @@ -123,21 +119,21 @@ func callbackAfterSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) erro GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackAfterSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendGroupMsg); err != nil { return err } return nil } -func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error { - if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { +func callbackMsgModify(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { + if !globalConfig.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { return nil } req := &cbapi.CallbackMsgModifyCommandReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand), } resp := &cbapi.CallbackMsgModifyCommandResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackMsgModify); err != nil { return err } if resp.Content != nil { @@ -162,34 +158,34 @@ func callbackMsgModify(ctx context.Context, msg *pbchat.SendMsgReq) error { log.ZDebug(ctx, "callbackMsgModify", "msg", msg.MsgData) return nil } -func CallbackGroupMsgRead(ctx context.Context, req *cbapi.CallbackGroupMsgReadReq) error { - if !config.Config.Callback.CallbackGroupMsgRead.Enable || req.ContentType != constant.Text { +func CallbackGroupMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackGroupMsgReadReq) error { + if !globalConfig.Callback.CallbackGroupMsgRead.Enable || req.ContentType != constant.Text { return nil } req.CallbackCommand = cbapi.CallbackGroupMsgReadCommand resp := &cbapi.CallbackGroupMsgReadResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackMsgModify); err != nil { return err } return nil } -func CallbackSingleMsgRead(ctx context.Context, req *cbapi.CallbackSingleMsgReadReq) error { - if !config.Config.Callback.CallbackSingleMsgRead.Enable || req.ContentType != constant.Text { +func CallbackSingleMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackSingleMsgReadReq) error { + if !globalConfig.Callback.CallbackSingleMsgRead.Enable || req.ContentType != constant.Text { return nil } req.CallbackCommand = cbapi.CallbackSingleMsgRead resp := &cbapi.CallbackSingleMsgReadResp{} - if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackMsgModify); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackMsgModify); err != nil { return err } return nil } -func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error { - if !config.Config.Callback.CallbackAfterRevokeMsg.Enable { +func CallbackAfterRevokeMsg(ctx context.Context, globalConfig *config.GlobalConfig, req *pbchat.RevokeMsgReq) error { + if !globalConfig.Callback.CallbackAfterRevokeMsg.Enable { return nil } callbackReq := &cbapi.CallbackAfterRevokeMsgReq{ @@ -199,7 +195,7 @@ func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error UserID: req.UserID, } resp := &cbapi.CallbackAfterRevokeMsgResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterRevokeMsg); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterRevokeMsg); err != nil { return err } return nil diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index a98c31219..d7421215d 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -25,17 +25,17 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -type MessageInterceptorFunc func(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) +type MessageInterceptorFunc func(ctx context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) -func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { +func MessageHasReadEnabled(_ context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) { switch { case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType: - if !config.Config.SingleMessageHasReadReceiptEnable { + if !globalConfig.SingleMessageHasReadReceiptEnable { return nil, errs.ErrMessageHasReadDisable.Wrap() } return req.MsgData, nil case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SuperGroupChatType: - if !config.Config.GroupMessageHasReadReceiptEnable { + if !globalConfig.GroupMessageHasReadReceiptEnable { return nil, errs.ErrMessageHasReadDisable.Wrap() } return req.MsgData, nil diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 0a24753b2..ad60defa6 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -29,7 +29,6 @@ import ( "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" ) @@ -112,11 +111,11 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } revokerUserID := mcontext.GetOpUserID(ctx) var flag bool - if len(config.Config.Manager.UserID) > 0 { - flag = utils.Contain(revokerUserID, config.Config.Manager.UserID...) + if len(m.config.Manager.UserID) > 0 { + flag = utils.Contain(revokerUserID, m.config.Manager.UserID...) } - if len(config.Config.Manager.UserID) == 0 && len(config.Config.IMAdmin.UserID) > 0 { - flag = utils.Contain(revokerUserID, config.Config.IMAdmin.UserID...) + if len(m.config.Manager.UserID) == 0 && len(m.config.IMAdmin.UserID) > 0 { + flag = utils.Contain(revokerUserID, m.config.IMAdmin.UserID...) } tips := sdkws.RevokeMsgTips{ RevokerUserID: revokerUserID, @@ -136,7 +135,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { return nil, err } - if err = CallbackAfterRevokeMsg(ctx, req); err != nil { + if err = CallbackAfterRevokeMsg(ctx, m.config, req); err != nil { return nil, err } return &msg.RevokeMsgResp{}, nil diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 630b74a4a..f28e44c44 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -34,7 +34,7 @@ import ( func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (resp *pbmsg.SendMsgResp, error error) { resp = &pbmsg.SendMsgResp{} if req.MsgData != nil { - flag := isMessageHasReadEnabled(req.MsgData) + flag := isMessageHasReadEnabled(req.MsgData, m.config) if !flag { return nil, errs.ErrMessageHasReadDisable.Wrap() } @@ -62,11 +62,11 @@ func (m *msgServer) sendMsgSuperGroupChat( prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err } - if err = callbackBeforeSendGroupMsg(ctx, req); err != nil { + if err = callbackBeforeSendGroupMsg(ctx, m.config, req); err != nil { return nil, err } - if err := callbackMsgModify(ctx, req); err != nil { + if err := callbackMsgModify(ctx, m.config, req); err != nil { return nil, err } err = m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) @@ -76,7 +76,7 @@ func (m *msgServer) sendMsgSuperGroupChat( if req.MsgData.ContentType == constant.AtText { go m.setConversationAtInfo(ctx, req.MsgData) } - if err = callbackAfterSendGroupMsg(ctx, req); err != nil { + if err = callbackAfterSendGroupMsg(ctx, m.config, req); err != nil { log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } prommetrics.GroupChatMsgProcessSuccessCounter.Inc() @@ -165,18 +165,18 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, nil } else { - if err = callbackBeforeSendSingleMsg(ctx, req); err != nil { + if err = callbackBeforeSendSingleMsg(ctx, m.config, req); err != nil { return nil, err } - if err := callbackMsgModify(ctx, req); err != nil { + if err := callbackMsgModify(ctx, m.config, req); err != nil { return nil, err } if err := m.MsgDatabase.MsgToMQ(ctx, utils.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - err = callbackAfterSendSingleMsg(ctx, req) + err = callbackAfterSendSingleMsg(ctx, m.config, req) if err != nil { log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req) } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index bbf8149a7..713329412 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -53,9 +53,9 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF m.Handlers = append(m.Handlers, interceptorFunc...) } -func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsgReq) error { +func (m *msgServer) execInterceptorHandler(ctx context.Context, config *config.GlobalConfig, req *msg.SendMsgReq) error { for _, handler := range m.Handlers { - msgData, err := handler(ctx, req) + msgData, err := handler(ctx, config, req) if err != nil { return err } @@ -65,11 +65,11 @@ func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsg } func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } @@ -77,11 +77,11 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg return err } cacheModel := cache.NewMsgCacheModel(rdb) - msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) - conversationClient := rpcclient.NewConversationRpcClient(client) - userRpcClient := rpcclient.NewUserRpcClient(client) - groupRpcClient := rpcclient.NewGroupRpcClient(client) - friendRpcClient := rpcclient.NewFriendRpcClient(client) + msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) + conversationClient := rpcclient.NewConversationRpcClient(client, config) + userRpcClient := rpcclient.NewUserRpcClient(client, config) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel) if err != nil { return err diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index e45d7b395..e2ec23f2e 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -24,16 +24,16 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -func isMessageHasReadEnabled(msgData *sdkws.MsgData) bool { +func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *config.GlobalConfig) bool { switch { case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType: - if config.Config.SingleMessageHasReadReceiptEnable { + if config.SingleMessageHasReadReceiptEnable { return true } else { return false } case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SuperGroupChatType: - if config.Config.GroupMessageHasReadReceiptEnable { + if config.GroupMessageHasReadReceiptEnable { return true } else { return false diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 0080b6fdb..d789bdcb5 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -25,8 +25,6 @@ import ( "github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/utils" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) var ExcludeContentType = []int{constant.HasReadReceipt} @@ -51,10 +49,10 @@ type MessageRevoked struct { func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error { switch data.MsgData.SessionType { case constant.SingleChatType: - if len(config.Config.Manager.UserID) > 0 && utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) { + if len(m.config.Manager.UserID) > 0 && utils.IsContain(data.MsgData.SendID, m.config.Manager.UserID) { return nil } - if utils.IsContain(data.MsgData.SendID, config.Config.IMAdmin.UserID) { + if utils.IsContain(data.MsgData.SendID, m.config.IMAdmin.UserID) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && @@ -68,7 +66,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if black { return errs.ErrBlockedByPeer.Wrap() } - if *config.Config.MessageVerify.FriendVerify { + if *m.config.MessageVerify.FriendVerify { friend, err := m.friend.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err @@ -91,10 +89,10 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if groupInfo.GroupType == constant.SuperGroup { return nil } - if len(config.Config.Manager.UserID) > 0 && utils.IsContain(data.MsgData.SendID, config.Config.Manager.UserID) { + if len(m.config.Manager.UserID) > 0 && utils.IsContain(data.MsgData.SendID, m.config.Manager.UserID) { return nil } - if utils.IsContain(data.MsgData.SendID, config.Config.IMAdmin.UserID) { + if utils.IsContain(data.MsgData.SendID, m.config.IMAdmin.UserID) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index dac59da73..f1e170f44 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -40,15 +40,15 @@ import ( ) func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } - logdb, err := mgo.NewLogMongo(mongo.GetDatabase()) + logdb, err := mgo.NewLogMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } - s3db, err := mgo.NewS3Mongo(mongo.GetDatabase()) + s3db, err := mgo.NewS3Mongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } @@ -63,7 +63,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg apiURL += "/" } apiURL += "object/" - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } @@ -86,7 +86,7 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg third.RegisterThirdServer(server, &thirdServer{ apiURL: apiURL, thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client), + userRpcClient: rpcclient.NewUserRpcClient(client, config), s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go index 5276946a4..e8e23a2c4 100644 --- a/internal/rpc/user/callback.go +++ b/internal/rpc/user/callback.go @@ -25,8 +25,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/http" ) -func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) error { - if !config.Config.Callback.CallbackBeforeUpdateUserInfo.Enable { +func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error { + if !globalConfig.Callback.CallbackBeforeUpdateUserInfo.Enable { return nil } cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{ @@ -36,7 +36,7 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf Nickname: &req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { return err } utils.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL) @@ -44,8 +44,8 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf utils.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) error { - if !config.Config.Callback.CallbackAfterUpdateUserInfo.Enable { +func CallbackAfterUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error { + if !globalConfig.Callback.CallbackAfterUpdateUserInfo.Enable { return nil } cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{ @@ -55,13 +55,13 @@ func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfo Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackAfterUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { return err } return nil } -func CallbackBeforeUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) error { - if !config.Config.Callback.CallbackBeforeUpdateUserInfoEx.Enable { +func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error { + if !globalConfig.Callback.CallbackBeforeUpdateUserInfoEx.Enable { return nil } cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{ @@ -71,7 +71,7 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserI Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { return err } utils.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL) @@ -79,8 +79,8 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserI utils.NotNilReplace(req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) error { - if !config.Config.Callback.CallbackAfterUpdateUserInfoEx.Enable { +func CallbackAfterUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error { + if !globalConfig.Callback.CallbackAfterUpdateUserInfoEx.Enable { return nil } cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{ @@ -90,14 +90,14 @@ func CallbackAfterUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserIn Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackAfterUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { return err } return nil } -func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error { - if !config.Config.Callback.CallbackBeforeUserRegister.Enable { +func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error { + if !globalConfig.Callback.CallbackBeforeUserRegister.Enable { return nil } cbReq := &cbapi.CallbackBeforeUserRegisterReq{ @@ -107,7 +107,7 @@ func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq } resp := &cbapi.CallbackBeforeUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { return err } if len(resp.Users) != 0 { @@ -116,8 +116,8 @@ func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq return nil } -func CallbackAfterUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error { - if !config.Config.Callback.CallbackAfterUserRegister.Enable { +func CallbackAfterUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error { + if !globalConfig.Callback.CallbackAfterUserRegister.Enable { return nil } cbReq := &cbapi.CallbackAfterUserRegisterReq{ @@ -127,7 +127,7 @@ func CallbackAfterUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) } resp := &cbapi.CallbackAfterUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackAfterUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterUpdateUserInfo); err != nil { return err } return nil diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 288022254..8e898f50d 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -68,31 +68,31 @@ func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGrou } func Start(config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return err } users := make([]*tablerelation.UserModel, 0) if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) { - return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)") + return errors.New("len(s.config.AppNotificationAdmin.AppManagerUid) != len(s.config.AppNotificationAdmin.Nickname)") } for k, v := range config.IMAdmin.UserID { users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin}) } - userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) + userDB, err := mgo.NewUserMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return err } cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) - userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) + userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) database := controller.NewUserDatabase(userDB, cache, tx.NewMongo(mongo.GetClient()), userMongoDB) - friendRpcClient := rpcclient.NewFriendRpcClient(client) - groupRpcClient := rpcclient.NewGroupRpcClient(client) - msgRpcClient := rpcclient.NewMessageRpcClient(client) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config) u := &userServer{ UserDatabase: database, RegisterCenter: client, @@ -125,7 +125,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI if err != nil { return nil, err } - if err := CallbackBeforeUpdateUserInfo(ctx, req); err != nil { + if err := CallbackBeforeUpdateUserInfo(ctx, s.config, req); err != nil { return nil, err } data := convert.UserPb2DBMap(req.UserInfo) @@ -145,7 +145,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err := CallbackAfterUpdateUserInfo(ctx, req); err != nil { + if err := CallbackAfterUpdateUserInfo(ctx, s.config, req); err != nil { return nil, err } if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { @@ -160,7 +160,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse return nil, err } - if err = CallbackBeforeUpdateUserInfoEx(ctx, req); err != nil { + if err = CallbackBeforeUpdateUserInfoEx(ctx, s.config, req); err != nil { return nil, err } data := convert.UserPb2DBMapEx(req.UserInfo) @@ -180,7 +180,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err := CallbackAfterUpdateUserInfoEx(ctx, req); err != nil { + if err := CallbackAfterUpdateUserInfoEx(ctx, s.config, req); err != nil { return nil, err } if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { @@ -254,8 +254,8 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if len(req.Users) == 0 { return nil, errs.ErrArgs.Wrap("users is empty") } - if req.Secret != config.Config.Secret { - log.ZDebug(ctx, "UserRegister", config.Config.Secret, req.Secret) + if req.Secret != s.config.Secret { + log.ZDebug(ctx, "UserRegister", s.config.Secret, req.Secret) return nil, errs.ErrNoPermission.Wrap("secret invalid") } if utils.DuplicateAny(req.Users, func(e *sdkws.UserInfo) string { return e.UserID }) { @@ -278,7 +278,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if exist { return nil, errs.ErrRegisteredAlready.Wrap("userID registered already") } - if err := CallbackBeforeUserRegister(ctx, req); err != nil { + if err := CallbackBeforeUserRegister(ctx, s.config, req); err != nil { return nil, err } now := time.Now() @@ -298,7 +298,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR return nil, err } - if err := CallbackAfterUserRegister(ctx, req); err != nil { + if err := CallbackAfterUserRegister(ctx, s.config, req); err != nil { return nil, err } return resp, nil @@ -681,7 +681,7 @@ func (s *userServer) userModelToResp(users []*relation.UserModel, pagination pag accounts := make([]*pbuser.NotificationAccountInfo, 0) var total int64 for _, v := range users { - if v.AppMangerLevel == constant.AppNotificationAdmin && !utils.IsContain(v.UserID, config.Config.IMAdmin.UserID) { + if v.AppMangerLevel == constant.AppNotificationAdmin && !utils.IsContain(v.UserID, s.config.IMAdmin.UserID) { temp := &pbuser.NotificationAccountInfo{ UserID: v.UserID, FaceURL: v.FaceURL, diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 6d0d9fdf2..d6f042513 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -34,14 +34,14 @@ import ( func StartTask(config *config.GlobalConfig) error { fmt.Println("cron task start, config", config.ChatRecordsClearTime) - msgTool, err := InitMsgTool() + msgTool, err := InitMsgTool(config) if err != nil { return err } msgTool.convertTools() - rdb, err := cache.NewRedis() + rdb, err := cache.NewRedis(config) if err != nil { return err } diff --git a/internal/tools/cron_task_test.go b/internal/tools/cron_task_test.go index 28bc2c945..d88020e9a 100644 --- a/internal/tools/cron_task_test.go +++ b/internal/tools/cron_task_test.go @@ -61,7 +61,7 @@ func TestCronWrapFunc(t *testing.T) { start := time.Now() key := fmt.Sprintf("cron-%v", rand.Int31()) crontab := cron.New(cron.WithSeconds()) - crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, cb)) + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, cb)) crontab.Start() <-done @@ -80,10 +80,10 @@ func TestCronWrapFuncWithNetlock(t *testing.T) { crontab := cron.New(cron.WithSeconds()) key := fmt.Sprintf("cron-%v", rand.Int31()) - crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() { + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { done <- "host1" })) - crontab.AddFunc("*/1 * * * * *", cronWrapFunc(rdb, key, func() { + crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, func() { done <- "host2" })) crontab.Start() diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 1ec1e03a2..53e8b4a39 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -52,10 +52,12 @@ type MsgTool struct { userDatabase controller.UserDatabase groupDatabase controller.GroupDatabase msgNotificationSender *notification.MsgNotificationSender + Config *config.GlobalConfig } func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, - groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender, + groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, + msgNotificationSender *notification.MsgNotificationSender, config *config.GlobalConfig, ) *MsgTool { return &MsgTool{ msgDatabase: msgDatabase, @@ -63,32 +65,33 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle groupDatabase: groupDatabase, conversationDatabase: conversationDatabase, msgNotificationSender: msgNotificationSender, + Config: config, } } -func InitMsgTool() (*MsgTool, error) { - rdb, err := cache.NewRedis() +func InitMsgTool(config *config.GlobalConfig) (*MsgTool, error) { + rdb, err := cache.NewRedis(config) if err != nil { return nil, err } - mongo, err := unrelation.NewMongo() + mongo, err := unrelation.NewMongo(config) if err != nil { return nil, err } - discov, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) + discov, err := kdisc.NewDiscoveryRegister(config.Envs.Discovery) if err != nil { return nil, err } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) + userDB, err := mgo.NewUserMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } - msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase()) + msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } - userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) + userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase(config.Mongo.Database)) ctxTx := tx.NewMongo(mongo.GetClient()) userDatabase := controller.NewUserDatabase( userDB, @@ -96,19 +99,19 @@ func InitMsgTool() (*MsgTool, error) { ctxTx, userMongoDB, ) - groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase()) + groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } - groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase()) + groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } - groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase()) + groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } - conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase()) + conversationDB, err := mgo.NewConversationMongo(mongo.GetDatabase(config.Mongo.Database)) if err != nil { return nil, err } @@ -118,9 +121,9 @@ func InitMsgTool() (*MsgTool, error) { cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), ctxTx, ) - msgRpcClient := rpcclient.NewMessageRpcClient(discov) + msgRpcClient := rpcclient.NewMessageRpcClient(discov, config) msgNotificationSender := notification.NewMsgNotificationSender(rpcclient.WithRpcClient(&msgRpcClient)) - msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender) + msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config) return msgTool, nil } @@ -182,8 +185,8 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() { func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) { for _, conversationID := range conversationIDs { - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.RetainChatRecords*24*60*60)); err != nil { - log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.RetainChatRecords) + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.RetainChatRecords*24*60*60)); err != nil { + log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", c.config.RetainChatRecords) } if err := c.checkMaxSeq(ctx, conversationID); err != nil { log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index ce2dca0c2..08b7d1b37 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -27,18 +27,18 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -func Secret() jwt.Keyfunc { +func Secret(secret string) jwt.Keyfunc { return func(token *jwt.Token) (any, error) { - return []byte(config.Config.Secret), nil + return []byte(secret), nil } } -func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { +func CheckAccessV3(ctx context.Context, ownerUserID string, config *config.GlobalConfig) (err error) { opUserID := mcontext.GetOpUserID(ctx) - if len(config.Config.Manager.UserID) > 0 && utils.IsContain(opUserID, config.Config.Manager.UserID) { + if len(config.Manager.UserID) > 0 && utils.IsContain(opUserID, config.Manager.UserID) { return nil } - if utils.IsContain(opUserID, config.Config.IMAdmin.UserID) { + if utils.IsContain(opUserID, config.IMAdmin.UserID) { return nil } if opUserID == ownerUserID { @@ -47,40 +47,40 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { return errs.ErrNoPermission.Wrap("ownerUserID", ownerUserID) } -func IsAppManagerUid(ctx context.Context) bool { - return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)) || - utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) +func IsAppManagerUid(ctx context.Context, config *config.GlobalConfig) bool { + return (len(config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Manager.UserID)) || + utils.IsContain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID) } -func CheckAdmin(ctx context.Context) error { - if len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) { +func CheckAdmin(ctx context.Context, config *config.GlobalConfig) error { + if len(config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Manager.UserID) { return nil } - if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) { + if utils.IsContain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID) { return nil } return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) } -func CheckIMAdmin(ctx context.Context) error { - if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) { +func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error { + if utils.IsContain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID) { return nil } - if len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) { + if len(config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Manager.UserID) { return nil } return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx))) } -func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) { - return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret()) +func ParseRedisInterfaceToken(redisToken any, secret string) (*tokenverify.Claims, error) { + return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret(secret)) } -func IsManagerUserID(opUserID string) bool { - return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(opUserID, config.Config.Manager.UserID)) || utils.IsContain(opUserID, config.Config.IMAdmin.UserID) +func IsManagerUserID(opUserID string, config config.GlobalConfig) bool { + return (len(config.Manager.UserID) > 0 && utils.IsContain(opUserID, config.Manager.UserID)) || utils.IsContain(opUserID, config.IMAdmin.UserID) } -func WsVerifyToken(token, userID string, platformID int) error { - claim, err := tokenverify.GetClaimFromToken(token, Secret()) +func WsVerifyToken(token, userID, secret string, platformID int) error { + claim, err := tokenverify.GetClaimFromToken(token, Secret(secret)) if err != nil { return err } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 872804603..3a939fa97 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -22,7 +22,6 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/openimsdk/open-im-server/v3/internal/msggateway" - v3config "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) type MsgGatewayCmd struct { @@ -64,13 +63,13 @@ func (m *MsgGatewayCmd) Exec() error { func (m *MsgGatewayCmd) GetPortFromConfig(portType string) int { switch portType { case constant.FlagWsPort: - return v3config.Config.LongConnSvr.OpenImWsPort[0] + return m.config.LongConnSvr.OpenImWsPort[0] case constant.FlagPort: - return v3config.Config.LongConnSvr.OpenImMessageGatewayPort[0] + return m.config.LongConnSvr.OpenImMessageGatewayPort[0] case constant.FlagPrometheusPort: - return v3config.Config.Prometheus.MessageGatewayPrometheusPort[0] + return m.config.Prometheus.MessageGatewayPrometheusPort[0] default: return 0 diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 75ef087c1..e46b66b52 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -16,7 +16,6 @@ package cmd import ( "fmt" - "github.com/OpenIMSDK/protocol/constant" "github.com/spf13/cobra" diff --git a/pkg/common/cmd/msg_utils.go b/pkg/common/cmd/msg_utils.go index cfaf631ec..d73356f1f 100644 --- a/pkg/common/cmd/msg_utils.go +++ b/pkg/common/cmd/msg_utils.go @@ -22,7 +22,7 @@ import ( type MsgUtilsCmd struct { cobra.Command - msgTool *tools.MsgTool + MsgTool *tools.MsgTool } func (m *MsgUtilsCmd) AddUserIDFlag() { @@ -136,7 +136,7 @@ func NewSeqCmd() *SeqCmd { func (s *SeqCmd) GetSeqCmd() *cobra.Command { s.Command.Run = func(cmdLines *cobra.Command, args []string) { - _, err := tools.InitMsgTool() + _, err := tools.InitMsgTool(s.MsgTool.Config) if err != nil { panic(err) } diff --git a/pkg/common/config/parse_test.go b/pkg/common/config/parse_test.go index 38171ec08..6291cd2cf 100644 --- a/pkg/common/config/parse_test.go +++ b/pkg/common/config/parse_test.go @@ -103,13 +103,14 @@ func TestInitConfig(t *testing.T) { tests := []struct { name string args args + config *GlobalConfig wantErr bool }{ // TODO: Add test cases. } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := InitConfig(tt.args.configFolderPath); (err != nil) != tt.wantErr { + if err := InitConfig(tt.config, tt.args.configFolderPath); (err != nil) != tt.wantErr { t.Errorf("InitConfig() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index a8189fdbc..f98d84ed0 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -46,7 +46,7 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { } // Read configuration from environment variables - overrideConfigFromEnv() + overrideConfigFromEnv(config) if len(config.Redis.Address) == 0 { return nil, errs.Wrap(errors.New("redis address is empty")) @@ -87,22 +87,22 @@ func NewRedis(config *config.GlobalConfig) (redis.UniversalClient, error) { } // overrideConfigFromEnv overrides configuration fields with environment variables if present. -func overrideConfigFromEnv() { +func overrideConfigFromEnv(config *config.GlobalConfig) { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envPort := os.Getenv("REDIS_PORT"); envPort != "" { addresses := strings.Split(envAddr, ",") for i, addr := range addresses { addresses[i] = addr + ":" + envPort } - config.Config.Redis.Address = addresses + config.Redis.Address = addresses } else { - config.Config.Redis.Address = strings.Split(envAddr, ",") + config.Redis.Address = strings.Split(envAddr, ",") } } if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { - config.Config.Redis.Username = envUser + config.Redis.Username = envUser } if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" { - config.Config.Redis.Password = envPass + config.Redis.Password = envPass } } diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 4bc2a046a..1fbb6c3b6 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" "github.com/OpenIMSDK/tools/mw/specialerror" @@ -130,18 +131,18 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) { t, err = fn(ctx) if err != nil { - return "", err + return "", errs.Wrap(err) } bs, err := json.Marshal(t) if err != nil { - return "", utils.Wrap(err, "") + return "", errs.Wrap(err, "marshal failed") } write = true return string(bs), nil }) if err != nil { - return t, err + return t, errs.Wrap(err) } if write { return t, nil @@ -151,9 +152,8 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin } err = json.Unmarshal([]byte(v), &t) if err != nil { - log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) - - return t, utils.Wrap(err, "") + errInfo := fmt.Sprintf("cache json.Unmarshal failed, key:%s, value:%s, expire:%s", key, v, expire) + return t, errs.Wrap(err, errInfo) } return t, nil @@ -206,7 +206,7 @@ func batchGetCache2[T any, K comparable]( fns func(ctx context.Context, key K) (T, error), ) ([]T, error) { if len(keys) == 0 { - return nil, nil + return nil, errs.ErrArgs.Wrap("groupID is empty") } res := make([]T, 0, len(keys)) for _, key := range keys { @@ -217,7 +217,7 @@ func batchGetCache2[T any, K comparable]( if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) { continue } - return nil, err + return nil, errs.Wrap(err) } res = append(res, val) } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5cd3cb22c..3b200c97f 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -127,13 +127,14 @@ type MsgModel interface { UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error } -func NewMsgCacheModel(client redis.UniversalClient) MsgModel { - return &msgCache{rdb: client} +func NewMsgCacheModel(client redis.UniversalClient, config *config.GlobalConfig) MsgModel { + return &msgCache{rdb: client, config: config} } type msgCache struct { metaCache - rdb redis.UniversalClient + rdb redis.UniversalClient + config *config.GlobalConfig } func (c *msgCache) getMaxSeqKey(conversationID string) string { @@ -310,7 +311,7 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string { } func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - if config.Config.Redis.EnablePipeline { + if c.config.Redis.EnablePipeline { return c.PipeGetMessagesBySeq(ctx, conversationID, seqs) } @@ -411,7 +412,7 @@ func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - if config.Config.Redis.EnablePipeline { + if c.config.Redis.EnablePipeline { return c.PipeSetMessageToCache(ctx, conversationID, msgs) } return c.ParallelSetMessageToCache(ctx, conversationID, msgs) @@ -426,7 +427,7 @@ func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID str } key := c.getMessageCacheKey(conversationID, msg.Seq) - _ = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second) + _ = pipe.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second) } results, err := pipe.Exec(ctx) @@ -456,7 +457,7 @@ func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID } key := c.getMessageCacheKey(conversationID, msg.Seq) - if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } return nil @@ -491,10 +492,10 @@ func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, se if err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } - if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } @@ -599,7 +600,7 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str } func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { - if config.Config.Redis.EnablePipeline { + if c.config.Redis.EnablePipeline { return c.PipeDeleteMessages(ctx, conversationID, seqs) } @@ -681,7 +682,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in if err != nil { return errs.Wrap(err) } - if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Set(ctx, key, s, time.Duration(c.config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index b841a7d31..88507e910 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -126,16 +126,16 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } -func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel) (CommonMsgDatabase, error) { - producerToRedis, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic) +func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel, config *config.GlobalConfig) (CommonMsgDatabase, error) { + producerToRedis, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.LatestMsgToRedis.Topic) if err != nil { return nil, err } - producerToMongo, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic) + producerToMongo, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToMongo.Topic) if err != nil { return nil, err } - producerToPush, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic) + producerToPush, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.MsgToPush.Topic) if err != nil { return nil, err } @@ -148,10 +148,10 @@ func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheMo }, nil } -func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) (CommonMsgDatabase, error) { - cacheModel := cache.NewMsgCacheModel(rdb) +func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) { + cacheModel := cache.NewMsgCacheModel(rdb, config) msgDocModel := unrelation.NewMsgMongoDriver(database) - return NewCommonMsgDatabase(msgDocModel, cacheModel) + return NewCommonMsgDatabase(msgDocModel, cacheModel, config) } type commonMsgDatabase struct { diff --git a/pkg/common/db/s3/cos/cos.go b/pkg/common/db/s3/cos/cos.go index a82ffe670..c5af9775f 100644 --- a/pkg/common/db/s3/cos/cos.go +++ b/pkg/common/db/s3/cos/cos.go @@ -56,8 +56,8 @@ const ( videoSnapshotImageJpg = "jpg" ) -func NewCos() (s3.Interface, error) { - conf := config.Config.Object.Cos +func NewCos(config *config.GlobalConfig) (s3.Interface, error) { + conf := config.Object.Cos u, err := url.Parse(conf.BucketURL) if err != nil { panic(err) @@ -73,6 +73,7 @@ func NewCos() (s3.Interface, error) { copyURL: u.Host + "/", client: client, credential: client.GetCredential(), + config: config, }, nil } @@ -80,6 +81,7 @@ type Cos struct { copyURL string client *cos.Client credential *cos.Credential + config *config.GlobalConfig } func (c *Cos) Engine() string { @@ -328,7 +330,7 @@ func (c *Cos) AccessURL(ctx context.Context, name string, expire time.Duration, } func (c *Cos) getPresignedURL(ctx context.Context, name string, expire time.Duration, opt *cos.PresignedURLOptions) (*url.URL, error) { - if !config.Config.Object.Cos.PublicRead { + if !c.config.Object.Cos.PublicRead { return c.client.Object.GetPresignedURL(ctx, http.MethodGet, name, c.credential.SecretID, c.credential.SecretKey, expire, opt) } return c.client.Object.GetObjectURL(name), nil diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index 5a615dcfd..e8da5856f 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -59,13 +59,13 @@ const ( const successCode = http.StatusOK -func NewMinio(cache cache.MinioCache) (s3.Interface, error) { - u, err := url.Parse(config.Config.Object.Minio.Endpoint) +func NewMinio(cache cache.MinioCache, config config.GlobalConfig) (s3.Interface, error) { + u, err := url.Parse(config.Object.Minio.Endpoint) if err != nil { return nil, err } opts := &minio.Options{ - Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, config.Config.Object.Minio.SessionToken), + Creds: credentials.NewStaticV4(config.Object.Minio.AccessKeyID, config.Object.Minio.SecretAccessKey, config.Object.Minio.SessionToken), Secure: u.Scheme == "https", } client, err := minio.New(u.Host, opts) @@ -73,26 +73,27 @@ func NewMinio(cache cache.MinioCache) (s3.Interface, error) { return nil, err } m := &Minio{ - bucket: config.Config.Object.Minio.Bucket, + bucket: config.Object.Minio.Bucket, core: &minio.Core{Client: client}, lock: &sync.Mutex{}, init: false, cache: cache, + config: config, } - if config.Config.Object.Minio.SignEndpoint == "" || config.Config.Object.Minio.SignEndpoint == config.Config.Object.Minio.Endpoint { + if config.Object.Minio.SignEndpoint == "" || config.Object.Minio.SignEndpoint == config.Object.Minio.Endpoint { m.opts = opts m.sign = m.core.Client m.prefix = u.Path u.Path = "" - config.Config.Object.Minio.Endpoint = u.String() - m.signEndpoint = config.Config.Object.Minio.Endpoint + config.Object.Minio.Endpoint = u.String() + m.signEndpoint = config.Object.Minio.Endpoint } else { - su, err := url.Parse(config.Config.Object.Minio.SignEndpoint) + su, err := url.Parse(config.Object.Minio.SignEndpoint) if err != nil { return nil, err } m.opts = &minio.Options{ - Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, config.Config.Object.Minio.SessionToken), + Creds: credentials.NewStaticV4(config.Object.Minio.AccessKeyID, config.Object.Minio.SecretAccessKey, config.Object.Minio.SessionToken), Secure: su.Scheme == "https", } m.sign, err = minio.New(su.Host, m.opts) @@ -101,8 +102,8 @@ func NewMinio(cache cache.MinioCache) (s3.Interface, error) { } m.prefix = su.Path su.Path = "" - config.Config.Object.Minio.SignEndpoint = su.String() - m.signEndpoint = config.Config.Object.Minio.SignEndpoint + config.Object.Minio.SignEndpoint = su.String() + m.signEndpoint = config.Object.Minio.SignEndpoint } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -123,6 +124,7 @@ type Minio struct { init bool prefix string cache cache.MinioCache + config config.GlobalConfig } func (m *Minio) initMinio(ctx context.Context) error { @@ -134,7 +136,7 @@ func (m *Minio) initMinio(ctx context.Context) error { if m.init { return nil } - conf := config.Config.Object.Minio + conf := m.config.Object.Minio exists, err := m.core.Client.BucketExists(ctx, conf.Bucket) if err != nil { return fmt.Errorf("check bucket exists error: %w", err) @@ -399,7 +401,7 @@ func (m *Minio) PresignedGetObject(ctx context.Context, name string, expire time rawURL *url.URL err error ) - if config.Config.Object.Minio.PublicRead { + if m.config.Object.Minio.PublicRead { rawURL, err = makeTargetURL(m.sign, m.bucket, name, m.location, false, query) } else { rawURL, err = m.sign.PresignedGetObject(ctx, m.bucket, name, expire, query) diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 4c093b3c3..6db551295 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -42,9 +42,9 @@ type Mongo struct { } // NewMongo Initialize MongoDB connection. -func NewMongo() (*Mongo, error) { +func NewMongo(config *config.GlobalConfig) (*Mongo, error) { specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound) - uri := buildMongoURI() + uri := buildMongoURI(config) var mongoClient *mongo.Client var err error @@ -68,14 +68,14 @@ func NewMongo() (*Mongo, error) { return nil, errs.Wrap(err, uri) } -func buildMongoURI() string { +func buildMongoURI(config *config.GlobalConfig) string { uri := os.Getenv("MONGO_URI") if uri != "" { return uri } - if config.Config.Mongo.Uri != "" { - return config.Config.Mongo.Uri + if config.Mongo.Uri != "" { + return config.Mongo.Uri } username := os.Getenv("MONGO_OPENIM_USERNAME") @@ -86,21 +86,21 @@ func buildMongoURI() string { maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE") if username == "" { - username = config.Config.Mongo.Username + username = config.Mongo.Username } if password == "" { - password = config.Config.Mongo.Password + password = config.Mongo.Password } if address == "" { - address = strings.Join(config.Config.Mongo.Address, ",") + address = strings.Join(config.Mongo.Address, ",") } else if port != "" { address = fmt.Sprintf("%s:%s", address, port) } if database == "" { - database = config.Config.Mongo.Database + database = config.Mongo.Database } if maxPoolSize == "" { - maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize) + maxPoolSize = fmt.Sprint(config.Mongo.MaxPoolSize) } uriFormat := "mongodb://%s/%s?maxPoolSize=%s" @@ -124,8 +124,8 @@ func (m *Mongo) GetClient() *mongo.Client { } // GetDatabase returns the specific database from MongoDB. -func (m *Mongo) GetDatabase() *mongo.Database { - return m.db.Database(config.Config.Mongo.Database) +func (m *Mongo) GetDatabase(database string) *mongo.Database { + return m.db.Database(database) } // CreateMsgIndex creates an index for messages in MongoDB. diff --git a/pkg/rpcclient/auth.go b/pkg/rpcclient/auth.go index 0ee021de1..5501a3faf 100644 --- a/pkg/rpcclient/auth.go +++ b/pkg/rpcclient/auth.go @@ -25,17 +25,18 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) -func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry) *Auth { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImAuthName) +func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Auth { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImAuthName) if err != nil { panic(err) } client := auth.NewAuthClient(conn) - return &Auth{discov: discov, conn: conn, Client: client} + return &Auth{discov: discov, conn: conn, Client: client, Config: config} } type Auth struct { conn grpc.ClientConnInterface Client auth.AuthClient discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 3ba8dd8c0..bfe71b79d 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -31,21 +31,22 @@ type Conversation struct { Client pbconversation.ConversationClient conn grpc.ClientConnInterface discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } -func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry) *Conversation { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImConversationName) +func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Conversation { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImConversationName) if err != nil { panic(err) } client := pbconversation.NewConversationClient(conn) - return &Conversation{discov: discov, conn: conn, Client: client} + return &Conversation{discov: discov, conn: conn, Client: client, Config: config} } type ConversationRpcClient Conversation -func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) ConversationRpcClient { - return ConversationRpcClient(*NewConversation(discov)) +func NewConversationRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) ConversationRpcClient { + return ConversationRpcClient(*NewConversation(discov, config)) } func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index b84db40d4..090b62ad9 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -30,21 +30,22 @@ type Friend struct { conn grpc.ClientConnInterface Client friend.FriendClient discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } -func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry) *Friend { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImFriendName) +func NewFriend(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Friend { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImFriendName) if err != nil { panic(err) } client := friend.NewFriendClient(conn) - return &Friend{discov: discov, conn: conn, Client: client} + return &Friend{discov: discov, conn: conn, Client: client, Config: config} } type FriendRpcClient Friend -func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) FriendRpcClient { - return FriendRpcClient(*NewFriend(discov)) +func NewFriendRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) FriendRpcClient { + return FriendRpcClient(*NewFriend(discov, config)) } func (f *FriendRpcClient) GetFriendsInfo( diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index bf0efe60c..b61e5eb89 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -34,21 +34,22 @@ type Group struct { conn grpc.ClientConnInterface Client group.GroupClient discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } -func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry) *Group { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImGroupName) +func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Group { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImGroupName) if err != nil { panic(err) } client := group.NewGroupClient(conn) - return &Group{discov: discov, conn: conn, Client: client} + return &Group{discov: discov, conn: conn, Client: client, Config: config} } type GroupRpcClient Group -func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) GroupRpcClient { - return GroupRpcClient(*NewGroup(discov)) +func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) GroupRpcClient { + return GroupRpcClient(*NewGroup(discov, config)) } func (g *GroupRpcClient) GetGroupInfos( diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 56167d7f4..a8640fb94 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -17,6 +17,8 @@ package rpcclient import ( "context" "encoding/json" + "fmt" + "github.com/OpenIMSDK/tools/errs" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -130,21 +132,22 @@ type Message struct { conn grpc.ClientConnInterface Client msg.MsgClient discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } -func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry) *Message { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImMsgName) +func NewMessage(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Message { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImMsgName) if err != nil { panic(err) } client := msg.NewMsgClient(conn) - return &Message{discov: discov, conn: conn, Client: client} + return &Message{discov: discov, conn: conn, Client: client, Config: config} } type MessageRpcClient Message -func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry) MessageRpcClient { - return MessageRpcClient(*NewMessage(discov)) +func NewMessageRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) MessageRpcClient { + return MessageRpcClient(*NewMessage(discov, config)) } func (m *MessageRpcClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { @@ -245,8 +248,8 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)} content, err := json.Marshal(&n) if err != nil { - log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", m) - return err + errInfo := fmt.Sprintf("MsgClient Notification json.Marshal failed, sendID:%s, recvID:%s, contentType:%d, msg:%s", sendID, recvID, contentType, m) + return errs.Wrap(err, errInfo) } notificationOpt := ¬ificationOpt{} for _, opt := range opts { @@ -258,7 +261,8 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil { userInfo, err = s.getUserInfo(ctx, sendID) if err != nil { - log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID) + errInfo := fmt.Sprintf("getUserInfo failed, sendID:%s", sendID) + return errs.Wrap(err, errInfo) } else { msg.SenderNickname = userInfo.Nickname msg.SenderFaceURL = userInfo.FaceURL @@ -290,10 +294,9 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s msg.OfflinePushInfo = &offlineInfo req.MsgData = &msg _, err = s.sendMsg(ctx, &req) - if err == nil { - log.ZDebug(ctx, "MsgClient Notification SendMsg success", "req", &req) - } else { - log.ZError(ctx, "MsgClient Notification SendMsg failed", err, "req", &req) + if err != nil { + errInfo := fmt.Sprintf("MsgClient Notification SendMsg failed, req:%s", &req) + return errs.Wrap(err, errInfo) } return err } diff --git a/pkg/rpcclient/notification/group.go b/pkg/rpcclient/notification/group.go index 8c3719b2c..d9906a0db 100755 --- a/pkg/rpcclient/notification/group.go +++ b/pkg/rpcclient/notification/group.go @@ -244,17 +244,11 @@ func (g *GroupNotificationSender) getUsersInfoMap(ctx context.Context, userIDs [ } func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { - defer log.ZDebug(ctx, "return") - defer func() { - if err != nil { - log.ZError(ctx, utils.GetFuncName(1)+" failed", err) - } - }() if opUser == nil { return errs.ErrInternalServer.Wrap("**sdkws.GroupMemberFullInfo is nil") } if *opUser != nil { - return nil + return errs.ErrArgs.Wrap("*opUser is not nil") } userID := mcontext.GetOpUserID(ctx) if groupID != "" { @@ -651,12 +645,6 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte } func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { - defer log.ZDebug(ctx, "return") - defer func() { - if err != nil { - log.ZError(ctx, utils.GetFuncName(1)+" failed", err) - } - }() group, err := g.getGroupInfo(ctx, groupID) if err != nil { return err @@ -673,12 +661,6 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con } func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context.Context, groupID, groupMemberUserID string) (err error) { - defer log.ZDebug(ctx, "return") - defer func() { - if err != nil { - log.ZError(ctx, utils.GetFuncName(1)+" failed", err) - } - }() group, err := g.getGroupInfo(ctx, groupID) if err != nil { return err diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 73d874005..106d33dd9 100755 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -33,16 +33,17 @@ type Third struct { Client third.ThirdClient discov discoveryregistry.SvcDiscoveryRegistry MinioClient *minio.Client + Config *config.GlobalConfig } -func NewThird(discov discoveryregistry.SvcDiscoveryRegistry) *Third { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImThirdName) +func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *Third { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImThirdName) if err != nil { panic(err) } client := third.NewThirdClient(conn) minioClient, err := minioInit() - return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient} + return &Third{discov: discov, Client: client, conn: conn, MinioClient: minioClient, Config: config} } func minioInit() (*minio.Client, error) { diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 451914cd3..d0403e284 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -36,16 +36,17 @@ type User struct { conn grpc.ClientConnInterface Client user.UserClient Discov discoveryregistry.SvcDiscoveryRegistry + Config *config.GlobalConfig } // NewUser initializes and returns a User instance based on the provided service discovery registry. -func NewUser(discov discoveryregistry.SvcDiscoveryRegistry) *User { - conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImUserName) +func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) *User { + conn, err := discov.GetConn(context.Background(), config.RpcRegisterName.OpenImUserName) if err != nil { panic(err) } client := user.NewUserClient(conn) - return &User{Discov: discov, Client: client, conn: conn} + return &User{Discov: discov, Client: client, conn: conn, Config: config} } // UserRpcClient represents the structure for a User RPC client. @@ -58,8 +59,8 @@ func NewUserRpcClientByUser(user *User) *UserRpcClient { } // NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry. -func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry) UserRpcClient { - return UserRpcClient(*NewUser(client)) +func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, config *config.GlobalConfig) UserRpcClient { + return UserRpcClient(*NewUser(client, config)) } // GetUsersInfo retrieves information for multiple users based on their user IDs.