diff --git a/internal/api/init.go b/internal/api/init.go index bc8c3c884..308902eb5 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -30,7 +30,6 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -57,10 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment() - var client discovery.SvcDiscoveryRegistry - - // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } @@ -95,7 +91,7 @@ func Start(ctx context.Context, index int, config *Config) error { return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() } - router := newGinRouter(client, config, client) + router := newGinRouter(client, config) if config.API.Prometheus.Enable { var ( listener net.Listener diff --git a/internal/api/router.go b/internal/api/router.go index fa2deaa38..d6bf4e130 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -47,7 +47,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config, client discovery.SvcDiscoveryRegistry) *gin.Engine { +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) @@ -68,7 +68,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config, client u := NewUserApi(disCov, config.Discovery.RpcService.MessageGateway) m := NewMessageApi(config.Share.IMAdminUserID) j := jssdk.NewJSSdkApi() - pd := NewPrometheusDiscoveryApi(config, client) + pd := NewPrometheusDiscoveryApi(config, disCov) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 117a8dd47..19a53ebd5 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -25,6 +25,8 @@ import ( "strconv" "syscall" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" @@ -39,9 +41,7 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" @@ -93,6 +93,9 @@ func Start(ctx context.Context, index int, config *Config) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil { + return err + } msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) @@ -127,10 +130,10 @@ func Start(ctx context.Context, index int, config *Config) error { historyMongoCH: historyMongoCH, runTimeEnv: runTimeEnv, } - return msgTransfer.Start(index, config) + return msgTransfer.Start(index, config, client) } -func (m *MsgTransfer) Start(index int, config *Config) error { +func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDiscoveryRegistry) error { m.ctx, m.cancel = context.WithCancel(context.Background()) var ( netDone = make(chan struct{}, 1) @@ -145,14 +148,6 @@ func (m *MsgTransfer) Start(index int, config *Config) error { return err } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, m.runTimeEnv) - if err != nil { - return errs.WrapMsg(err, "failed to register discovery service") - } - if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil { - return err - } - registerIP, err := network.GetRpcRegisterIP("") if err != nil { return err