|
|
@ -157,7 +157,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
ctx := triggerChannelValue.ctx
|
|
|
|
ctx := triggerChannelValue.ctx
|
|
|
|
consumerMessages := triggerChannelValue.cMsgList
|
|
|
|
consumerMessages := triggerChannelValue.cMsgList
|
|
|
|
//Aggregation map[userid]message list
|
|
|
|
//Aggregation map[userid]message list
|
|
|
|
log.ZDebug(ctx, "batch messages come to distribution center", len(consumerMessages))
|
|
|
|
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
|
msgFromMQ := pbMsg.MsgDataToMQ{}
|
|
|
|
msgFromMQ := pbMsg.MsgDataToMQ{}
|
|
|
@ -178,7 +178,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
aggregationMsgs[string(consumerMessages[i].Key)] = m
|
|
|
|
aggregationMsgs[string(consumerMessages[i].Key)] = m
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.ZDebug(ctx, "generate map list users len", len(aggregationMsgs))
|
|
|
|
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
|
|
|
for aggregationID, v := range aggregationMsgs {
|
|
|
|
for aggregationID, v := range aggregationMsgs {
|
|
|
|
if len(v) >= 0 {
|
|
|
|
if len(v) >= 0 {
|
|
|
|
hashCode := utils.GetHashCode(aggregationID)
|
|
|
|
hashCode := utils.GetHashCode(aggregationID)
|
|
|
@ -223,7 +223,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
|
|
|
rwLock.Unlock()
|
|
|
|
rwLock.Unlock()
|
|
|
|
split := 1000
|
|
|
|
split := 1000
|
|
|
|
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
|
|
|
|
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer start", len(ccMsg))
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
|
|
|
|
for i := 0; i < len(ccMsg)/split; i++ {
|
|
|
|
for i := 0; i < len(ccMsg)/split; i++ {
|
|
|
|
//log.Debug()
|
|
|
|
//log.Debug()
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
@ -233,7 +233,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
|
ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}}
|
|
|
|
ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):]}}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer end", len(ccMsg))
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|