diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 2673665f7..c382de7f0 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -10,8 +10,8 @@ import ( "sync" ) -const OnlineTopicBusy = "Busy" -const OnlineTopicVacancy = "Vacancy" +const OnlineTopicBusy = 1 +const OnlineTopicVacancy = 0 var ( persistentCH PersistentConsumerHandler @@ -19,7 +19,7 @@ var ( offlineHistoryCH OfflineHistoryConsumerHandler producer *kafka.Producer cmdCh chan Cmd2Value - onlineTopicStatus string + onlineTopicStatus int w *sync.Mutex singleMsgSuccessCount uint64 groupMsgCount uint64 @@ -31,6 +31,7 @@ func Init() { w = new(sync.Mutex) persistentCH.Init() historyCH.Init(cmdCh) + onlineTopicStatus = OnlineTopicVacancy log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) offlineHistoryCH.Init(cmdCh) statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) @@ -47,12 +48,12 @@ func Run() { go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) } -func SetOnlineTopicStatus(status string) { +func SetOnlineTopicStatus(status int) { w.Lock() defer w.Unlock() onlineTopicStatus = status } -func GetOnlineTopicStatus() string { +func GetOnlineTopicStatus() int { w.Lock() defer w.Unlock() return onlineTopicStatus diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index a0416d7ed..b6eba5575 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -103,23 +103,20 @@ func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) //} - cmd := Cmd2Value{} -repeat: - select { - case cmd = <-mc.cmdCh: - case <-time.After(time.Millisecond * time.Duration(1)): - goto repeat - } - if cmd.Cmd == OnlineTopicVacancy { - for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } else { - goto repeat + for msg := range claim.Messages() { + if GetOnlineTopicStatus() == OnlineTopicVacancy { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } else { + select { + case <-mc.cmdCh: + case <-time.After(time.Millisecond * time.Duration(100)): } + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") } } + return nil } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 570de54d9..a2424e476 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -19,7 +19,7 @@ import ( type fcb func(msg []byte, msgKey string) type Cmd2Value struct { - Cmd string + Cmd int Value interface{} } type OnlineHistoryConsumerHandler struct { @@ -37,7 +37,7 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) } -func (och *OnlineHistoryConsumerHandler) TriggerCmd(status string) { +func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) { operationID := utils.OperationIDGenerator() for { err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) @@ -135,15 +135,15 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) for msg := range claim.Messages() { - och.TriggerCmd(OnlineTopicBusy) SetOnlineTopicStatus(OnlineTopicBusy) - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online") + //och.TriggerCmd(OnlineTopicBusy) + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "") if claim.HighWaterMarkOffset()-msg.Offset <= 1 { log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) - och.TriggerCmd(OnlineTopicVacancy) SetOnlineTopicStatus(OnlineTopicVacancy) + och.TriggerCmd(OnlineTopicVacancy) } } return nil