Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

test-errcode
wangchuxiao 2 years ago
commit 76fd103a82

@ -90,7 +90,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue) msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx ctx := msgChannelValue.ctx
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey) log.ZDebug(withAggregationCtx(ctx, ctxMsgList), "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList), log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList)) "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
@ -234,7 +234,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ)
if err != nil { if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
return continue
} }
var arr []string var arr []string
for i, header := range consumerMessages[i].Headers { for i, header := range consumerMessages[i].Headers {
@ -243,7 +243,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", ")) log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers) ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = &msgFromMQ ctxMsg.message = &msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key))
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg) //aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok { if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, ctxMsg) oldM = append(oldM, ctxMsg)
@ -259,7 +259,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
if len(v) >= 0 { if len(v) >= 0 {
hashCode := utils.GetHashCode(uniqueKey) hashCode := utils.GetHashCode(uniqueKey)
channelID := hashCode % ChannelNum channelID := hashCode % ChannelNum
log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey) log.ZDebug(withAggregationCtx(ctx, v), "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}} och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: ctx}}
} }
} }
@ -267,6 +267,15 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
} }
} }
} }
func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Context {
var allMessageOperationID string
for _, v := range values {
if opid := mcontext.GetOperationID(v.ctx); opid != "" {
allMessageOperationID += "$" + opid
}
}
return mcontext.SetOperationID(ctx, allMessageOperationID)
}
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
@ -300,8 +309,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
cMsg = make([]*sarama.ConsumerMessage, 0, 1000) cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Unlock() rwLock.Unlock()
split := 1000 split := 1000
ctx := mcontext.NewCtx(utils.OperationIDGenerator()) ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
ctx = mcontext.WithTriggerIDContext(ctx, utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ { for i := 0; i < len(ccMsg)/split; i++ {
//log.Debug() //log.Debug()

Loading…
Cancel
Save