diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go index dbd64d903..23b629d69 100644 --- a/cmd/openim-msgtransfer/main.go +++ b/cmd/openim-msgtransfer/main.go @@ -20,10 +20,7 @@ import ( ) func main() { - msgTransferCmd := cmd.NewMsgTransferCmd(cmd.MsgTransferServer) - msgTransferCmd.AddPrometheusPortFlag() - msgTransferCmd.AddTransferProgressFlag() - if err := msgTransferCmd.Exec(); err != nil { + if err := cmd.NewMsgTransferCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/cmd/openim-push/main.go b/cmd/openim-push/main.go index 12322637d..cd5106d17 100644 --- a/cmd/openim-push/main.go +++ b/cmd/openim-push/main.go @@ -15,16 +15,12 @@ package main import ( - "github.com/openimsdk/open-im-server/v3/internal/push" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" ) func main() { - pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer, push.Start) - pushCmd.AddPortFlag() - pushCmd.AddPrometheusPortFlag() - if err := pushCmd.Exec(); err != nil { + if err := cmd.NewPushRpcCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/cmd/openim-rpc/openim-rpc-user/main.go b/cmd/openim-rpc/openim-rpc-user/main.go index 01faeab28..75ab90291 100644 --- a/cmd/openim-rpc/openim-rpc-user/main.go +++ b/cmd/openim-rpc/openim-rpc-user/main.go @@ -15,16 +15,12 @@ package main import ( - "github.com/openimsdk/open-im-server/v3/internal/rpc/user" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" ) func main() { - rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer, user.Start) - rpcCmd.AddPortFlag() - rpcCmd.AddPrometheusPortFlag() - if err := rpcCmd.Exec(); err != nil { + if err := cmd.NewUserRpcCmd().Exec(); err != nil { program.ExitWithError(err) } } diff --git a/config/kafka.yml b/config/kafka.yml index 02b23c142..ce1f80be7 100644 --- a/config/kafka.yml +++ b/config/kafka.yml @@ -1,13 +1,18 @@ username: '' password: '' +producerAck: "" +compressType: "none" addr: [ localhost:19094 ] -toRedis: - topic: "toRedis" -toMongo: - topic: "toMongo" -toPush: - topic: "toPush" -consumerGroupID: - msgToRedis: redis - msgToMongo: mongo - msgToPush: push +toRedisTopic: "toRedis" +toMongoTopic: "toMongo" +toPushTopic: "toPush" +toRedisGroupID: redis +toMongoGroupID: mongo +toPushGroupID: push +tls: + enableTLS: false + caCrt: "" + clientCrt: "" + clientKey: "" + clientKeyPwd: "" + insecureSkipVerify: false diff --git a/config/openim-push.yml b/config/openim-push.yml index e1fdf15db..35e1b2c07 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -7,7 +7,7 @@ prometheus: enable: true ports: [ 20107 ] - +maxConcurrentWorkers: 3 enable: getui geTui: pushUrl: "https://restapi.getui.com/v2/$appId" diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index eacd5be50..1ad3fe24b 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -10,4 +10,3 @@ prometheus: tokenPolicy: expire: 90 -secret: openIM123 \ No newline at end of file diff --git a/config/redis.yml b/config/redis.yml index 0a93ca1ba..c4ba110c1 100644 --- a/config/redis.yml +++ b/config/redis.yml @@ -2,6 +2,7 @@ redis: address: [ localhost:16379 ] username: '' password: openIM123 + enablePipeline: false clusterMode: false db: 0 MaxRetry: 10 \ No newline at end of file diff --git a/config/share.yml b/config/share.yml index 60d1a1856..eeaf1e55c 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,3 +1,4 @@ +secret: openIM123 env: zookeeper rpcRegisterName: user: User diff --git a/config/webhooks.yml b/config/webhooks.yml index 5f583ed33..689ac3e08 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -19,6 +19,10 @@ beforeSendGroupMsg: enable: false timeout: 5 failedContinue: true +beforeMsgModify: + enable: false + timeout: 5 + failedContinue: true afterSendGroupMsg: enable: false timeout: 5 @@ -55,6 +59,10 @@ beforeUpdateUserInfo: enable: false timeout: 5 failedContinue: true +afterUpdateUserInfo: + enable: false + timeout: 5 + failedContinue: true beforeCreateGroup: enable: false timeout: 5 diff --git a/internal/api/route.go b/internal/api/route.go index 374ccbb48..bacbf3f53 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -33,7 +33,6 @@ import ( "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" @@ -312,11 +311,12 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie return r } -func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc { +func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.HandlerFunc { + //todo TokenPolicy dataBase := controller.NewAuthDatabase( cache.NewTokenCacheModel(rdb), - config.Secret, - config.TokenPolicy.Expire, + config.Share.Secret, + 0, ) return func(c *gin.Context) { switch c.Request.Method { @@ -328,7 +328,7 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H c.Abort() return } - claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(config.Secret)) + claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(config.Share.Secret)) if err != nil { log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap()) apiresp.GinError(c, servererrs.ErrTokenUnknown.Wrap()) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 3ca93bd2a..daad05881 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -17,8 +17,10 @@ package msgtransfer import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/utils/datautil" "net/http" "os" "os/signal" @@ -54,46 +56,44 @@ type MsgTransfer struct { cancel context.CancelFunc } -func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, index int) error { - log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPort", prometheusPort, "index", index) - mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) +func Start(ctx context.Context, index int, config *cmd.MsgTransferConfig) error { + log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index) + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } - if err != nil { - return err - } - client, err := kdisc.NewDiscoveryRegister(config) + client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig) if err != nil { return err } - if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { + if err := client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil { return err } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) + //todo MsgCacheTimeout + msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig) if err != nil { return err } - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) - msgTransfer, err := NewMsgTransfer(&config.Kafka, msgDatabase, &conversationRpcClient, &groupRpcClient) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + msgTransfer, err := NewMsgTransfer(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err } - return msgTransfer.Start(prometheusPort, config, index) + return msgTransfer.Start(index, config) } func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { @@ -112,11 +112,11 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat }, nil } -func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig, index int) error { - if prometheusPort <= 0 { - return errs.New("invalid prometheus port", "prometheusPort", prometheusPort) +func (m *MsgTransfer) Start(index int, config *cmd.MsgTransferConfig) error { + prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) + if err != nil { + return err } - m.ctx, m.cancel = context.WithCancel(context.Background()) var ( @@ -127,13 +127,13 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig, ind go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) - if config.Prometheus.Enable { + if config.MsgTransfer.Prometheus.Enable { go func() { proreg := prometheus.NewRegistry() proreg.MustRegister( collectors.NewGoCollector(), ) - proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.RpcRegisterName)...) + proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.ZookeeperConfig)...) http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) if err != nil && err != http.ErrServerClosed { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index d532f6a71..43f4622a8 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -81,8 +81,9 @@ type OnlineHistoryRedisConsumerHandler struct { groupRpcClient *rpcclient.GroupRpcClient } -func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic}) +func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 703a9ba91..9a989fb6d 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) if err != nil { return nil, err } diff --git a/internal/push/callback.go b/internal/push/callback.go index bd72cfec7..3b665b38f 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -26,8 +26,8 @@ import ( "github.com/openimsdk/tools/utils/datautil" ) -func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { - if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing { +func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { + if !callback.BeforeOfflinePush.Enable || msg.ContentType == constant.Typing { return nil } req := &callbackstruct.CallbackBeforePushReq{ @@ -51,7 +51,7 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOfflinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOfflinePush); err != nil { return err } @@ -64,8 +64,8 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs return nil } -func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error { - if !callback.CallbackOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { +func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData) error { + if !callback.BeforeOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforePushReq{ @@ -87,7 +87,7 @@ func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOnlinePush); err != nil { return err } return nil @@ -95,12 +95,12 @@ func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs func callbackBeforeSuperGroupOnlinePush( ctx context.Context, - callback *config.Callback, + callback *config.Webhooks, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string, ) error { - if !callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing { + if !callback.BeforeGroupOnlinePush.Enable || msg.ContentType == constant.Typing { return nil } req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ @@ -120,7 +120,7 @@ func callbackBeforeSuperGroupOnlinePush( Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} - if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackBeforeSuperGroupOnlinePush); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeGroupOnlinePush); err != nil { return err } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index e4808ab2d..3b41959d2 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -45,7 +45,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) { if err != nil { return nil, err } - credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.Fcm.ServiceAccount) + credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.FCM.ServiceAccount) opt := option.WithCredentialsFile(credentialsFilePath) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index c3bd7e5d8..ae7b8bc58 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -51,14 +51,14 @@ const ( ) type Client struct { - cache cache.MsgModel + cache cache.ThirdCache tokenExpireTime int64 taskIDTTL int64 pushConf *config.Push httpClient *httputil.HTTPClient } -func NewClient(pushConf *config.Push, cache cache.MsgModel) *Client { +func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL, diff --git a/internal/push/offlinepush/jpush/body/notification.go b/internal/push/offlinepush/jpush/body/notification.go index 0f68e9a68..42e59c46c 100644 --- a/internal/push/offlinepush/jpush/body/notification.go +++ b/internal/push/offlinepush/jpush/body/notification.go @@ -57,7 +57,7 @@ func (n *Notification) SetExtras(extras Extras) { } func (n *Notification) SetAndroidIntent(pushConf *config.Push) { - n.Android.Intent.URL = pushConf.Jpns.PushIntent + n.Android.Intent.URL = pushConf.JPNS.PushIntent } func (n *Notification) IOSEnableMutableContent() { diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index e3f2e8ae7..7d2b17bd2 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -26,15 +26,13 @@ import ( ) type JPush struct { - pushConf *config.Push - iOSPushConf *config.IOSPush - httpClient *httputil.HTTPClient + pushConf *config.Push + httpClient *httputil.HTTPClient } -func NewClient(pushConf *config.Push, iOSPushConf *config.IOSPush) *JPush { +func NewClient(pushConf *config.Push) *JPush { return &JPush{pushConf: pushConf, - iOSPushConf: iOSPushConf, - httpClient: httputil.NewHTTPClient(httputil.NewClientConfig()), + httpClient: httputil.NewHTTPClient(httputil.NewClientConfig()), } } @@ -71,7 +69,7 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin var msg body.Message msg.SetMsgContent(content) var opt body.Options - opt.SetApnsProduction(j.iOSPushConf.Production) + opt.SetApnsProduction(j.pushConf.IOSPush.Production) var pushObj body.PushObj pushObj.SetPlatform(&pf) pushObj.SetAudience(&au) @@ -85,9 +83,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error { return j.httpClient.PostReturn( ctx, - j.pushConf.Jpns.PushUrl, + j.pushConf.JPNS.PushURL, map[string]string{ - "Authorization": j.getAuthorization(j.pushConf.Jpns.AppKey, j.pushConf.Jpns.MasterSecret), + "Authorization": j.getAuthorization(j.pushConf.JPNS.AppKey, j.pushConf.JPNS.MasterSecret), }, po, resp, diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index d994fdcde..5de3c594c 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -35,7 +35,7 @@ type ConsumerHandler struct { } func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { - pushConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) + pushConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToPushGroupID, []string{kafkaConf.ToPushTopic}) if err != nil { return nil, err } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 57c1bb4d0..1306455c8 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,9 +16,9 @@ package push import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" @@ -35,20 +35,20 @@ type pushServer struct { pusher *Pusher } -func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) +func Start(ctx context.Context, config *cmd.PushConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } cacheModel := cache.NewThirdCache(rdb) - offlinePusher, err := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel) + offlinePusher, err := NewOfflinePusher(&config.RpcConfig, cacheModel) if err != nil { return err } database := controller.NewPushDatabase(cacheModel) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) pusher := NewPusher( config, client, @@ -65,7 +65,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv pusher: pusher, }) - consumer, err := NewConsumer(&config.Kafka, pusher) + consumer, err := NewConsumer(&config.KafkaConfig, pusher) if err != nil { return err } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 41675087f..f94c31cb2 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -17,6 +17,7 @@ package push import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/errs" "sync" @@ -48,7 +49,7 @@ import ( ) type Pusher struct { - config *config.GlobalConfig + config *cmd.PushConfig database controller.PushDatabase discov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher @@ -61,7 +62,7 @@ type Pusher struct { var errNoOfflinePusher = errs.New("no offlinePusher is configured") -func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, +func NewPusher(config *cmd.PushConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { @@ -78,7 +79,7 @@ func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistr } } -func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) { +func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) { var offlinePusher offlinepush.OfflinePusher switch pushConf.Enable { case "getui": @@ -86,7 +87,7 @@ func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache case "fcm": return fcm.NewClient(pushConf, cache) case "jpush": - offlinePusher = jpush.NewClient(pushConf, iOSPushConf) + offlinePusher = jpush.NewClient(pushConf) default: offlinePusher = dummy.NewClient() } @@ -104,7 +105,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - if err := callbackOnlinePush(ctx, &p.config.Callback, userIDs, msg); err != nil { + if err := callbackOnlinePush(ctx, &p.config.WebhooksConfig, userIDs, msg); err != nil { return err } // push @@ -132,7 +133,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg }) if len(offlinePushUserIDList) > 0 { - if err = callbackOfflinePush(ctx, &p.config.Callback, offlinePushUserIDList, msg, &[]string{}); err != nil { + if err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, offlinePushUserIDList, msg, &[]string{}); err != nil { return err } err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) @@ -165,7 +166,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, } if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err := callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err := callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -196,7 +197,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string - if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.Callback, groupID, msg, &pushToUserIDs); err != nil { + if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.WebhooksConfig, groupID, msg, &pushToUserIDs); err != nil { return err } @@ -238,11 +239,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws return err } log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs) - if len(p.config.Manager.UserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, p.config.Manager.UserID[0]) - } - if len(p.config.Manager.UserID) == 0 && len(p.config.IMAdmin.UserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, p.config.IMAdmin.UserID[0]) + if len(p.config.Share.IMAdmin.UserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, p.config.Share.IMAdmin.UserID[0]) } defer func(groupID string) { if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { @@ -260,10 +258,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush && p.config.Envs.Discovery == "k8s" { + if isOfflinePush && p.config.Share.Env == "k8s" { return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) } - if isOfflinePush && p.config.Envs.Discovery == "zookeeper" { + if isOfflinePush && p.config.Share.Env == "zookeeper" { var ( onlineSuccessUserIDs = []string{msg.SendID} webAndPcBackgroundUserIDs []string @@ -301,7 +299,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws // Use offline push messaging if len(needOfflinePushUserIDs) > 0 { var offlinePushUserIDs []string - err = callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs) if err != nil { return err } @@ -360,7 +358,7 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs var ( mu sync.Mutex wg = errgroup.Group{} - maxWorkers = p.config.Push.MaxConcurrentWorkers + maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers ) if maxWorkers < 3 { maxWorkers = 3 @@ -389,10 +387,10 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs return wsResults, nil } func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - if p.config.Envs.Discovery == "k8s" { + if p.config.Share.Env == "k8s" { return p.k8sOnlinePush(ctx, msg, pushToUserIDs) } - conns, err := p.discov.GetConns(ctx, p.config.RpcRegisterName.OpenImMessageGatewayName) + conns, err := p.discov.GetConns(ctx, p.config.Share.RpcRegisterName.MessageGateway) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) if err != nil { return nil, err @@ -402,7 +400,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, mu sync.Mutex wg = errgroup.Group{} input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} - maxWorkers = p.config.Push.MaxConcurrentWorkers + maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers ) if maxWorkers < 3 { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 39cd918f1..2a91e548d 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -54,7 +54,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc RegisterCenter: client, authDatabase: controller.NewAuthDatabase( cache.NewTokenCacheModel(rdb), - config.RpcConfig.Secret, + config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, ), config: config, @@ -64,7 +64,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*pbauth.UserTokenResp, error) { resp := pbauth.UserTokenResp{} - if req.Secret != s.config.RpcConfig.Secret { + if req.Secret != s.config.Share.Secret { return nil, errs.ErrNoPermission.WrapMsg("secret invalid") } if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil { @@ -102,7 +102,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR } func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { - claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.RpcConfig.Secret)) + claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret)) if err != nil { return nil, errs.Wrap(err) } diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index e8a0bed39..fab94f11d 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -125,7 +125,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR Seqs: req.Seqs, ContentType: conversation.ConversationType, } - if err := CallbackSingleMsgRead(ctx, m.config, reqCallback); err != nil { + if err := CallbackSingleMsgRead(ctx, &m.config.WebhooksConfig, reqCallback); err != nil { return nil, err } @@ -198,7 +198,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon UnreadMsgNum: req.HasReadSeq, ContentType: int64(conversation.ConversationType), } - if err := CallbackGroupMsgRead(ctx, m.config, reqCall); err != nil { + if err := CallbackGroupMsgRead(ctx, &m.config.WebhooksConfig, reqCall); err != nil { return nil, err } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 7eb3ebc1c..e4d30de32 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -61,8 +61,8 @@ func GetContent(msg *sdkws.MsgData) string { } } -func callbackBeforeSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { - if !globalConfig.Callback.CallbackBeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackBeforeSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { + if !callback.BeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackBeforeSendSingleMsgReq{ @@ -70,14 +70,14 @@ func callbackBeforeSendSingleMsg(ctx context.Context, globalConfig *config.Globa RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackBeforeSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendSingleMsg); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendSingleMsg); err != nil { return err } return nil } -func callbackAfterSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { - if !globalConfig.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackAfterSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { + if !callback.AfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackAfterSendSingleMsgReq{ @@ -85,14 +85,14 @@ func callbackAfterSendSingleMsg(ctx context.Context, globalConfig *config.Global RecvID: msg.MsgData.RecvID, } resp := &cbapi.CallbackAfterSendSingleMsgResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendSingleMsg); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendSingleMsg); err != nil { return err } return nil } -func callbackBeforeSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { - if !globalConfig.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackBeforeSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { + if !callback.BeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackBeforeSendGroupMsgReq{ @@ -100,14 +100,14 @@ func callbackBeforeSendGroupMsg(ctx context.Context, globalConfig *config.Global GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackBeforeSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendGroupMsg); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendGroupMsg); err != nil { return err } return nil } -func callbackAfterSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { - if !globalConfig.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { +func callbackAfterSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { + if !callback.AfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing { return nil } req := &cbapi.CallbackAfterSendGroupMsgReq{ @@ -115,21 +115,21 @@ func callbackAfterSendGroupMsg(ctx context.Context, globalConfig *config.GlobalC GroupID: msg.MsgData.GroupID, } resp := &cbapi.CallbackAfterSendGroupMsgResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendGroupMsg); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendGroupMsg); err != nil { return err } return nil } -func callbackMsgModify(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error { - if !globalConfig.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { +func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error { + if !callback.BeforeMsgModify.Enable || msg.MsgData.ContentType != constant.Text { return nil } req := &cbapi.CallbackMsgModifyCommandReq{ CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand), } resp := &cbapi.CallbackMsgModifyCommandResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackMsgModify); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeMsgModify); err != nil { return err } if resp.Content != nil { @@ -154,34 +154,34 @@ func callbackMsgModify(ctx context.Context, globalConfig *config.GlobalConfig, m return nil } -func CallbackGroupMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackGroupMsgReadReq) error { - if !globalConfig.Callback.CallbackGroupMsgRead.Enable { +func CallbackGroupMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackGroupMsgReadReq) error { + if !callback.AfterGroupMsgRead.Enable { return nil } req.CallbackCommand = cbapi.CallbackGroupMsgReadCommand resp := &cbapi.CallbackGroupMsgReadResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackGroupMsgRead); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterGroupMsgRead); err != nil { return err } return nil } -func CallbackSingleMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackSingleMsgReadReq) error { - if !globalConfig.Callback.CallbackSingleMsgRead.Enable { +func CallbackSingleMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackSingleMsgReadReq) error { + if !callback.AfterSingleMsgRead.Enable { return nil } req.CallbackCommand = cbapi.CallbackSingleMsgRead resp := &cbapi.CallbackSingleMsgReadResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackSingleMsgRead); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSingleMsgRead); err != nil { return err } return nil } -func CallbackAfterRevokeMsg(ctx context.Context, globalConfig *config.GlobalConfig, req *pbchat.RevokeMsgReq) error { - if !globalConfig.Callback.CallbackAfterRevokeMsg.Enable { +func CallbackAfterRevokeMsg(ctx context.Context, callback *config.Webhooks, req *pbchat.RevokeMsgReq) error { + if !callback.AfterRevokeMsg.Enable { return nil } callbackReq := &cbapi.CallbackAfterRevokeMsgReq{ @@ -191,7 +191,7 @@ func CallbackAfterRevokeMsg(ctx context.Context, globalConfig *config.GlobalConf UserID: req.UserID, } resp := &cbapi.CallbackAfterRevokeMsgResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterRevokeMsg); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, callbackReq, resp, callback.AfterRevokeMsg); err != nil { return err } return nil diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index 34d26cac2..1c7094e73 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -16,25 +16,25 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" ) -type MessageInterceptorFunc func(ctx context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) +type MessageInterceptorFunc func(ctx context.Context, globalConfig *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) -func MessageHasReadEnabled(ctx context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) { +func MessageHasReadEnabled(ctx context.Context, config *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) { switch { case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType: - if !globalConfig.SingleMessageHasReadReceiptEnable { + if !config.RpcConfig.SingleMessageHasReadReceiptEnable { return nil, servererrs.ErrMessageHasReadDisable.Wrap() } return req.MsgData, nil case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SuperGroupChatType: - if !globalConfig.GroupMessageHasReadReceiptEnable { + if !config.RpcConfig.GroupMessageHasReadReceiptEnable { return nil, servererrs.ErrMessageHasReadDisable.Wrap() } return req.MsgData, nil diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 59f3b84c3..9dc164ab6 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -41,7 +41,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if req.Seq < 0 { return nil, errs.ErrArgs.WrapMsg("seq is invalid") } - if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Share.IMAdmin); err != nil { return nil, err } user, err := m.UserLocalCache.GetUserInfo(ctx, req.UserID) @@ -62,10 +62,10 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. data, _ := json.Marshal(msgs[0]) log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data)) var role int32 - if !authverify.IsAppManagerUid(ctx, &m.config.Manager, &m.config.IMAdmin) { + if !authverify.IsAppManagerUid(ctx, &m.config.Share.IMAdmin) { switch msgs[0].SessionType { case constant.SingleChatType: - if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, &m.config.Manager, &m.config.IMAdmin); err != nil { + if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, &m.config.Share.IMAdmin); err != nil { return nil, err } role = user.AppMangerLevel @@ -104,11 +104,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } revokerUserID := mcontext.GetOpUserID(ctx) var flag bool - if len(m.config.Manager.UserID) > 0 { - flag = datautil.Contain(revokerUserID, m.config.Manager.UserID...) - } - if len(m.config.Manager.UserID) == 0 && len(m.config.IMAdmin.UserID) > 0 { - flag = datautil.Contain(revokerUserID, m.config.IMAdmin.UserID...) + if len(m.config.Share.IMAdmin.UserID) > 0 { + flag = datautil.Contain(revokerUserID, m.config.Share.IMAdmin.UserID...) } tips := sdkws.RevokeMsgTips{ RevokerUserID: revokerUserID, @@ -128,7 +125,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { return nil, err } - if err = CallbackAfterRevokeMsg(ctx, m.config, req); err != nil { + if err = CallbackAfterRevokeMsg(ctx, &m.config.WebhooksConfig, req); err != nil { return nil, err } return &msg.RevokeMsgResp{}, nil diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 1881913eb..999c24a67 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -59,10 +59,10 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs prommetrics.GroupChatMsgProcessFailedCounter.Inc() return nil, err } - if err = callbackBeforeSendGroupMsg(ctx, m.config, req); err != nil { + if err = callbackBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil { return nil, err } - if err := callbackMsgModify(ctx, m.config, req); err != nil { + if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil { return nil, err } err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData) @@ -72,7 +72,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs if req.MsgData.ContentType == constant.AtText { go m.setConversationAtInfo(ctx, req.MsgData) } - if err = callbackAfterSendGroupMsg(ctx, m.config, req); err != nil { + if err = callbackAfterSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil { log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err) } prommetrics.GroupChatMsgProcessSuccessCounter.Inc() @@ -157,18 +157,18 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, nil } else { - if err = callbackBeforeSendSingleMsg(ctx, m.config, req); err != nil { + if err = callbackBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig, req); err != nil { return nil, err } - if err := callbackMsgModify(ctx, m.config, req); err != nil { + if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil { return nil, err } if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return nil, err } - err = callbackAfterSendSingleMsg(ctx, m.config, req) + err = callbackAfterSendSingleMsg(ctx, &m.config.WebhooksConfig, req) if err != nil { log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req) } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 894216a6f..58f21b18b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -52,7 +52,7 @@ type ( ) func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { - m.Handlers = append(m.Handlers, interceptorFunc... + m.Handlers = append(m.Handlers, interceptorFunc...) } @@ -65,12 +65,12 @@ func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDisco if err != nil { return err } - cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } - msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis) + //todo MsgCacheTimeout + msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) seqModel := cache.NewSeqCache(rdb) conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin) diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index bdb2c0911..5fc2a568c 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -87,7 +87,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag } func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { - if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Share.IMAdmin); err != nil { return nil, err } conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index eaa8c40ba..9e5a656e0 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -15,7 +15,7 @@ package msg import ( - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" @@ -23,16 +23,16 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *config.GlobalConfig) bool { +func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *cmd.MsgConfig) bool { switch { case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType: - if config.SingleMessageHasReadReceiptEnable { + if config.RpcConfig.SingleMessageHasReadReceiptEnable { return true } else { return false } case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SuperGroupChatType: - if config.GroupMessageHasReadReceiptEnable { + if config.RpcConfig.GroupMessageHasReadReceiptEnable { return true } else { return false diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index c9605d8b4..86b8cb372 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -52,10 +52,7 @@ type MessageRevoked struct { func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error { switch data.MsgData.SessionType { case constant.SingleChatType: - if len(m.config.Manager.UserID) > 0 && datautil.Contain(data.MsgData.SendID, m.config.Manager.UserID...) { - return nil - } - if datautil.Contain(data.MsgData.SendID, m.config.IMAdmin.UserID...) { + if datautil.Contain(data.MsgData.SendID, m.config.Share.IMAdmin.UserID...) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && @@ -69,7 +66,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if black { return servererrs.ErrBlockedByPeer.Wrap() } - if m.config.MessageVerify.FriendVerify != nil && *m.config.MessageVerify.FriendVerify { + if m.config.RpcConfig.FriendVerify { friend, err := m.FriendLocalCache.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID) if err != nil { return err @@ -92,10 +89,8 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe if groupInfo.GroupType == constant.SuperGroup { return nil } - if len(m.config.Manager.UserID) > 0 && datautil.Contain(data.MsgData.SendID, m.config.Manager.UserID...) { - return nil - } - if datautil.Contain(data.MsgData.SendID, m.config.IMAdmin.UserID...) { + + if datautil.Contain(data.MsgData.SendID, m.config.Share.IMAdmin.UserID...) { return nil } if data.MsgData.ContentType <= constant.NotificationEnd && diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index b1e10216c..40c660c6b 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -82,7 +82,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) } func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) (*third.DeleteLogsResp, error) { - if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil { + if err := authverify.CheckAdmin(ctx, &t.config.Share.IMAdmin); err != nil { return nil, err } userID := "" @@ -123,7 +123,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo { } func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) { - if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil { + if err := authverify.CheckAdmin(ctx, &t.config.Share.IMAdmin); err != nil { return nil, err } var ( diff --git a/internal/rpc/third/tool.go b/internal/rpc/third/tool.go index 0d8c678b3..57acda0bb 100644 --- a/internal/rpc/third/tool.go +++ b/internal/rpc/third/tool.go @@ -54,7 +54,7 @@ func (t *thirdServer) checkUploadName(ctx context.Context, name string) error { if opUserID == "" { return errs.ErrNoPermission.WrapMsg("opUserID is empty") } - if !authverify.IsManagerUserID(opUserID, &t.config.Manager, &t.config.IMAdmin) { + if !authverify.IsManagerUserID(opUserID, &t.config.Share.IMAdmin) { if !strings.HasPrefix(name, opUserID+"/") { return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("name must start with `%s/`", opUserID)) } @@ -80,5 +80,5 @@ func checkValidObjectName(objectName string) error { } func (t *thirdServer) IsManagerUserID(opUserID string) bool { - return authverify.IsManagerUserID(opUserID, &t.config.Manager, &t.config.IMAdmin) + return authverify.IsManagerUserID(opUserID, &t.config.Share.IMAdmin) } diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go index 128ca7cb3..fa2f14be6 100644 --- a/internal/rpc/user/callback.go +++ b/internal/rpc/user/callback.go @@ -24,8 +24,8 @@ import ( pbuser "github.com/openimsdk/protocol/user" ) -func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error { - if !globalConfig.Callback.CallbackBeforeUpdateUserInfo.Enable { +func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error { + if !callback.BeforeUpdateUserInfo.Enable { return nil } cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{ @@ -35,7 +35,7 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.Glob Nickname: &req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfo); err != nil { return err } datautil.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL) @@ -43,8 +43,8 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.Glob datautil.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error { - if !globalConfig.Callback.CallbackAfterUpdateUserInfo.Enable { +func CallbackAfterUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error { + if !callback.AfterUpdateUserInfo.Enable { return nil } cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{ @@ -54,13 +54,13 @@ func CallbackAfterUpdateUserInfo(ctx context.Context, globalConfig *config.Globa Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackAfterUpdateUserInfoResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfo); err != nil { return err } return nil } -func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error { - if !globalConfig.Callback.CallbackBeforeUpdateUserInfoEx.Enable { +func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error { + if !callback.BeforeUpdateUserInfoEx.Enable { return nil } cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{ @@ -70,7 +70,7 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.Gl Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfoEx); err != nil { return err } datautil.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL) @@ -78,8 +78,8 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.Gl datautil.NotNilReplace(req.UserInfo.Nickname, resp.Nickname) return nil } -func CallbackAfterUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error { - if !globalConfig.Callback.CallbackAfterUpdateUserInfoEx.Enable { +func CallbackAfterUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error { + if !callback.AfterUpdateUserInfoEx.Enable { return nil } cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{ @@ -89,14 +89,14 @@ func CallbackAfterUpdateUserInfoEx(ctx context.Context, globalConfig *config.Glo Nickname: req.UserInfo.Nickname, } resp := &cbapi.CallbackAfterUpdateUserInfoExResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfoEx); err != nil { return err } return nil } -func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error { - if !globalConfig.Callback.CallbackBeforeUserRegister.Enable { +func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error { + if !callback.BeforeUserRegister.Enable { return nil } cbReq := &cbapi.CallbackBeforeUserRegisterReq{ @@ -106,7 +106,7 @@ func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.Global } resp := &cbapi.CallbackBeforeUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUserRegister); err != nil { return err } if len(resp.Users) != 0 { @@ -115,8 +115,8 @@ func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.Global return nil } -func CallbackAfterUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error { - if !globalConfig.Callback.CallbackAfterUserRegister.Enable { +func CallbackAfterUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error { + if !callback.AfterUserRegister.Enable { return nil } cbReq := &cbapi.CallbackAfterUserRegisterReq{ @@ -126,7 +126,7 @@ func CallbackAfterUserRegister(ctx context.Context, globalConfig *config.GlobalC } resp := &cbapi.CallbackAfterUserRegisterResp{} - if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterUpdateUserInfo); err != nil { + if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUserRegister); err != nil { return err } return nil diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 32617b507..1c30df47a 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -16,13 +16,13 @@ package user import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" "math/rand" "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -51,24 +51,24 @@ type userServer struct { friendRpcClient *rpcclient.FriendRpcClient groupRpcClient *rpcclient.GroupRpcClient RegisterCenter registry.SvcDiscoveryRegistry - config *config.GlobalConfig + config *cmd.UserConfig } -func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) +func Start(ctx context.Context, config *cmd.UserConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err } users := make([]*tablerelation.UserModel, 0) - if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) { + if len(config.Share.IMAdmin.UserID) != len(config.Share.IMAdmin.Nickname) { return errs.New("the count of ImAdmin.UserID is not equal to the count of ImAdmin.Nickname").Wrap() } - for k, v := range config.IMAdmin.UserID { - users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin}) + for k, v := range config.Share.IMAdmin.UserID { + users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Share.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin}) } userDB, err := mgo.NewUserMongo(mgocli.GetDB()) if err != nil { @@ -77,15 +77,15 @@ func Start(ctx context.Context, config *config.GlobalConfig, client registry.Svc cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()) userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) database := controller.NewUserDatabase(userDB, cache, mgocli.GetTx(), userMongoDB) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) u := &userServer{ db: database, RegisterCenter: client, friendRpcClient: &friendRpcClient, groupRpcClient: &groupRpcClient, - friendNotificationSender: notification.NewFriendNotificationSender(&config.Notification, &msgRpcClient, notification.WithDBFunc(database.FindWithError)), + friendNotificationSender: notification.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, notification.WithDBFunc(database.FindWithError)), userNotificationSender: notification.NewUserNotificationSender(config, &msgRpcClient, notification.WithUserFunc(database.FindWithError)), config: config, } @@ -105,11 +105,11 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) { resp = &pbuser.UpdateUserInfoResp{} - err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Manager, &s.config.IMAdmin) + err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } - if err := CallbackBeforeUpdateUserInfo(ctx, s.config, req); err != nil { + if err := CallbackBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } data := convert.UserPb2DBMap(req.UserInfo) @@ -129,7 +129,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err = CallbackAfterUpdateUserInfo(ctx, s.config, req); err != nil { + if err = CallbackAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { @@ -139,12 +139,12 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI } func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) { resp = &pbuser.UpdateUserInfoExResp{} - err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Manager, &s.config.IMAdmin) + err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } - if err = CallbackBeforeUpdateUserInfoEx(ctx, s.config, req); err != nil { + if err = CallbackBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } data := convert.UserPb2DBMapEx(req.UserInfo) @@ -164,7 +164,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse for _, friendID := range friends { s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) } - if err := CallbackAfterUpdateUserInfoEx(ctx, s.config, req); err != nil { + if err := CallbackAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { @@ -191,7 +191,7 @@ func (s *userServer) AccountCheck(ctx context.Context, req *pbuser.AccountCheckR if datautil.Duplicate(req.CheckUserIDs) { return nil, errs.ErrArgs.WrapMsg("userID repeated") } - err = authverify.CheckAdmin(ctx, &s.config.Manager, &s.config.IMAdmin) + err = authverify.CheckAdmin(ctx, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -238,8 +238,8 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if len(req.Users) == 0 { return nil, errs.ErrArgs.WrapMsg("users is empty") } - if req.Secret != s.config.Secret { - log.ZDebug(ctx, "UserRegister", s.config.Secret, req.Secret) + if req.Secret != s.config.Share.Secret { + log.ZDebug(ctx, "UserRegister", s.config.Share.Secret, req.Secret) return nil, errs.ErrNoPermission.WrapMsg("secret invalid") } if datautil.DuplicateAny(req.Users, func(e *sdkws.UserInfo) string { return e.UserID }) { @@ -262,7 +262,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR if exist { return nil, servererrs.ErrRegisteredAlready.WrapMsg("userID registered already") } - if err := CallbackBeforeUserRegister(ctx, s.config, req); err != nil { + if err := CallbackBeforeUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } now := time.Now() @@ -282,7 +282,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR return nil, err } - if err := CallbackAfterUserRegister(ctx, s.config, req); err != nil { + if err := CallbackAfterUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil { return nil, err } return resp, nil @@ -377,7 +377,7 @@ func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, // ProcessUserCommandAdd user general function add. func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) { - err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin) + err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -408,7 +408,7 @@ func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.Proc // ProcessUserCommandDelete user general function delete. func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) { - err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin) + err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -431,7 +431,7 @@ func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.P // ProcessUserCommandUpdate user general function update. func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) { - err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin) + err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -463,7 +463,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) { - err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin) + err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -492,7 +492,7 @@ func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.Proc } func (s *userServer) ProcessUserCommandGetAll(ctx context.Context, req *pbuser.ProcessUserCommandGetAllReq) (*pbuser.ProcessUserCommandGetAllResp, error) { - err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin) + err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin) if err != nil { return nil, err } @@ -665,7 +665,7 @@ func (s *userServer) userModelToResp(users []*relation.UserModel, pagination pag accounts := make([]*pbuser.NotificationAccountInfo, 0) var total int64 for _, v := range users { - if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.IMAdmin.UserID...) { + if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdmin.UserID...) { temp := &pbuser.NotificationAccountInfo{ UserID: v.UserID, FaceURL: v.FaceURL, diff --git a/internal/tools/msg.go b/internal/tools/msg.go index af9940754..f2bbc57d9 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,11 +17,11 @@ package tools import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/db/redisutil" "math" "math/rand" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" @@ -47,12 +47,12 @@ type MsgTool struct { userDatabase controller.UserDatabase groupDatabase controller.GroupDatabase msgNotificationSender *notification.MsgNotificationSender - Config *config.GlobalConfig + config *cmd.CronTaskConfig } func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, - msgNotificationSender *notification.MsgNotificationSender, config *config.GlobalConfig, + msgNotificationSender *notification.MsgNotificationSender, config *cmd.CronTaskConfig, ) *MsgTool { return &MsgTool{ msgDatabase: msgDatabase, @@ -60,20 +60,20 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle groupDatabase: groupDatabase, conversationDatabase: conversationDatabase, msgNotificationSender: msgNotificationSender, - Config: config, + config: config, } } -func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, error) { - mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) +func InitMsgTool(ctx context.Context, config *cmd.CronTaskConfig) (*MsgTool, error) { + mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return nil, err } - rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return nil, err } - discov, err := kdisc.NewDiscoveryRegister(config) + discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig) if err != nil { return nil, err } @@ -115,7 +115,7 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx(), ) - msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName) + msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg) msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config) return msgTool, nil @@ -179,8 +179,9 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() { func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) { for _, conversationID := range conversationIDs { - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.Config.RetainChatRecords*24*60*60)); err != nil { - log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", c.Config.RetainChatRecords) + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.CronTask.RetainChatRecords*24*60*60)); err != nil { + log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", + conversationID, "DBRetainChatRecords", c.config.CronTask.RetainChatRecords) } if err := c.checkMaxSeq(ctx, conversationID); err != nil { log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 8e3403f61..61c179019 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -41,8 +41,8 @@ const ( LogConfigFileName = "log.yml" OpenIMAPICfgFileName = "openim-api.yml" OpenIMCronTaskCfgFileName = "openim-crontask.yml" - OpenIMMsgGatewayCfgFileName = "openim-msg-gateway.yml" - OpenIMMsgTransferCfgFileName = "openim-msg-transfer.yml" + OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml" + OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml" OpenIMPushCfgFileName = "openim-push.yml" OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml" OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml" @@ -65,12 +65,16 @@ const ( zoopkeeperEnvPrefix = "openim-zookeeper" apiEnvPrefix = "openim-api" cornTaskEnvPrefix = "openim-crontask" + msgGatewayEnvPrefix = "openim-msggateway" + msgTransferEnvPrefix = "openim-msgtransfer" + pushEnvPrefix = "openim-push" authEnvPrefix = "openim-auth" conversationEnvPrefix = "openim-conversation" friendEnvPrefix = "openim-friend" groupEnvPrefix = "openim-group" msgEnvPrefix = "openim-msg" thridEnvPrefix = "openim-third" + userEnvPrefix = "openim-user" ) const ( diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index 446fc6ab7..c0a6710ec 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -29,8 +29,12 @@ type CronTaskCmd struct { cronTaskConfig CronTaskConfig } type CronTaskConfig struct { - CronTask config.CronTask - RedisConfig config.Redis + CronTask config.CronTask + RedisConfig config.Redis + MongodbConfig config.Mongo + ZookeeperConfig config.ZooKeeper + Share config.Share + KafkaConfig config.Kafka } func NewCronTaskCmd() *CronTaskCmd { @@ -39,6 +43,10 @@ func NewCronTaskCmd() *CronTaskCmd { ret.configMap = map[string]StructEnvPrefix{ OpenIMCronTaskCfgFileName: {EnvPrefix: cornTaskEnvPrefix, ConfigStruct: &cronTaskConfig.CronTask}, RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &cronTaskConfig.RedisConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &cronTaskConfig.MongodbConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &cronTaskConfig.ZookeeperConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &cronTaskConfig.Share}, + KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &cronTaskConfig.KafkaConfig}, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index cf2b57b2e..48c0328cc 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -42,11 +42,11 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { var msgGatewayConfig MsgGatewayConfig ret := &MsgGatewayCmd{msgGatewayConfig: msgGatewayConfig} ret.configMap = map[string]StructEnvPrefix{ - OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway}, - RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgGatewayConfig.RedisConfig}, - ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgGatewayConfig.ZookeeperConfig}, - ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgGatewayConfig.Share}, - WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgGatewayConfig.WebhooksConfig}, + OpenIMMsgGatewayCfgFileName: {EnvPrefix: msgGatewayEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgGatewayConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgGatewayConfig.ZookeeperConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgGatewayConfig.Share}, + WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgGatewayConfig.WebhooksConfig}, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/msg_gateway_test.go b/pkg/common/cmd/msg_gateway_test.go index e2e75115b..5908877ed 100644 --- a/pkg/common/cmd/msg_gateway_test.go +++ b/pkg/common/cmd/msg_gateway_test.go @@ -15,11 +15,7 @@ package cmd import ( - "testing" - - "github.com/openimsdk/protocol/constant" "github.com/stretchr/testify/mock" - "gotest.tools/assert" ) // MockRootCmd is a mock type for the RootCmd type @@ -31,21 +27,3 @@ func (m *MockRootCmd) Execute() error { args := m.Called() return args.Error(0) } - -func TestMsgGatewayCmd_GetPortFromConfig(t *testing.T) { - msgGatewayCmd := &MsgGatewayCmd{RootCmd: &RootCmd{}} - tests := []struct { - portType string - want int - }{ - {constant.FlagWsPort, 8080}, // Replace 8080 with the expected port from the config - {constant.FlagPort, 8081}, // Replace 8081 with the expected port from the config - {"invalid", 0}, - } - for _, tt := range tests { - t.Run(tt.portType, func(t *testing.T) { - got := msgGatewayCmd.GetPortFromConfig(tt.portType) - assert.Equal(t, tt.want, got) - }) - } -} diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 973fa8cad..990ffdcd3 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -16,55 +16,52 @@ package cmd import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) type MsgTransferCmd struct { *RootCmd - ctx context.Context + ctx context.Context + configMap map[string]StructEnvPrefix + msgTransferConfig MsgTransferConfig } - -func NewMsgTransferCmd(name string) *MsgTransferCmd { - ret := &MsgTransferCmd{RootCmd: NewRootCmd(program.GetProcessName(), name)} - ret.ctx = context.WithValue(context.Background(), "version", config2.Version) - ret.addRunE() - ret.SetRootCmdPt(ret) - return ret +type MsgTransferConfig struct { + MsgTransfer config.MsgTransfer + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + Share config.Share + WebhooksConfig config.Webhooks } -func (m *MsgTransferCmd) addRunE() { - m.Command.RunE = func(cmd *cobra.Command, args []string) error { - return msgtransfer.Start(m.ctx, m.config, m.getPrometheusPortFlag(cmd), m.getTransferProgressFlagValue()) +func NewMsgTransferCmd() *MsgTransferCmd { + var msgTransferConfig MsgTransferConfig + ret := &MsgTransferCmd{msgTransferConfig: msgTransferConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMMsgTransferCfgFileName: {EnvPrefix: msgTransferEnvPrefix, ConfigStruct: &msgTransferConfig.MsgTransfer}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgTransferConfig.RedisConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgTransferConfig.MongodbConfig}, + KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgTransferConfig.KafkaConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgTransferConfig.ZookeeperConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgTransferConfig.Share}, + WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgTransferConfig.WebhooksConfig}, } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } + return ret } func (m *MsgTransferCmd) Exec() error { return m.Execute() } -func (m *MsgTransferCmd) GetPortFromConfig(portType string) int { - if portType == constant.FlagPort { - return 0 - } else if portType == constant.FlagPrometheusPort { - n := m.getTransferProgressFlagValue() - return m.config.Prometheus.MessageTransferPrometheusPort[n] - } - return 0 -} - -func (m *MsgTransferCmd) AddTransferProgressFlag() { - m.Command.Flags().IntP(constant.FlagTransferProgressIndex, "n", 0, "transfer progress index") -} - -func (m *MsgTransferCmd) getTransferProgressFlagValue() int { - nIndex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex) - if err != nil { - return 0 - } - return nIndex +func (m *MsgTransferCmd) preRunE() error { + return msgtransfer.Start(m.ctx, m.Index(), &m.msgTransferConfig) } diff --git a/pkg/common/cmd/msg_utils.go b/pkg/common/cmd/msg_utils.go index b5b76c7f6..504c5b896 100644 --- a/pkg/common/cmd/msg_utils.go +++ b/pkg/common/cmd/msg_utils.go @@ -138,7 +138,7 @@ func NewSeqCmd() *SeqCmd { func (s *SeqCmd) GetSeqCmd() *cobra.Command { s.Command.Run = func(cmdLines *cobra.Command, args []string) { - _, err := tools.InitMsgTool(context.Background(), s.MsgTool.Config) + _, err := tools.InitMsgTool(context.Background(), s.MsgTool.config) if err != nil { program.ExitWithError(err) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go new file mode 100644 index 000000000..eab29fa1c --- /dev/null +++ b/pkg/common/cmd/push.go @@ -0,0 +1,72 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/push" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/tools/system/program" + "github.com/spf13/cobra" +) + +type PushRpcCmd struct { + *RootCmd + ctx context.Context + configMap map[string]StructEnvPrefix + pushConfig PushConfig +} +type PushConfig struct { + RpcConfig config.Push + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func NewPushRpcCmd() *PushRpcCmd { + var pushConfig PushConfig + ret := &PushRpcCmd{pushConfig: pushConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMPushCfgFileName: {EnvPrefix: pushEnvPrefix, ConfigStruct: &pushConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &pushConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &pushConfig.ZookeeperConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &pushConfig.MongodbConfig}, + KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &pushConfig.KafkaConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &pushConfig.Share}, + NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &pushConfig.NotificationConfig}, + WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &pushConfig.WebhooksConfig}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } + return ret +} + +func (a *PushRpcCmd) Exec() error { + return a.Execute() +} + +func (a *PushRpcCmd) preRunE() error { + return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, + a.Index(), a.pushConfig.Share.RpcRegisterName.Auth, &a.pushConfig, push.Start) +} diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go new file mode 100644 index 000000000..eed5066c7 --- /dev/null +++ b/pkg/common/cmd/user.go @@ -0,0 +1,72 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/user" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/tools/system/program" + "github.com/spf13/cobra" +) + +type UserRpcCmd struct { + *RootCmd + ctx context.Context + configMap map[string]StructEnvPrefix + userConfig UserConfig +} +type UserConfig struct { + RpcConfig config.User + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks +} + +func NewUserRpcCmd() *UserRpcCmd { + var userConfig UserConfig + ret := &UserRpcCmd{userConfig: userConfig} + ret.configMap = map[string]StructEnvPrefix{ + OpenIMRPCUserCfgFileName: {EnvPrefix: userEnvPrefix, ConfigStruct: &userConfig.RpcConfig}, + RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &userConfig.RedisConfig}, + ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &userConfig.ZookeeperConfig}, + MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &userConfig.MongodbConfig}, + KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &userConfig.KafkaConfig}, + ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &userConfig.Share}, + NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &userConfig.NotificationConfig}, + WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &userConfig.WebhooksConfig}, + } + ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) + ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error { + return ret.preRunE() + } + return ret +} + +func (a *UserRpcCmd) Exec() error { + return a.Execute() +} + +func (a *UserRpcCmd) preRunE() error { + return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, + a.Index(), a.userConfig.Share.RpcRegisterName.Auth, &a.userConfig, user.Start) +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index b5e7751d8..daf897aad 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -18,6 +18,7 @@ import ( "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/s3/minio" "time" ) @@ -68,15 +69,26 @@ type Mongo struct { MaxRetry int `mapstructure:"maxRetry"` } type Kafka struct { - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - Address []string `mapstructure:"address"` - ToRedisTopic string `mapstructure:"toRedisTopic"` - ToMongoTopic string `mapstructure:"toMongoTopic"` - ToPushTopic string `mapstructure:"toPushTopic"` - ToRedisGroupID string `mapstructure:"toRedisGroupID"` - ToMongoGroupID string `mapstructure:"toMongoGroupID"` - ToPushGroupID string `mapstructure:"toPushGroupID"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + ProducerAck string `mapstructure:"producerAck"` + CompressType string `mapstructure:"compressType"` + Address []string `mapstructure:"address"` + ToRedisTopic string `mapstructure:"toRedisTopic"` + ToMongoTopic string `mapstructure:"toMongoTopic"` + ToPushTopic string `mapstructure:"toPushTopic"` + ToRedisGroupID string `mapstructure:"toRedisGroupID"` + ToMongoGroupID string `mapstructure:"toMongoGroupID"` + ToPushGroupID string `mapstructure:"toPushGroupID"` + Tls TLSConfig `mapstructure:"tls"` +} +type TLSConfig struct { + EnableTLS bool `mapstructure:"enableTLS"` + CACrt string `mapstructure:"caCrt"` + ClientCrt string `mapstructure:"clientCrt"` + ClientKey string `mapstructure:"clientKey"` + ClientKeyPwd string `mapstructure:"clientKeyPwd"` + InsecureSkipVerify bool `mapstructure:"insecureSkipVerify"` } type API struct { @@ -178,9 +190,10 @@ type Push struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - Enable string `mapstructure:"enable"` - GeTui struct { + Prometheus Prometheus `mapstructure:"prometheus"` + MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` + Enable string `mapstructure:"enable"` + GeTui struct { PushUrl string `mapstructure:"pushUrl"` MasterSecret string `mapstructure:"masterSecret"` AppKey string `mapstructure:"appKey"` @@ -214,7 +227,6 @@ type Auth struct { TokenPolicy struct { Expire int64 `mapstructure:"expire"` } `mapstructure:"tokenPolicy"` - Secret string `mapstructure:"secret"` } type Conversation struct { @@ -311,12 +323,13 @@ type User struct { } type Redis struct { - Address []string `mapstructure:"address"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - ClusterMode bool `mapstructure:"clusterMode"` - DB int `mapstructure:"db"` - MaxRetry int `mapstructure:"MaxRetry"` + Address []string `mapstructure:"address"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + EnablePipeline bool `mapstructure:"enablePipeline"` + ClusterMode bool `mapstructure:"clusterMode"` + DB int `mapstructure:"db"` + MaxRetry int `mapstructure:"MaxRetry"` } type WebhookConfig struct { @@ -326,6 +339,7 @@ type WebhookConfig struct { } type Share struct { + Secret string `mapstructure:"secret"` Env string `mapstructure:"env"` RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` IMAdmin IMAdmin `mapstructure:"imAdmin"` @@ -369,6 +383,7 @@ type Webhooks struct { AfterSendSingleMsg WebhookConfig `mapstructure:"afterSendSingleMsg"` BeforeSendGroupMsg WebhookConfig `mapstructure:"beforeSendGroupMsg"` AfterSendGroupMsg WebhookConfig `mapstructure:"afterSendGroupMsg"` + BeforeMsgModify WebhookConfig `mapstructure:"beforeMsgModify"` AfterUserOnline WebhookConfig `mapstructure:"afterUserOnline"` AfterUserOffline WebhookConfig `mapstructure:"afterUserOffline"` AfterUserKickOff WebhookConfig `mapstructure:"afterUserKickOff"` @@ -377,6 +392,7 @@ type Webhooks struct { BeforeGroupOnlinePush WebhookConfig `mapstructure:"beforeGroupOnlinePush"` BeforeAddFriend WebhookConfig `mapstructure:"beforeAddFriend"` BeforeUpdateUserInfo WebhookConfig `mapstructure:"beforeUpdateUserInfo"` + AfterUpdateUserInfo WebhookConfig `mapstructure:"afterUpdateUserInfo"` BeforeCreateGroup WebhookConfig `mapstructure:"beforeCreateGroup"` AfterCreateGroup WebhookConfig `mapstructure:"afterCreateGroup"` BeforeMemberJoinGroup WebhookConfig `mapstructure:"beforeMemberJoinGroup"` @@ -439,7 +455,33 @@ func (r *Redis) Build() *redisutil.Config { } func (k *Kafka) Build() *kafka.Config { - return &kafka.Config{} + return &kafka.Config{ + Username: k.Username, + Password: k.Password, + ProducerAck: k.ProducerAck, + CompressType: k.CompressType, + Addr: k.Address, + TLS: kafka.TLSConfig{ + EnableTLS: k.Tls.EnableTLS, + CACrt: k.Tls.CACrt, + ClientCrt: k.Tls.ClientCrt, + ClientKey: k.Tls.ClientKey, + ClientKeyPwd: k.Tls.ClientKeyPwd, + InsecureSkipVerify: k.Tls.InsecureSkipVerify, + }, + } +} +func (m *Minio) Build() *minio.Config { + return &minio.Config{ + Bucket: m.Bucket, + Endpoint: "", + AccessKeyID: m.AccessKeyID, + SecretAccessKey: m.SecretAccessKey, + SessionToken: m.SessionToken, + SignEndpoint: "", + PublicRead: m.PublicRead, + } + } func (l *CacheConfig) Failed() time.Duration { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 6a4e70ea6..e51597057 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -17,6 +17,7 @@ package controller import ( "context" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -106,19 +107,19 @@ type CommonMsgDatabase interface { } func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { - conf, err := kafka.BuildProducerConfig(kafkaConf.Config) + conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err } - producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.LatestMsgToRedis.Topic) + producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic) if err != nil { return nil, err } - producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToMongo.Topic) + producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) if err != nil { return nil, err } - producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToPush.Topic) + producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic) if err != nil { return nil, err } @@ -132,14 +133,15 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M }, nil } -func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) { +func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *cmd.CronTaskConfig) (CommonMsgDatabase, error) { msgDocModel, err := mgo.NewMsgMongo(database) if err != nil { return nil, err } - msg := cache.NewMsgCache(rdb, config.MsgCacheTimeout, config.Redis.EnablePipeline) + //todo MsgCacheTimeout + msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline) seq := cache.NewSeqCache(rdb) - return NewCommonMsgDatabase(msgDocModel, msg, seq, config.Kafka) + return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig) } type commonMsgDatabase struct { diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 25077a387..7da7c5f33 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,6 +16,7 @@ package discoveryregister import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -26,7 +27,7 @@ const ( zookeeper = "zoopkeeper" kubenetes = "k8s" - direct = "direct" + directT = "direct" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.