pull/232/head
skiffer-git 3 years ago
parent f1e2704407
commit 5b9e5f904b

@ -105,7 +105,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
if isHistory { if isHistory {
storageMsgList = append(storageMsgList, v) storageMsgList = append(storageMsgList, v)
log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) // log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
} }
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
pushMsgList = append(pushMsgList, v) pushMsgList = append(pushMsgList, v)
@ -389,9 +389,16 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error
func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
for {
if sess == nil { if sess == nil {
log.Error("sess == nil")
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} else {
log.NewWarn("sess == ", sess)
break
} }
}
log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 1000) cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
t := time.NewTicker(time.Duration(100) * time.Millisecond) t := time.NewTicker(time.Duration(100) * time.Millisecond)

@ -373,7 +373,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} else { } else {
log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID) // log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
} }
return err return err
case constant.OfflineStatus: case constant.OfflineStatus:

Loading…
Cancel
Save