|
|
|
@ -74,10 +74,10 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
|
chArrays [ChannelNum]chan Cmd2Value
|
|
|
|
|
msgDistributionCh chan Cmd2Value
|
|
|
|
|
|
|
|
|
|
singleMsgSuccessCount uint64
|
|
|
|
|
singleMsgFailedCount uint64
|
|
|
|
|
singleMsgSuccessCountMutex sync.Mutex
|
|
|
|
|
singleMsgFailedCountMutex sync.Mutex
|
|
|
|
|
// singleMsgSuccessCount uint64
|
|
|
|
|
// singleMsgFailedCount uint64
|
|
|
|
|
// singleMsgSuccessCountMutex sync.Mutex
|
|
|
|
|
// singleMsgFailedCountMutex sync.Mutex
|
|
|
|
|
|
|
|
|
|
msgDatabase controller.CommonMsgDatabase
|
|
|
|
|
conversationRpcClient *rpcclient.ConversationRpcClient
|
|
|
|
@ -111,62 +111,59 @@ func NewOnlineHistoryRedisConsumerHandler(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case cmd := <-och.chArrays[channelID]:
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case SourceMessages:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
ctxMsgList := msgChannelValue.ctxMsgList
|
|
|
|
|
ctx := msgChannelValue.ctx
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
for cmd := range och.chArrays[channelID] {
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case SourceMessages:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
ctxMsgList := msgChannelValue.ctxMsgList
|
|
|
|
|
ctx := msgChannelValue.ctx
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg arrived channel",
|
|
|
|
|
"channel id",
|
|
|
|
|
channelID,
|
|
|
|
|
"msgList length",
|
|
|
|
|
len(ctxMsgList),
|
|
|
|
|
"uniqueKey",
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
)
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
|
|
|
|
|
ctxMsgList,
|
|
|
|
|
)
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg lens",
|
|
|
|
|
"storageMsgList",
|
|
|
|
|
len(storageMsgList),
|
|
|
|
|
"notStorageMsgList",
|
|
|
|
|
len(notStorageMsgList),
|
|
|
|
|
"storageNotificationList",
|
|
|
|
|
len(storageNotificationList),
|
|
|
|
|
"notStorageNotificationList",
|
|
|
|
|
len(notStorageNotificationList),
|
|
|
|
|
"modifyMsgList",
|
|
|
|
|
len(modifyMsgList),
|
|
|
|
|
)
|
|
|
|
|
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(
|
|
|
|
|
ctx,
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
conversationIDNotification,
|
|
|
|
|
storageNotificationList,
|
|
|
|
|
notStorageNotificationList,
|
|
|
|
|
)
|
|
|
|
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
|
|
|
|
log.ZError(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg arrived channel",
|
|
|
|
|
"channel id",
|
|
|
|
|
channelID,
|
|
|
|
|
"msgList length",
|
|
|
|
|
len(ctxMsgList),
|
|
|
|
|
"msg to modify mq error",
|
|
|
|
|
err,
|
|
|
|
|
"uniqueKey",
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
)
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
|
|
|
|
|
ctxMsgList,
|
|
|
|
|
)
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg lens",
|
|
|
|
|
"storageMsgList",
|
|
|
|
|
len(storageMsgList),
|
|
|
|
|
"notStorageMsgList",
|
|
|
|
|
len(notStorageMsgList),
|
|
|
|
|
"storageNotificationList",
|
|
|
|
|
len(storageNotificationList),
|
|
|
|
|
"notStorageNotificationList",
|
|
|
|
|
len(notStorageNotificationList),
|
|
|
|
|
"modifyMsgList",
|
|
|
|
|
len(modifyMsgList),
|
|
|
|
|
modifyMsgList,
|
|
|
|
|
)
|
|
|
|
|
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(
|
|
|
|
|
ctx,
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
conversationIDNotification,
|
|
|
|
|
storageNotificationList,
|
|
|
|
|
notStorageNotificationList,
|
|
|
|
|
)
|
|
|
|
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
|
|
|
|
log.ZError(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg to modify mq error",
|
|
|
|
|
err,
|
|
|
|
|
"uniqueKey",
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
"modifyMsgList",
|
|
|
|
|
modifyMsgList,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|