|
|
|
@ -25,6 +25,8 @@ import (
|
|
|
|
|
"strconv"
|
|
|
|
|
"syscall"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
|
"github.com/openimsdk/tools/discovery/etcd"
|
|
|
|
|
"github.com/openimsdk/tools/utils/jsonutil"
|
|
|
|
|
"github.com/openimsdk/tools/utils/network"
|
|
|
|
@ -39,9 +41,7 @@ import (
|
|
|
|
|
|
|
|
|
|
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
|
|
|
|
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
|
"github.com/openimsdk/tools/mw"
|
|
|
|
@ -93,6 +93,9 @@ func Start(ctx context.Context, index int, config *Config) error {
|
|
|
|
|
}
|
|
|
|
|
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
|
|
|
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
|
|
|
|
|
if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msgModel := redis.NewMsgCache(rdb)
|
|
|
|
|
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
|
|
|
|
@ -127,10 +130,10 @@ func Start(ctx context.Context, index int, config *Config) error {
|
|
|
|
|
historyMongoCH: historyMongoCH,
|
|
|
|
|
runTimeEnv: runTimeEnv,
|
|
|
|
|
}
|
|
|
|
|
return msgTransfer.Start(index, config)
|
|
|
|
|
return msgTransfer.Start(index, config, client)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgTransfer) Start(index int, config *Config) error {
|
|
|
|
|
func (m *MsgTransfer) Start(index int, config *Config, client discovery.SvcDiscoveryRegistry) error {
|
|
|
|
|
m.ctx, m.cancel = context.WithCancel(context.Background())
|
|
|
|
|
var (
|
|
|
|
|
netDone = make(chan struct{}, 1)
|
|
|
|
@ -145,14 +148,6 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, m.runTimeEnv)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errs.WrapMsg(err, "failed to register discovery service")
|
|
|
|
|
}
|
|
|
|
|
if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
registerIP, err := network.GetRpcRegisterIP("")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|