msg transfer handle change

pull/232/head
Gordon 3 years ago
parent 8e3f090ffe
commit 573937e1e5

@ -38,6 +38,7 @@ func Init() {
w = new(sync.Mutex)
persistentCH.Init()
historyCH.Init(cmdCh)
historyMongoCH.Init()
onlineTopicStatus = OnlineTopicVacancy
//offlineHistoryCH.Init(cmdCh)
statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
@ -53,7 +54,7 @@ func Run() {
fmt.Println("not start mysql consumer")
}
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
//go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
}
func SetOnlineTopicStatus(status int) {

@ -18,7 +18,7 @@ type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
}
func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) {
func (och *OnlineHistoryMongoConsumerHandler) Init() {
och.msgHandle = make(map[string]fcb)
och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo
och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,

Loading…
Cancel
Save