|
|
|
@ -95,7 +95,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
//ctx := mcontext.NewCtx("redis consumer")
|
|
|
|
|
//mcontext.SetOperationID(ctx, triggerID)
|
|
|
|
|
for _, v := range ctxMsgList {
|
|
|
|
|
log.ZDebug(ctx, "msg come to storage center", v.message.String())
|
|
|
|
|
log.ZDebug(ctx, "msg come to storage center", "message", v.message.String())
|
|
|
|
|
isHistory := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsHistory)
|
|
|
|
|
isSenderSync := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsSenderSync)
|
|
|
|
|
if isHistory {
|
|
|
|
@ -113,7 +113,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
if len(modifyMsgList) > 0 {
|
|
|
|
|
och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, "", modifyMsgList)
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
|
|
|
|
|
log.ZDebug(ctx, "msg storage length", "storageMsgList", len(storageMsgList), "push length", len(notStoragePushMsgList))
|
|
|
|
|
if len(storageMsgList) > 0 {
|
|
|
|
|
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
|
|
|
|
|
if err != nil {
|
|
|
|
|