|
|
|
@ -54,6 +54,8 @@ type MsgTransfer struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, index int) error {
|
|
|
|
|
log.CInfo(ctx, "MSG-TRANSFER server is initializing",
|
|
|
|
|
"prometheusPort", prometheusPort, "index", index)
|
|
|
|
|
rdb, err := cache.NewRedis(ctx, &config.Redis)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
@ -89,7 +91,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, prometheusPort, ind
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return msgTransfer.Start(ctx, prometheusPort, config, index)
|
|
|
|
|
return msgTransfer.Start(prometheusPort, config, index)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
|
|
|
|
@ -108,9 +110,7 @@ func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDat
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *MsgTransfer) Start(ctx context.Context, prometheusPort int, config *config.GlobalConfig, index int) error {
|
|
|
|
|
log.CInfo(ctx, "msg_transfer server starting",
|
|
|
|
|
"prometheusPort", prometheusPort, "index", index)
|
|
|
|
|
func (m *MsgTransfer) Start(prometheusPort int, config *config.GlobalConfig, index int) error {
|
|
|
|
|
if prometheusPort <= 0 {
|
|
|
|
|
return errs.WrapMsg(errors.New("invalid prometheus port"), "prometheusPort validation failed", "providedPort", prometheusPort)
|
|
|
|
|
}
|
|
|
|
|