fix: config

pull/3036/head
icey-yu 9 months ago
parent a78a1d7de4
commit d498fd90ca

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.69 github.com/openimsdk/protocol v0.0.72-alpha.69
github.com/openimsdk/tools v0.0.50-alpha.62 github.com/openimsdk/tools v0.0.50-alpha.64
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0

@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc= github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc=
github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ= github.com/openimsdk/tools v0.0.50-alpha.64 h1:KmtE8V2K8atQJJg1xq2ySSrPQyf8ldwk8fw6jRnsxCw=
github.com/openimsdk/tools v0.0.50-alpha.62/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U= github.com/openimsdk/tools v0.0.50-alpha.64/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -58,7 +58,9 @@ func Start(ctx context.Context, index int, config *Config) error {
config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{
config.Discovery.RpcService.MessageGateway,
})
if err != nil { if err != nil {
return errs.WrapMsg(err, "failed to register discovery service") return errs.WrapMsg(err, "failed to register discovery service")
} }

@ -16,9 +16,10 @@ package msggateway
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sync/atomic" "sync/atomic"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
@ -64,6 +65,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
conf.WebhooksConfig.GetConfigFileName(), conf.WebhooksConfig.GetConfigFileName(),
conf.RedisConfig.GetConfigFileName(), conf.RedisConfig.GetConfigFileName(),
}, },
[]string{
conf.Discovery.RpcService.MessageGateway,
},
s.InitServer, s.InitServer,
) )
} }

@ -87,7 +87,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil { if err != nil {
return err return err
} }
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv) client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil)
if err != nil { if err != nil {
return err return err
} }

@ -2,11 +2,14 @@ package tools
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
pbconversation "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third" "github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
@ -27,38 +30,47 @@ type CronTaskConfig struct {
runTimeEnv string runTimeEnv string
} }
func Start(ctx context.Context, config *CronTaskConfig) error { func Start(ctx context.Context, conf *CronTaskConfig) error {
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", config.runTimeEnv, "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 { if conf.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap() return errs.New("msg destruct time must be greater than 1").Wrap()
} }
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.runTimeEnv) client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, conf.runTimeEnv, nil)
if err != nil { if err != nil {
return errs.WrapMsg(err, "failed to register discovery service") return errs.WrapMsg(err, "failed to register discovery service")
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0])
msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg)
if err != nil { if err != nil {
return err return err
} }
thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third)
if err != nil { if err != nil {
return err return err
} }
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation)
if err != nil { if err != nil {
return err return err
} }
if conf.Discovery.Enable == config.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
conf.CronTask.GetConfigFileName(),
conf.Share.GetConfigFileName(),
conf.Discovery.GetConfigFileName(),
})
cm.Watch(ctx)
}
srv := &cronServer{ srv := &cronServer{
ctx: ctx, ctx: ctx,
config: config, config: conf,
cron: cron.New(), cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn), msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn), conversationClient: pbconversation.NewConversationClient(conversationConn),
@ -74,7 +86,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err := srv.registerClearUserMsg(); err != nil { if err := srv.registerClearUserMsg(); err != nil {
return err return err
} }
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime)
srv.cron.Start() srv.cron.Start()
<-ctx.Done() <-ctx.Done()
return nil return nil

@ -64,5 +64,8 @@ func (a *AuthRpcCmd) runE() error {
a.authConfig.RedisConfig.GetConfigFileName(), a.authConfig.RedisConfig.GetConfigFileName(),
a.authConfig.Discovery.GetConfigFileName(), a.authConfig.Discovery.GetConfigFileName(),
}, },
[]string{
a.authConfig.Discovery.RpcService.MessageGateway,
},
auth.Start) auth.Start)
} }

@ -68,6 +68,6 @@ func (a *ConversationRpcCmd) runE() error {
a.conversationConfig.Share.GetConfigFileName(), a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(), a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(), a.conversationConfig.Discovery.GetConfigFileName(),
}, }, nil,
conversation.Start) conversation.Start)
} }

@ -70,6 +70,6 @@ func (a *FriendRpcCmd) runE() error {
a.relationConfig.WebhooksConfig.GetConfigFileName(), a.relationConfig.WebhooksConfig.GetConfigFileName(),
a.relationConfig.LocalCacheConfig.GetConfigFileName(), a.relationConfig.LocalCacheConfig.GetConfigFileName(),
a.relationConfig.Discovery.GetConfigFileName(), a.relationConfig.Discovery.GetConfigFileName(),
}, }, nil,
relation.Start) relation.Start)
} }

@ -71,6 +71,6 @@ func (a *GroupRpcCmd) runE() error {
a.groupConfig.WebhooksConfig.GetConfigFileName(), a.groupConfig.WebhooksConfig.GetConfigFileName(),
a.groupConfig.LocalCacheConfig.GetConfigFileName(), a.groupConfig.LocalCacheConfig.GetConfigFileName(),
a.groupConfig.Discovery.GetConfigFileName(), a.groupConfig.Discovery.GetConfigFileName(),
}, }, nil,
group.Start, versionctx.EnableVersionCtx()) group.Start, versionctx.EnableVersionCtx())
} }

@ -72,6 +72,6 @@ func (a *MsgRpcCmd) runE() error {
a.msgConfig.WebhooksConfig.GetConfigFileName(), a.msgConfig.WebhooksConfig.GetConfigFileName(),
a.msgConfig.LocalCacheConfig.GetConfigFileName(), a.msgConfig.LocalCacheConfig.GetConfigFileName(),
a.msgConfig.Discovery.GetConfigFileName(), a.msgConfig.Discovery.GetConfigFileName(),
}, }, nil,
msg.Start) msg.Start)
} }

@ -72,5 +72,8 @@ func (a *PushRpcCmd) runE() error {
a.pushConfig.LocalCacheConfig.GetConfigFileName(), a.pushConfig.LocalCacheConfig.GetConfigFileName(),
a.pushConfig.Discovery.GetConfigFileName(), a.pushConfig.Discovery.GetConfigFileName(),
}, },
[]string{
a.pushConfig.Discovery.RpcService.MessageGateway,
},
push.Start) push.Start)
} }

@ -93,7 +93,7 @@ func (r *RootCmd) initEtcd() error {
return err return err
} }
if disConfig.Enable == config.ETCD { if disConfig.Enable == config.ETCD {
discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env) discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil)
r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
} }
return nil return nil

@ -70,6 +70,6 @@ func (a *ThirdRpcCmd) runE() error {
a.thirdConfig.MinioConfig.GetConfigFileName(), a.thirdConfig.MinioConfig.GetConfigFileName(),
a.thirdConfig.LocalCacheConfig.GetConfigFileName(), a.thirdConfig.LocalCacheConfig.GetConfigFileName(),
a.thirdConfig.Discovery.GetConfigFileName(), a.thirdConfig.Discovery.GetConfigFileName(),
}, }, nil,
third.Start) third.Start)
} }

@ -72,6 +72,6 @@ func (a *UserRpcCmd) runE() error {
a.userConfig.WebhooksConfig.GetConfigFileName(), a.userConfig.WebhooksConfig.GetConfigFileName(),
a.userConfig.LocalCacheConfig.GetConfigFileName(), a.userConfig.LocalCacheConfig.GetConfigFileName(),
a.userConfig.Discovery.GetConfigFileName(), a.userConfig.Discovery.GetConfigFileName(),
}, }, nil,
user.Start) user.Start)
} }

@ -28,7 +28,7 @@ import (
) )
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (discovery.SvcDiscoveryRegistry, error) { func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
if runtimeEnv == config.KUBERNETES { if runtimeEnv == config.KUBERNETES {
return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace,
grpc.WithDefaultCallOptions( grpc.WithDefaultCallOptions(
@ -42,6 +42,7 @@ func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string) (disco
return etcd.NewSvcDiscoveryRegistry( return etcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory, discovery.Etcd.RootDirectory,
discovery.Etcd.Address, discovery.Etcd.Address,
watchNames,
etcd.WithDialTimeout(10*time.Second), etcd.WithDialTimeout(10*time.Second),
etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithMaxCallSendMsgSize(20*1024*1024),
etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))

@ -49,7 +49,7 @@ import (
// Start rpc server. // Start rpc server.
func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchConfigNames []string, watchServiceNames []string,
rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error,
options ...grpc.ServerOption) error { options ...grpc.ServerOption) error {
@ -95,7 +95,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
if autoSetPorts && discovery.Enable != conf.ETCD { if autoSetPorts && discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap()
} }
client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv) client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames)
if err != nil { if err != nil {
return err return err
} }

Loading…
Cancel
Save