refactor: all module update.

pull/2148/head
Gordon 1 year ago
parent a383abe469
commit 97e53a492b

@ -20,10 +20,7 @@ import (
)
func main() {
msgTransferCmd := cmd.NewMsgTransferCmd(cmd.MsgTransferServer)
msgTransferCmd.AddPrometheusPortFlag()
msgTransferCmd.AddTransferProgressFlag()
if err := msgTransferCmd.Exec(); err != nil {
if err := cmd.NewMsgTransferCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -15,16 +15,12 @@
package main
import (
"github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
)
func main() {
pushCmd := cmd.NewRpcCmd(cmd.RpcPushServer, push.Start)
pushCmd.AddPortFlag()
pushCmd.AddPrometheusPortFlag()
if err := pushCmd.Exec(); err != nil {
if err := cmd.NewPushRpcCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -15,16 +15,12 @@
package main
import (
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
)
func main() {
rpcCmd := cmd.NewRpcCmd(cmd.RpcUserServer, user.Start)
rpcCmd.AddPortFlag()
rpcCmd.AddPrometheusPortFlag()
if err := rpcCmd.Exec(); err != nil {
if err := cmd.NewUserRpcCmd().Exec(); err != nil {
program.ExitWithError(err)
}
}

@ -1,13 +1,18 @@
username: ''
password: ''
producerAck: ""
compressType: "none"
addr: [ localhost:19094 ]
toRedis:
topic: "toRedis"
toMongo:
topic: "toMongo"
toPush:
topic: "toPush"
consumerGroupID:
msgToRedis: redis
msgToMongo: mongo
msgToPush: push
toRedisTopic: "toRedis"
toMongoTopic: "toMongo"
toPushTopic: "toPush"
toRedisGroupID: redis
toMongoGroupID: mongo
toPushGroupID: push
tls:
enableTLS: false
caCrt: ""
clientCrt: ""
clientKey: ""
clientKeyPwd: ""
insecureSkipVerify: false

@ -7,7 +7,7 @@ prometheus:
enable: true
ports: [ 20107 ]
maxConcurrentWorkers: 3
enable: getui
geTui:
pushUrl: "https://restapi.getui.com/v2/$appId"

@ -10,4 +10,3 @@ prometheus:
tokenPolicy:
expire: 90
secret: openIM123

@ -2,6 +2,7 @@ redis:
address: [ localhost:16379 ]
username: ''
password: openIM123
enablePipeline: false
clusterMode: false
db: 0
MaxRetry: 10

@ -1,3 +1,4 @@
secret: openIM123
env: zookeeper
rpcRegisterName:
user: User

@ -19,6 +19,10 @@ beforeSendGroupMsg:
enable: false
timeout: 5
failedContinue: true
beforeMsgModify:
enable: false
timeout: 5
failedContinue: true
afterSendGroupMsg:
enable: false
timeout: 5
@ -55,6 +59,10 @@ beforeUpdateUserInfo:
enable: false
timeout: 5
failedContinue: true
afterUpdateUserInfo:
enable: false
timeout: 5
failedContinue: true
beforeCreateGroup:
enable: false
timeout: 5

@ -33,7 +33,6 @@ import (
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
@ -312,11 +311,12 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClie
return r
}
func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.HandlerFunc {
func GinParseToken(rdb redis.UniversalClient, config *cmd.ApiConfig) gin.HandlerFunc {
//todo TokenPolicy
dataBase := controller.NewAuthDatabase(
cache.NewTokenCacheModel(rdb),
config.Secret,
config.TokenPolicy.Expire,
config.Share.Secret,
0,
)
return func(c *gin.Context) {
switch c.Request.Method {
@ -328,7 +328,7 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
c.Abort()
return
}
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(config.Secret))
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(config.Share.Secret))
if err != nil {
log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, servererrs.ErrTokenUnknown.Wrap())

@ -17,8 +17,10 @@ package msgtransfer
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"net/http"
"os"
"os/signal"
@ -54,46 +56,44 @@ type MsgTransfer struct {
cancel context.CancelFunc
}
func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, index int) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPort", prometheusPort, "index", index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
func Start(ctx context.Context, index int, config *cmd.MsgTransferConfig) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", config.MsgTransfer.Prometheus.Ports, "index", index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
if err != nil {
return err
}
client, err := kdisc.NewDiscoveryRegister(config)
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
if err != nil {
return err
}
if err := client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
if err := client.CreateRpcRootNodes(config.Share.RpcRegisterName.GetServiceNames()); err != nil {
return err
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis)
//todo MsgCacheTimeout
msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
seqModel := cache.NewSeqCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.Kafka)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqModel, &config.KafkaConfig)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
msgTransfer, err := NewMsgTransfer(&config.Kafka, msgDatabase, &conversationRpcClient, &groupRpcClient)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
msgTransfer, err := NewMsgTransfer(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort, config, index)
return msgTransfer.Start(index, config)
}
func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
@ -112,11 +112,11 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat
}, nil
}
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig, index int) error {
if prometheusPort <= 0 {
return errs.New("invalid prometheus port", "prometheusPort", prometheusPort)
func (m *MsgTransfer) Start(index int, config *cmd.MsgTransferConfig) error {
prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index)
if err != nil {
return err
}
m.ctx, m.cancel = context.WithCancel(context.Background())
var (
@ -127,13 +127,13 @@ func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig, ind
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
if config.Prometheus.Enable {
if config.MsgTransfer.Prometheus.Enable {
go func() {
proreg := prometheus.NewRegistry()
proreg.MustRegister(
collectors.NewGoCollector(),
)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.RpcRegisterName)...)
proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.ZookeeperConfig)...)
http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
if err != nil && err != http.ErrServerClosed {

@ -81,8 +81,9 @@ type OnlineHistoryRedisConsumerHandler struct {
groupRpcClient *rpcclient.GroupRpcClient
}
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic})
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic})
if err != nil {
return nil, err
}

@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct {
}
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic})
if err != nil {
return nil, err
}

@ -26,8 +26,8 @@ import (
"github.com/openimsdk/tools/utils/datautil"
)
func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if !callback.CallbackOfflinePush.Enable || msg.ContentType == constant.Typing {
func callbackOfflinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
if !callback.BeforeOfflinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := &callbackstruct.CallbackBeforePushReq{
@ -51,7 +51,7 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOfflinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOfflinePush); err != nil {
return err
}
@ -64,8 +64,8 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs
return nil
}
func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error {
if !callback.CallbackOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
func callbackOnlinePush(ctx context.Context, callback *config.Webhooks, userIDs []string, msg *sdkws.MsgData) error {
if !callback.BeforeOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforePushReq{
@ -87,7 +87,7 @@ func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs
Content: GetContent(msg),
}
resp := &callbackstruct.CallbackBeforePushResp{}
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeOnlinePush); err != nil {
return err
}
return nil
@ -95,12 +95,12 @@ func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs
func callbackBeforeSuperGroupOnlinePush(
ctx context.Context,
callback *config.Callback,
callback *config.Webhooks,
groupID string,
msg *sdkws.MsgData,
pushToUserIDs *[]string,
) error {
if !callback.CallbackBeforeSuperGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
if !callback.BeforeGroupOnlinePush.Enable || msg.ContentType == constant.Typing {
return nil
}
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
@ -120,7 +120,7 @@ func callbackBeforeSuperGroupOnlinePush(
Seq: msg.Seq,
}
resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{}
if err := http.CallBackPostReturn(ctx, callback.CallbackUrl, req, resp, callback.CallbackBeforeSuperGroupOnlinePush); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeGroupOnlinePush); err != nil {
return err
}

@ -45,7 +45,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) {
if err != nil {
return nil, err
}
credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.Fcm.ServiceAccount)
credentialsFilePath := filepath.Join(projectRoot, "config", pushConf.FCM.ServiceAccount)
opt := option.WithCredentialsFile(credentialsFilePath)
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {

@ -51,14 +51,14 @@ const (
)
type Client struct {
cache cache.MsgModel
cache cache.ThirdCache
tokenExpireTime int64
taskIDTTL int64
pushConf *config.Push
httpClient *httputil.HTTPClient
}
func NewClient(pushConf *config.Push, cache cache.MsgModel) *Client {
func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client {
return &Client{cache: cache,
tokenExpireTime: tokenExpireTime,
taskIDTTL: taskIDTTL,

@ -57,7 +57,7 @@ func (n *Notification) SetExtras(extras Extras) {
}
func (n *Notification) SetAndroidIntent(pushConf *config.Push) {
n.Android.Intent.URL = pushConf.Jpns.PushIntent
n.Android.Intent.URL = pushConf.JPNS.PushIntent
}
func (n *Notification) IOSEnableMutableContent() {

@ -27,13 +27,11 @@ import (
type JPush struct {
pushConf *config.Push
iOSPushConf *config.IOSPush
httpClient *httputil.HTTPClient
}
func NewClient(pushConf *config.Push, iOSPushConf *config.IOSPush) *JPush {
func NewClient(pushConf *config.Push) *JPush {
return &JPush{pushConf: pushConf,
iOSPushConf: iOSPushConf,
httpClient: httputil.NewHTTPClient(httputil.NewClientConfig()),
}
}
@ -71,7 +69,7 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
var msg body.Message
msg.SetMsgContent(content)
var opt body.Options
opt.SetApnsProduction(j.iOSPushConf.Production)
opt.SetApnsProduction(j.pushConf.IOSPush.Production)
var pushObj body.PushObj
pushObj.SetPlatform(&pf)
pushObj.SetAudience(&au)
@ -85,9 +83,9 @@ func (j *JPush) Push(ctx context.Context, userIDs []string, title, content strin
func (j *JPush) request(ctx context.Context, po body.PushObj, resp any, timeout int) error {
return j.httpClient.PostReturn(
ctx,
j.pushConf.Jpns.PushUrl,
j.pushConf.JPNS.PushURL,
map[string]string{
"Authorization": j.getAuthorization(j.pushConf.Jpns.AppKey, j.pushConf.Jpns.MasterSecret),
"Authorization": j.getAuthorization(j.pushConf.JPNS.AppKey, j.pushConf.JPNS.MasterSecret),
},
po,
resp,

@ -35,7 +35,7 @@ type ConsumerHandler struct {
}
func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) {
pushConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
pushConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToPushGroupID, []string{kafkaConf.ToPushTopic})
if err != nil {
return nil, err
}

@ -16,9 +16,9 @@ package push
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
@ -35,20 +35,20 @@ type pushServer struct {
pusher *Pusher
}
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
func Start(ctx context.Context, config *cmd.PushConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
cacheModel := cache.NewThirdCache(rdb)
offlinePusher, err := NewOfflinePusher(&config.Push, &config.IOSPush, cacheModel)
offlinePusher, err := NewOfflinePusher(&config.RpcConfig, cacheModel)
if err != nil {
return err
}
database := controller.NewPushDatabase(cacheModel)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
pusher := NewPusher(
config,
client,
@ -65,7 +65,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discovery.Sv
pusher: pusher,
})
consumer, err := NewConsumer(&config.Kafka, pusher)
consumer, err := NewConsumer(&config.KafkaConfig, pusher)
if err != nil {
return err
}

@ -17,6 +17,7 @@ package push
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/errs"
"sync"
@ -48,7 +49,7 @@ import (
)
type Pusher struct {
config *config.GlobalConfig
config *cmd.PushConfig
database controller.PushDatabase
discov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
@ -61,7 +62,7 @@ type Pusher struct {
var errNoOfflinePusher = errs.New("no offlinePusher is configured")
func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
func NewPusher(config *cmd.PushConfig, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher {
@ -78,7 +79,7 @@ func NewPusher(config *config.GlobalConfig, discov discovery.SvcDiscoveryRegistr
}
}
func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) {
func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) {
var offlinePusher offlinepush.OfflinePusher
switch pushConf.Enable {
case "getui":
@ -86,7 +87,7 @@ func NewOfflinePusher(pushConf *config.Push, iOSPushConf *config.IOSPush, cache
case "fcm":
return fcm.NewClient(pushConf, cache)
case "jpush":
offlinePusher = jpush.NewClient(pushConf, iOSPushConf)
offlinePusher = jpush.NewClient(pushConf)
default:
offlinePusher = dummy.NewClient()
}
@ -104,7 +105,7 @@ func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
if err := callbackOnlinePush(ctx, &p.config.Callback, userIDs, msg); err != nil {
if err := callbackOnlinePush(ctx, &p.config.WebhooksConfig, userIDs, msg); err != nil {
return err
}
// push
@ -132,7 +133,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
})
if len(offlinePushUserIDList) > 0 {
if err = callbackOfflinePush(ctx, &p.config.Callback, offlinePushUserIDList, msg, &[]string{}); err != nil {
if err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, offlinePushUserIDList, msg, &[]string{}); err != nil {
return err
}
err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList)
@ -165,7 +166,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err := callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -196,7 +197,7 @@ func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string,
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.Callback, groupID, msg, &pushToUserIDs); err != nil {
if err = callbackBeforeSuperGroupOnlinePush(ctx, &p.config.WebhooksConfig, groupID, msg, &pushToUserIDs); err != nil {
return err
}
@ -238,11 +239,8 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
return err
}
log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
if len(p.config.Manager.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, p.config.Manager.UserID[0])
}
if len(p.config.Manager.UserID) == 0 && len(p.config.IMAdmin.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, p.config.IMAdmin.UserID[0])
if len(p.config.Share.IMAdmin.UserID) > 0 {
ctx = mcontext.WithOpUserIDContext(ctx, p.config.Share.IMAdmin.UserID[0])
}
defer func(groupID string) {
if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
@ -260,10 +258,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush && p.config.Envs.Discovery == "k8s" {
if isOfflinePush && p.config.Share.Env == "k8s" {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
}
if isOfflinePush && p.config.Envs.Discovery == "zookeeper" {
if isOfflinePush && p.config.Share.Env == "zookeeper" {
var (
onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string
@ -301,7 +299,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
// Use offline push messaging
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err = callbackOfflinePush(ctx, &p.config.Callback, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
err = callbackOfflinePush(ctx, &p.config.WebhooksConfig, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
@ -360,7 +358,7 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs
var (
mu sync.Mutex
wg = errgroup.Group{}
maxWorkers = p.config.Push.MaxConcurrentWorkers
maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
@ -389,10 +387,10 @@ func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUs
return wsResults, nil
}
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
if p.config.Envs.Discovery == "k8s" {
if p.config.Share.Env == "k8s" {
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
}
conns, err := p.discov.GetConns(ctx, p.config.RpcRegisterName.OpenImMessageGatewayName)
conns, err := p.discov.GetConns(ctx, p.config.Share.RpcRegisterName.MessageGateway)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil {
return nil, err
@ -402,7 +400,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
mu sync.Mutex
wg = errgroup.Group{}
input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}
maxWorkers = p.config.Push.MaxConcurrentWorkers
maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers
)
if maxWorkers < 3 {

@ -54,7 +54,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc
RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(
cache.NewTokenCacheModel(rdb),
config.RpcConfig.Secret,
config.Share.Secret,
config.RpcConfig.TokenPolicy.Expire,
),
config: config,
@ -64,7 +64,7 @@ func Start(ctx context.Context, config *cmd.AuthConfig, client discovery.SvcDisc
func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (*pbauth.UserTokenResp, error) {
resp := pbauth.UserTokenResp{}
if req.Secret != s.config.RpcConfig.Secret {
if req.Secret != s.config.Share.Secret {
return nil, errs.ErrNoPermission.WrapMsg("secret invalid")
}
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
@ -102,7 +102,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
}
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.RpcConfig.Secret))
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret))
if err != nil {
return nil, errs.Wrap(err)
}

@ -125,7 +125,7 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR
Seqs: req.Seqs,
ContentType: conversation.ConversationType,
}
if err := CallbackSingleMsgRead(ctx, m.config, reqCallback); err != nil {
if err := CallbackSingleMsgRead(ctx, &m.config.WebhooksConfig, reqCallback); err != nil {
return nil, err
}
@ -198,7 +198,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
UnreadMsgNum: req.HasReadSeq,
ContentType: int64(conversation.ConversationType),
}
if err := CallbackGroupMsgRead(ctx, m.config, reqCall); err != nil {
if err := CallbackGroupMsgRead(ctx, &m.config.WebhooksConfig, reqCall); err != nil {
return nil, err
}

@ -61,8 +61,8 @@ func GetContent(msg *sdkws.MsgData) string {
}
}
func callbackBeforeSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error {
if !globalConfig.Callback.CallbackBeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func callbackBeforeSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackBeforeSendSingleMsgReq{
@ -70,14 +70,14 @@ func callbackBeforeSendSingleMsg(ctx context.Context, globalConfig *config.Globa
RecvID: msg.MsgData.RecvID,
}
resp := &cbapi.CallbackBeforeSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendSingleMsg); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendSingleMsg); err != nil {
return err
}
return nil
}
func callbackAfterSendSingleMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error {
if !globalConfig.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func callbackAfterSendSingleMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.AfterSendSingleMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackAfterSendSingleMsgReq{
@ -85,14 +85,14 @@ func callbackAfterSendSingleMsg(ctx context.Context, globalConfig *config.Global
RecvID: msg.MsgData.RecvID,
}
resp := &cbapi.CallbackAfterSendSingleMsgResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendSingleMsg); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendSingleMsg); err != nil {
return err
}
return nil
}
func callbackBeforeSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error {
if !globalConfig.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func callbackBeforeSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackBeforeSendGroupMsgReq{
@ -100,14 +100,14 @@ func callbackBeforeSendGroupMsg(ctx context.Context, globalConfig *config.Global
GroupID: msg.MsgData.GroupID,
}
resp := &cbapi.CallbackBeforeSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackBeforeSendGroupMsg); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeSendGroupMsg); err != nil {
return err
}
return nil
}
func callbackAfterSendGroupMsg(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error {
if !globalConfig.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
func callbackAfterSendGroupMsg(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.AfterSendGroupMsg.Enable || msg.MsgData.ContentType == constant.Typing {
return nil
}
req := &cbapi.CallbackAfterSendGroupMsgReq{
@ -115,21 +115,21 @@ func callbackAfterSendGroupMsg(ctx context.Context, globalConfig *config.GlobalC
GroupID: msg.MsgData.GroupID,
}
resp := &cbapi.CallbackAfterSendGroupMsgResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackAfterSendGroupMsg); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSendGroupMsg); err != nil {
return err
}
return nil
}
func callbackMsgModify(ctx context.Context, globalConfig *config.GlobalConfig, msg *pbchat.SendMsgReq) error {
if !globalConfig.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text {
func callbackMsgModify(ctx context.Context, callback *config.Webhooks, msg *pbchat.SendMsgReq) error {
if !callback.BeforeMsgModify.Enable || msg.MsgData.ContentType != constant.Text {
return nil
}
req := &cbapi.CallbackMsgModifyCommandReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackMsgModifyCommand),
}
resp := &cbapi.CallbackMsgModifyCommandResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackMsgModify); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.BeforeMsgModify); err != nil {
return err
}
if resp.Content != nil {
@ -154,34 +154,34 @@ func callbackMsgModify(ctx context.Context, globalConfig *config.GlobalConfig, m
return nil
}
func CallbackGroupMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackGroupMsgReadReq) error {
if !globalConfig.Callback.CallbackGroupMsgRead.Enable {
func CallbackGroupMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackGroupMsgReadReq) error {
if !callback.AfterGroupMsgRead.Enable {
return nil
}
req.CallbackCommand = cbapi.CallbackGroupMsgReadCommand
resp := &cbapi.CallbackGroupMsgReadResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackGroupMsgRead); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterGroupMsgRead); err != nil {
return err
}
return nil
}
func CallbackSingleMsgRead(ctx context.Context, globalConfig *config.GlobalConfig, req *cbapi.CallbackSingleMsgReadReq) error {
if !globalConfig.Callback.CallbackSingleMsgRead.Enable {
func CallbackSingleMsgRead(ctx context.Context, callback *config.Webhooks, req *cbapi.CallbackSingleMsgReadReq) error {
if !callback.AfterSingleMsgRead.Enable {
return nil
}
req.CallbackCommand = cbapi.CallbackSingleMsgRead
resp := &cbapi.CallbackSingleMsgReadResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, req, resp, globalConfig.Callback.CallbackSingleMsgRead); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, req, resp, callback.AfterSingleMsgRead); err != nil {
return err
}
return nil
}
func CallbackAfterRevokeMsg(ctx context.Context, globalConfig *config.GlobalConfig, req *pbchat.RevokeMsgReq) error {
if !globalConfig.Callback.CallbackAfterRevokeMsg.Enable {
func CallbackAfterRevokeMsg(ctx context.Context, callback *config.Webhooks, req *pbchat.RevokeMsgReq) error {
if !callback.AfterRevokeMsg.Enable {
return nil
}
callbackReq := &cbapi.CallbackAfterRevokeMsgReq{
@ -191,7 +191,7 @@ func CallbackAfterRevokeMsg(ctx context.Context, globalConfig *config.GlobalConf
UserID: req.UserID,
}
resp := &cbapi.CallbackAfterRevokeMsgResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, callbackReq, resp, globalConfig.Callback.CallbackAfterRevokeMsg); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, callbackReq, resp, callback.AfterRevokeMsg); err != nil {
return err
}
return nil

@ -16,25 +16,25 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
)
type MessageInterceptorFunc func(ctx context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error)
type MessageInterceptorFunc func(ctx context.Context, globalConfig *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error)
func MessageHasReadEnabled(ctx context.Context, globalConfig *config.GlobalConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
func MessageHasReadEnabled(ctx context.Context, config *cmd.MsgConfig, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
switch {
case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SingleChatType:
if !globalConfig.SingleMessageHasReadReceiptEnable {
if !config.RpcConfig.SingleMessageHasReadReceiptEnable {
return nil, servererrs.ErrMessageHasReadDisable.Wrap()
}
return req.MsgData, nil
case req.MsgData.ContentType == constant.HasReadReceipt && req.MsgData.SessionType == constant.SuperGroupChatType:
if !globalConfig.GroupMessageHasReadReceiptEnable {
if !config.RpcConfig.GroupMessageHasReadReceiptEnable {
return nil, servererrs.ErrMessageHasReadDisable.Wrap()
}
return req.MsgData, nil

@ -41,7 +41,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if req.Seq < 0 {
return nil, errs.ErrArgs.WrapMsg("seq is invalid")
}
if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil {
if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Share.IMAdmin); err != nil {
return nil, err
}
user, err := m.UserLocalCache.GetUserInfo(ctx, req.UserID)
@ -62,10 +62,10 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
data, _ := json.Marshal(msgs[0])
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
var role int32
if !authverify.IsAppManagerUid(ctx, &m.config.Manager, &m.config.IMAdmin) {
if !authverify.IsAppManagerUid(ctx, &m.config.Share.IMAdmin) {
switch msgs[0].SessionType {
case constant.SingleChatType:
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, &m.config.Manager, &m.config.IMAdmin); err != nil {
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, &m.config.Share.IMAdmin); err != nil {
return nil, err
}
role = user.AppMangerLevel
@ -104,11 +104,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
}
revokerUserID := mcontext.GetOpUserID(ctx)
var flag bool
if len(m.config.Manager.UserID) > 0 {
flag = datautil.Contain(revokerUserID, m.config.Manager.UserID...)
}
if len(m.config.Manager.UserID) == 0 && len(m.config.IMAdmin.UserID) > 0 {
flag = datautil.Contain(revokerUserID, m.config.IMAdmin.UserID...)
if len(m.config.Share.IMAdmin.UserID) > 0 {
flag = datautil.Contain(revokerUserID, m.config.Share.IMAdmin.UserID...)
}
tips := sdkws.RevokeMsgTips{
RevokerUserID: revokerUserID,
@ -128,7 +125,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil {
return nil, err
}
if err = CallbackAfterRevokeMsg(ctx, m.config, req); err != nil {
if err = CallbackAfterRevokeMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
return &msg.RevokeMsgResp{}, nil

@ -59,10 +59,10 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
return nil, err
}
if err = callbackBeforeSendGroupMsg(ctx, m.config, req); err != nil {
if err = callbackBeforeSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
if err := callbackMsgModify(ctx, m.config, req); err != nil {
if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
err = m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForGroup(req.MsgData.GroupID), req.MsgData)
@ -72,7 +72,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbmsg.SendMs
if req.MsgData.ContentType == constant.AtText {
go m.setConversationAtInfo(ctx, req.MsgData)
}
if err = callbackAfterSendGroupMsg(ctx, m.config, req); err != nil {
if err = callbackAfterSendGroupMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
log.ZWarn(ctx, "CallbackAfterSendGroupMsg", err)
}
prommetrics.GroupChatMsgProcessSuccessCounter.Inc()
@ -157,18 +157,18 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, nil
} else {
if err = callbackBeforeSendSingleMsg(ctx, m.config, req); err != nil {
if err = callbackBeforeSendSingleMsg(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
if err := callbackMsgModify(ctx, m.config, req); err != nil {
if err := callbackMsgModify(ctx, &m.config.WebhooksConfig, req); err != nil {
return nil, err
}
if err := m.MsgDatabase.MsgToMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(req.MsgData.SendID, req.MsgData.RecvID), req.MsgData); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return nil, err
}
err = callbackAfterSendSingleMsg(ctx, m.config, req)
err = callbackAfterSendSingleMsg(ctx, &m.config.WebhooksConfig, req)
if err != nil {
log.ZWarn(ctx, "CallbackAfterSendSingleMsg", err, "req", req)
}

@ -52,7 +52,7 @@ type (
)
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...
m.Handlers = append(m.Handlers, interceptorFunc...)
}
@ -65,12 +65,12 @@ func Start(ctx context.Context, config *cmd.MsgConfig, client discovery.SvcDisco
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
msgModel := cache.NewMsgCache(rdb, config.MsgCacheTimeout, &config.Redis)
//todo MsgCacheTimeout
msgModel := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
seqModel := cache.NewSeqCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin)

@ -87,7 +87,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
}
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil {
if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Share.IMAdmin); err != nil {
return nil, err
}
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)

@ -15,7 +15,7 @@
package msg
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
@ -23,16 +23,16 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)
func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *config.GlobalConfig) bool {
func isMessageHasReadEnabled(msgData *sdkws.MsgData, config *cmd.MsgConfig) bool {
switch {
case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SingleChatType:
if config.SingleMessageHasReadReceiptEnable {
if config.RpcConfig.SingleMessageHasReadReceiptEnable {
return true
} else {
return false
}
case msgData.ContentType == constant.HasReadReceipt && msgData.SessionType == constant.SuperGroupChatType:
if config.GroupMessageHasReadReceiptEnable {
if config.RpcConfig.GroupMessageHasReadReceiptEnable {
return true
} else {
return false

@ -52,10 +52,7 @@ type MessageRevoked struct {
func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgReq) error {
switch data.MsgData.SessionType {
case constant.SingleChatType:
if len(m.config.Manager.UserID) > 0 && datautil.Contain(data.MsgData.SendID, m.config.Manager.UserID...) {
return nil
}
if datautil.Contain(data.MsgData.SendID, m.config.IMAdmin.UserID...) {
if datautil.Contain(data.MsgData.SendID, m.config.Share.IMAdmin.UserID...) {
return nil
}
if data.MsgData.ContentType <= constant.NotificationEnd &&
@ -69,7 +66,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
if black {
return servererrs.ErrBlockedByPeer.Wrap()
}
if m.config.MessageVerify.FriendVerify != nil && *m.config.MessageVerify.FriendVerify {
if m.config.RpcConfig.FriendVerify {
friend, err := m.FriendLocalCache.IsFriend(ctx, data.MsgData.SendID, data.MsgData.RecvID)
if err != nil {
return err
@ -92,10 +89,8 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
if groupInfo.GroupType == constant.SuperGroup {
return nil
}
if len(m.config.Manager.UserID) > 0 && datautil.Contain(data.MsgData.SendID, m.config.Manager.UserID...) {
return nil
}
if datautil.Contain(data.MsgData.SendID, m.config.IMAdmin.UserID...) {
if datautil.Contain(data.MsgData.SendID, m.config.Share.IMAdmin.UserID...) {
return nil
}
if data.MsgData.ContentType <= constant.NotificationEnd &&

@ -82,7 +82,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
}
func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) (*third.DeleteLogsResp, error) {
if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil {
if err := authverify.CheckAdmin(ctx, &t.config.Share.IMAdmin); err != nil {
return nil, err
}
userID := ""
@ -123,7 +123,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo {
}
func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) {
if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil {
if err := authverify.CheckAdmin(ctx, &t.config.Share.IMAdmin); err != nil {
return nil, err
}
var (

@ -54,7 +54,7 @@ func (t *thirdServer) checkUploadName(ctx context.Context, name string) error {
if opUserID == "" {
return errs.ErrNoPermission.WrapMsg("opUserID is empty")
}
if !authverify.IsManagerUserID(opUserID, &t.config.Manager, &t.config.IMAdmin) {
if !authverify.IsManagerUserID(opUserID, &t.config.Share.IMAdmin) {
if !strings.HasPrefix(name, opUserID+"/") {
return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("name must start with `%s/`", opUserID))
}
@ -80,5 +80,5 @@ func checkValidObjectName(objectName string) error {
}
func (t *thirdServer) IsManagerUserID(opUserID string) bool {
return authverify.IsManagerUserID(opUserID, &t.config.Manager, &t.config.IMAdmin)
return authverify.IsManagerUserID(opUserID, &t.config.Share.IMAdmin)
}

@ -24,8 +24,8 @@ import (
pbuser "github.com/openimsdk/protocol/user"
)
func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error {
if !globalConfig.Callback.CallbackBeforeUpdateUserInfo.Enable {
func CallbackBeforeUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error {
if !callback.BeforeUpdateUserInfo.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeUpdateUserInfoReq{
@ -35,7 +35,7 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.Glob
Nickname: &req.UserInfo.Nickname,
}
resp := &cbapi.CallbackBeforeUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfo); err != nil {
return err
}
datautil.NotNilReplace(&req.UserInfo.FaceURL, resp.FaceURL)
@ -43,8 +43,8 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, globalConfig *config.Glob
datautil.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfo(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoReq) error {
if !globalConfig.Callback.CallbackAfterUpdateUserInfo.Enable {
func CallbackAfterUpdateUserInfo(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoReq) error {
if !callback.AfterUpdateUserInfo.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUpdateUserInfoReq{
@ -54,13 +54,13 @@ func CallbackAfterUpdateUserInfo(ctx context.Context, globalConfig *config.Globa
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfo); err != nil {
return err
}
return nil
}
func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error {
if !globalConfig.Callback.CallbackBeforeUpdateUserInfoEx.Enable {
func CallbackBeforeUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error {
if !callback.BeforeUpdateUserInfoEx.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{
@ -70,7 +70,7 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.Gl
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUpdateUserInfoEx); err != nil {
return err
}
datautil.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL)
@ -78,8 +78,8 @@ func CallbackBeforeUpdateUserInfoEx(ctx context.Context, globalConfig *config.Gl
datautil.NotNilReplace(req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfoEx(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UpdateUserInfoExReq) error {
if !globalConfig.Callback.CallbackAfterUpdateUserInfoEx.Enable {
func CallbackAfterUpdateUserInfoEx(ctx context.Context, callback *config.Webhooks, req *pbuser.UpdateUserInfoExReq) error {
if !callback.AfterUpdateUserInfoEx.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{
@ -89,14 +89,14 @@ func CallbackAfterUpdateUserInfoEx(ctx context.Context, globalConfig *config.Glo
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfoEx); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUpdateUserInfoEx); err != nil {
return err
}
return nil
}
func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error {
if !globalConfig.Callback.CallbackBeforeUserRegister.Enable {
func CallbackBeforeUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error {
if !callback.BeforeUserRegister.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeUserRegisterReq{
@ -106,7 +106,7 @@ func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.Global
}
resp := &cbapi.CallbackBeforeUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackBeforeUpdateUserInfo); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.BeforeUserRegister); err != nil {
return err
}
if len(resp.Users) != 0 {
@ -115,8 +115,8 @@ func CallbackBeforeUserRegister(ctx context.Context, globalConfig *config.Global
return nil
}
func CallbackAfterUserRegister(ctx context.Context, globalConfig *config.GlobalConfig, req *pbuser.UserRegisterReq) error {
if !globalConfig.Callback.CallbackAfterUserRegister.Enable {
func CallbackAfterUserRegister(ctx context.Context, callback *config.Webhooks, req *pbuser.UserRegisterReq) error {
if !callback.AfterUserRegister.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUserRegisterReq{
@ -126,7 +126,7 @@ func CallbackAfterUserRegister(ctx context.Context, globalConfig *config.GlobalC
}
resp := &cbapi.CallbackAfterUserRegisterResp{}
if err := http.CallBackPostReturn(ctx, globalConfig.Callback.CallbackUrl, cbReq, resp, globalConfig.Callback.CallbackAfterUpdateUserInfo); err != nil {
if err := http.CallBackPostReturn(ctx, callback.URL, cbReq, resp, callback.AfterUserRegister); err != nil {
return err
}
return nil

@ -16,13 +16,13 @@ package user
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"math/rand"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@ -51,24 +51,24 @@ type userServer struct {
friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient
RegisterCenter registry.SvcDiscoveryRegistry
config *config.GlobalConfig
config *cmd.UserConfig
}
func Start(ctx context.Context, config *config.GlobalConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
func Start(ctx context.Context, config *cmd.UserConfig, client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
users := make([]*tablerelation.UserModel, 0)
if len(config.IMAdmin.UserID) != len(config.IMAdmin.Nickname) {
if len(config.Share.IMAdmin.UserID) != len(config.Share.IMAdmin.Nickname) {
return errs.New("the count of ImAdmin.UserID is not equal to the count of ImAdmin.Nickname").Wrap()
}
for k, v := range config.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
for k, v := range config.Share.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Share.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
}
userDB, err := mgo.NewUserMongo(mgocli.GetDB())
if err != nil {
@ -77,15 +77,15 @@ func Start(ctx context.Context, config *config.GlobalConfig, client registry.Svc
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
database := controller.NewUserDatabase(userDB, cache, mgocli.GetTx(), userMongoDB)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
u := &userServer{
db: database,
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: notification.NewFriendNotificationSender(&config.Notification, &msgRpcClient, notification.WithDBFunc(database.FindWithError)),
friendNotificationSender: notification.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, notification.WithDBFunc(database.FindWithError)),
userNotificationSender: notification.NewUserNotificationSender(config, &msgRpcClient, notification.WithUserFunc(database.FindWithError)),
config: config,
}
@ -105,11 +105,11 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
resp = &pbuser.UpdateUserInfoResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Manager, &s.config.IMAdmin)
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
if err := CallbackBeforeUpdateUserInfo(ctx, s.config, req); err != nil {
if err := CallbackBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMap(req.UserInfo)
@ -129,7 +129,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}
if err = CallbackAfterUpdateUserInfo(ctx, s.config, req); err != nil {
if err = CallbackAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
@ -139,12 +139,12 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) {
resp = &pbuser.UpdateUserInfoExResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Manager, &s.config.IMAdmin)
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
if err = CallbackBeforeUpdateUserInfoEx(ctx, s.config, req); err != nil {
if err = CallbackBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMapEx(req.UserInfo)
@ -164,7 +164,7 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}
if err := CallbackAfterUpdateUserInfoEx(ctx, s.config, req); err != nil {
if err := CallbackAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
@ -191,7 +191,7 @@ func (s *userServer) AccountCheck(ctx context.Context, req *pbuser.AccountCheckR
if datautil.Duplicate(req.CheckUserIDs) {
return nil, errs.ErrArgs.WrapMsg("userID repeated")
}
err = authverify.CheckAdmin(ctx, &s.config.Manager, &s.config.IMAdmin)
err = authverify.CheckAdmin(ctx, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -238,8 +238,8 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if len(req.Users) == 0 {
return nil, errs.ErrArgs.WrapMsg("users is empty")
}
if req.Secret != s.config.Secret {
log.ZDebug(ctx, "UserRegister", s.config.Secret, req.Secret)
if req.Secret != s.config.Share.Secret {
log.ZDebug(ctx, "UserRegister", s.config.Share.Secret, req.Secret)
return nil, errs.ErrNoPermission.WrapMsg("secret invalid")
}
if datautil.DuplicateAny(req.Users, func(e *sdkws.UserInfo) string { return e.UserID }) {
@ -262,7 +262,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if exist {
return nil, servererrs.ErrRegisteredAlready.WrapMsg("userID registered already")
}
if err := CallbackBeforeUserRegister(ctx, s.config, req); err != nil {
if err := CallbackBeforeUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
now := time.Now()
@ -282,7 +282,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
return nil, err
}
if err := CallbackAfterUserRegister(ctx, s.config, req); err != nil {
if err := CallbackAfterUserRegister(ctx, &s.config.WebhooksConfig, req); err != nil {
return nil, err
}
return resp, nil
@ -377,7 +377,7 @@ func (s *userServer) GetSubscribeUsersStatus(ctx context.Context,
// ProcessUserCommandAdd user general function add.
func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin)
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -408,7 +408,7 @@ func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.Proc
// ProcessUserCommandDelete user general function delete.
func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin)
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -431,7 +431,7 @@ func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.P
// ProcessUserCommandUpdate user general function update.
func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin)
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -463,7 +463,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin)
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -492,7 +492,7 @@ func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.Proc
}
func (s *userServer) ProcessUserCommandGetAll(ctx context.Context, req *pbuser.ProcessUserCommandGetAllReq) (*pbuser.ProcessUserCommandGetAllResp, error) {
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Manager, &s.config.IMAdmin)
err := authverify.CheckAccessV3(ctx, req.UserID, &s.config.Share.IMAdmin)
if err != nil {
return nil, err
}
@ -665,7 +665,7 @@ func (s *userServer) userModelToResp(users []*relation.UserModel, pagination pag
accounts := make([]*pbuser.NotificationAccountInfo, 0)
var total int64
for _, v := range users {
if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.IMAdmin.UserID...) {
if v.AppMangerLevel == constant.AppNotificationAdmin && !datautil.Contain(v.UserID, s.config.Share.IMAdmin.UserID...) {
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,

@ -17,11 +17,11 @@ package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"math"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
@ -47,12 +47,12 @@ type MsgTool struct {
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
msgNotificationSender *notification.MsgNotificationSender
Config *config.GlobalConfig
config *cmd.CronTaskConfig
}
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase,
msgNotificationSender *notification.MsgNotificationSender, config *config.GlobalConfig,
msgNotificationSender *notification.MsgNotificationSender, config *cmd.CronTaskConfig,
) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
@ -60,20 +60,20 @@ func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controlle
groupDatabase: groupDatabase,
conversationDatabase: conversationDatabase,
msgNotificationSender: msgNotificationSender,
Config: config,
config: config,
}
}
func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, error) {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
func InitMsgTool(ctx context.Context, config *cmd.CronTaskConfig) (*MsgTool, error) {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return nil, err
}
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return nil, err
}
discov, err := kdisc.NewDiscoveryRegister(config)
discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig)
if err != nil {
return nil, err
}
@ -115,7 +115,7 @@ func InitMsgTool(ctx context.Context, config *config.GlobalConfig) (*MsgTool, er
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
mgocli.GetTx(),
)
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.RpcRegisterName.OpenImMsgName)
msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg)
msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
return msgTool, nil
@ -179,8 +179,9 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.Config.RetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", c.Config.RetainChatRecords)
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.CronTask.RetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID",
conversationID, "DBRetainChatRecords", c.config.CronTask.RetainChatRecords)
}
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)

@ -41,8 +41,8 @@ const (
LogConfigFileName = "log.yml"
OpenIMAPICfgFileName = "openim-api.yml"
OpenIMCronTaskCfgFileName = "openim-crontask.yml"
OpenIMMsgGatewayCfgFileName = "openim-msg-gateway.yml"
OpenIMMsgTransferCfgFileName = "openim-msg-transfer.yml"
OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml"
OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml"
OpenIMPushCfgFileName = "openim-push.yml"
OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml"
OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml"
@ -65,12 +65,16 @@ const (
zoopkeeperEnvPrefix = "openim-zookeeper"
apiEnvPrefix = "openim-api"
cornTaskEnvPrefix = "openim-crontask"
msgGatewayEnvPrefix = "openim-msggateway"
msgTransferEnvPrefix = "openim-msgtransfer"
pushEnvPrefix = "openim-push"
authEnvPrefix = "openim-auth"
conversationEnvPrefix = "openim-conversation"
friendEnvPrefix = "openim-friend"
groupEnvPrefix = "openim-group"
msgEnvPrefix = "openim-msg"
thridEnvPrefix = "openim-third"
userEnvPrefix = "openim-user"
)
const (

@ -31,6 +31,10 @@ type CronTaskCmd struct {
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
Share config.Share
KafkaConfig config.Kafka
}
func NewCronTaskCmd() *CronTaskCmd {
@ -39,6 +43,10 @@ func NewCronTaskCmd() *CronTaskCmd {
ret.configMap = map[string]StructEnvPrefix{
OpenIMCronTaskCfgFileName: {EnvPrefix: cornTaskEnvPrefix, ConfigStruct: &cronTaskConfig.CronTask},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &cronTaskConfig.RedisConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &cronTaskConfig.MongodbConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &cronTaskConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &cronTaskConfig.Share},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &cronTaskConfig.KafkaConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

@ -42,7 +42,7 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
var msgGatewayConfig MsgGatewayConfig
ret := &MsgGatewayCmd{msgGatewayConfig: msgGatewayConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMAPICfgFileName: {EnvPrefix: apiEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway},
OpenIMMsgGatewayCfgFileName: {EnvPrefix: msgGatewayEnvPrefix, ConfigStruct: &msgGatewayConfig.MsgGateway},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgGatewayConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgGatewayConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgGatewayConfig.Share},

@ -15,11 +15,7 @@
package cmd
import (
"testing"
"github.com/openimsdk/protocol/constant"
"github.com/stretchr/testify/mock"
"gotest.tools/assert"
)
// MockRootCmd is a mock type for the RootCmd type
@ -31,21 +27,3 @@ func (m *MockRootCmd) Execute() error {
args := m.Called()
return args.Error(0)
}
func TestMsgGatewayCmd_GetPortFromConfig(t *testing.T) {
msgGatewayCmd := &MsgGatewayCmd{RootCmd: &RootCmd{}}
tests := []struct {
portType string
want int
}{
{constant.FlagWsPort, 8080}, // Replace 8080 with the expected port from the config
{constant.FlagPort, 8081}, // Replace 8081 with the expected port from the config
{"invalid", 0},
}
for _, tt := range tests {
t.Run(tt.portType, func(t *testing.T) {
got := msgGatewayCmd.GetPortFromConfig(tt.portType)
assert.Equal(t, tt.want, got)
})
}
}

@ -16,10 +16,8 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@ -27,44 +25,43 @@ import (
type MsgTransferCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
msgTransferConfig MsgTransferConfig
}
func NewMsgTransferCmd(name string) *MsgTransferCmd {
ret := &MsgTransferCmd{RootCmd: NewRootCmd(program.GetProcessName(), name)}
ret.ctx = context.WithValue(context.Background(), "version", config2.Version)
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret
type MsgTransferConfig struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
}
func (m *MsgTransferCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msgtransfer.Start(m.ctx, m.config, m.getPrometheusPortFlag(cmd), m.getTransferProgressFlagValue())
func NewMsgTransferCmd() *MsgTransferCmd {
var msgTransferConfig MsgTransferConfig
ret := &MsgTransferCmd{msgTransferConfig: msgTransferConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMMsgTransferCfgFileName: {EnvPrefix: msgTransferEnvPrefix, ConfigStruct: &msgTransferConfig.MsgTransfer},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &msgTransferConfig.RedisConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &msgTransferConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &msgTransferConfig.KafkaConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &msgTransferConfig.ZookeeperConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &msgTransferConfig.Share},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &msgTransferConfig.WebhooksConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (m *MsgTransferCmd) Exec() error {
return m.Execute()
}
func (m *MsgTransferCmd) GetPortFromConfig(portType string) int {
if portType == constant.FlagPort {
return 0
} else if portType == constant.FlagPrometheusPort {
n := m.getTransferProgressFlagValue()
return m.config.Prometheus.MessageTransferPrometheusPort[n]
}
return 0
}
func (m *MsgTransferCmd) AddTransferProgressFlag() {
m.Command.Flags().IntP(constant.FlagTransferProgressIndex, "n", 0, "transfer progress index")
}
func (m *MsgTransferCmd) getTransferProgressFlagValue() int {
nIndex, err := m.Command.Flags().GetInt(constant.FlagTransferProgressIndex)
if err != nil {
return 0
}
return nIndex
func (m *MsgTransferCmd) preRunE() error {
return msgtransfer.Start(m.ctx, m.Index(), &m.msgTransferConfig)
}

@ -138,7 +138,7 @@ func NewSeqCmd() *SeqCmd {
func (s *SeqCmd) GetSeqCmd() *cobra.Command {
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
_, err := tools.InitMsgTool(context.Background(), s.MsgTool.Config)
_, err := tools.InitMsgTool(context.Background(), s.MsgTool.config)
if err != nil {
program.ExitWithError(err)
}

@ -0,0 +1,72 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type PushRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
pushConfig PushConfig
}
type PushConfig struct {
RpcConfig config.Push
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func NewPushRpcCmd() *PushRpcCmd {
var pushConfig PushConfig
ret := &PushRpcCmd{pushConfig: pushConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMPushCfgFileName: {EnvPrefix: pushEnvPrefix, ConfigStruct: &pushConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &pushConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &pushConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &pushConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &pushConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &pushConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &pushConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &pushConfig.WebhooksConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *PushRpcCmd) Exec() error {
return a.Execute()
}
func (a *PushRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP,
a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports,
a.Index(), a.pushConfig.Share.RpcRegisterName.Auth, &a.pushConfig, push.Start)
}

@ -0,0 +1,72 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type UserRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]StructEnvPrefix
userConfig UserConfig
}
type UserConfig struct {
RpcConfig config.User
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
}
func NewUserRpcCmd() *UserRpcCmd {
var userConfig UserConfig
ret := &UserRpcCmd{userConfig: userConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMRPCUserCfgFileName: {EnvPrefix: userEnvPrefix, ConfigStruct: &userConfig.RpcConfig},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &userConfig.RedisConfig},
ZookeeperConfigFileName: {EnvPrefix: zoopkeeperEnvPrefix, ConfigStruct: &userConfig.ZookeeperConfig},
MongodbConfigFileName: {EnvPrefix: mongodbEnvPrefix, ConfigStruct: &userConfig.MongodbConfig},
KafkaConfigFileName: {EnvPrefix: kafkaEnvPrefix, ConfigStruct: &userConfig.KafkaConfig},
ShareFileName: {EnvPrefix: shareEnvPrefix, ConfigStruct: &userConfig.Share},
NotificationFileName: {EnvPrefix: notificationEnvPrefix, ConfigStruct: &userConfig.NotificationConfig},
WebhooksConfigFileName: {EnvPrefix: webhooksEnvPrefix, ConfigStruct: &userConfig.WebhooksConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (a *UserRpcCmd) Exec() error {
return a.Execute()
}
func (a *UserRpcCmd) preRunE() error {
return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP,
a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports,
a.Index(), a.userConfig.Share.RpcRegisterName.Auth, &a.userConfig, user.Start)
}

@ -18,6 +18,7 @@ import (
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/s3/minio"
"time"
)
@ -70,6 +71,8 @@ type Mongo struct {
type Kafka struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ProducerAck string `mapstructure:"producerAck"`
CompressType string `mapstructure:"compressType"`
Address []string `mapstructure:"address"`
ToRedisTopic string `mapstructure:"toRedisTopic"`
ToMongoTopic string `mapstructure:"toMongoTopic"`
@ -77,6 +80,15 @@ type Kafka struct {
ToRedisGroupID string `mapstructure:"toRedisGroupID"`
ToMongoGroupID string `mapstructure:"toMongoGroupID"`
ToPushGroupID string `mapstructure:"toPushGroupID"`
Tls TLSConfig `mapstructure:"tls"`
}
type TLSConfig struct {
EnableTLS bool `mapstructure:"enableTLS"`
CACrt string `mapstructure:"caCrt"`
ClientCrt string `mapstructure:"clientCrt"`
ClientKey string `mapstructure:"clientKey"`
ClientKeyPwd string `mapstructure:"clientKeyPwd"`
InsecureSkipVerify bool `mapstructure:"insecureSkipVerify"`
}
type API struct {
@ -179,6 +191,7 @@ type Push struct {
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"`
Enable string `mapstructure:"enable"`
GeTui struct {
PushUrl string `mapstructure:"pushUrl"`
@ -214,7 +227,6 @@ type Auth struct {
TokenPolicy struct {
Expire int64 `mapstructure:"expire"`
} `mapstructure:"tokenPolicy"`
Secret string `mapstructure:"secret"`
}
type Conversation struct {
@ -314,6 +326,7 @@ type Redis struct {
Address []string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
EnablePipeline bool `mapstructure:"enablePipeline"`
ClusterMode bool `mapstructure:"clusterMode"`
DB int `mapstructure:"db"`
MaxRetry int `mapstructure:"MaxRetry"`
@ -326,6 +339,7 @@ type WebhookConfig struct {
}
type Share struct {
Secret string `mapstructure:"secret"`
Env string `mapstructure:"env"`
RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"`
IMAdmin IMAdmin `mapstructure:"imAdmin"`
@ -369,6 +383,7 @@ type Webhooks struct {
AfterSendSingleMsg WebhookConfig `mapstructure:"afterSendSingleMsg"`
BeforeSendGroupMsg WebhookConfig `mapstructure:"beforeSendGroupMsg"`
AfterSendGroupMsg WebhookConfig `mapstructure:"afterSendGroupMsg"`
BeforeMsgModify WebhookConfig `mapstructure:"beforeMsgModify"`
AfterUserOnline WebhookConfig `mapstructure:"afterUserOnline"`
AfterUserOffline WebhookConfig `mapstructure:"afterUserOffline"`
AfterUserKickOff WebhookConfig `mapstructure:"afterUserKickOff"`
@ -377,6 +392,7 @@ type Webhooks struct {
BeforeGroupOnlinePush WebhookConfig `mapstructure:"beforeGroupOnlinePush"`
BeforeAddFriend WebhookConfig `mapstructure:"beforeAddFriend"`
BeforeUpdateUserInfo WebhookConfig `mapstructure:"beforeUpdateUserInfo"`
AfterUpdateUserInfo WebhookConfig `mapstructure:"afterUpdateUserInfo"`
BeforeCreateGroup WebhookConfig `mapstructure:"beforeCreateGroup"`
AfterCreateGroup WebhookConfig `mapstructure:"afterCreateGroup"`
BeforeMemberJoinGroup WebhookConfig `mapstructure:"beforeMemberJoinGroup"`
@ -439,7 +455,33 @@ func (r *Redis) Build() *redisutil.Config {
}
func (k *Kafka) Build() *kafka.Config {
return &kafka.Config{}
return &kafka.Config{
Username: k.Username,
Password: k.Password,
ProducerAck: k.ProducerAck,
CompressType: k.CompressType,
Addr: k.Address,
TLS: kafka.TLSConfig{
EnableTLS: k.Tls.EnableTLS,
CACrt: k.Tls.CACrt,
ClientCrt: k.Tls.ClientCrt,
ClientKey: k.Tls.ClientKey,
ClientKeyPwd: k.Tls.ClientKeyPwd,
InsecureSkipVerify: k.Tls.InsecureSkipVerify,
},
}
}
func (m *Minio) Build() *minio.Config {
return &minio.Config{
Bucket: m.Bucket,
Endpoint: "",
AccessKeyID: m.AccessKeyID,
SecretAccessKey: m.SecretAccessKey,
SessionToken: m.SessionToken,
SignEndpoint: "",
PublicRead: m.PublicRead,
}
}
func (l *CacheConfig) Failed() time.Duration {

@ -17,6 +17,7 @@ package controller
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -106,19 +107,19 @@ type CommonMsgDatabase interface {
}
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
conf, err := kafka.BuildProducerConfig(kafkaConf.Config)
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
}
producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.LatestMsgToRedis.Topic)
producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic)
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToMongo.Topic)
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToPush.Topic)
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
if err != nil {
return nil, err
}
@ -132,14 +133,15 @@ func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.M
}, nil
}
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *config.GlobalConfig) (CommonMsgDatabase, error) {
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database, config *cmd.CronTaskConfig) (CommonMsgDatabase, error) {
msgDocModel, err := mgo.NewMsgMongo(database)
if err != nil {
return nil, err
}
msg := cache.NewMsgCache(rdb, config.MsgCacheTimeout, config.Redis.EnablePipeline)
//todo MsgCacheTimeout
msg := cache.NewMsgCache(rdb, 86400, config.RedisConfig.EnablePipeline)
seq := cache.NewSeqCache(rdb)
return NewCommonMsgDatabase(msgDocModel, msg, seq, config.Kafka)
return NewCommonMsgDatabase(msgDocModel, msg, seq, &config.KafkaConfig)
}
type commonMsgDatabase struct {

@ -16,6 +16,7 @@ package discoveryregister
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -26,7 +27,7 @@ const (
zookeeper = "zoopkeeper"
kubenetes = "k8s"
direct = "direct"
directT = "direct"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.

Loading…
Cancel
Save