|
|
|
@ -16,51 +16,34 @@ package msgtransfer
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/IBM/sarama"
|
|
|
|
|
"github.com/go-redis/redis"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
|
|
|
|
|
"github.com/openimsdk/protocol/constant"
|
|
|
|
|
"github.com/openimsdk/protocol/sdkws"
|
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
|
"github.com/openimsdk/tools/mq/kafka"
|
|
|
|
|
"github.com/openimsdk/tools/utils/idutil"
|
|
|
|
|
"github.com/openimsdk/tools/utils/stringutil"
|
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
ConsumerMsgs = 3
|
|
|
|
|
SourceMessages = 4
|
|
|
|
|
MongoMessages = 5
|
|
|
|
|
ChannelNum = 100
|
|
|
|
|
size = 500
|
|
|
|
|
mainDataBuffer = 500
|
|
|
|
|
subChanBuffer = 50
|
|
|
|
|
worker = 50
|
|
|
|
|
interval = 100 * time.Millisecond
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MsgChannelValue struct {
|
|
|
|
|
uniqueKey string
|
|
|
|
|
ctx context.Context
|
|
|
|
|
ctxMsgList []*ContextMsg
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TriggerChannelValue struct {
|
|
|
|
|
ctx context.Context
|
|
|
|
|
cMsgList []*sarama.ConsumerMessage
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Cmd2Value struct {
|
|
|
|
|
Cmd int
|
|
|
|
|
Value any
|
|
|
|
|
}
|
|
|
|
|
type ContextMsg struct {
|
|
|
|
|
message *sdkws.MsgData
|
|
|
|
|
ctx context.Context
|
|
|
|
@ -68,13 +51,8 @@ type ContextMsg struct {
|
|
|
|
|
|
|
|
|
|
type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
|
historyConsumerGroup *kafka.MConsumerGroup
|
|
|
|
|
chArrays [ChannelNum]chan Cmd2Value
|
|
|
|
|
msgDistributionCh chan Cmd2Value
|
|
|
|
|
|
|
|
|
|
// singleMsgSuccessCount uint64
|
|
|
|
|
// singleMsgFailedCount uint64
|
|
|
|
|
// singleMsgSuccessCountMutex sync.Mutex
|
|
|
|
|
// singleMsgFailedCountMutex sync.Mutex
|
|
|
|
|
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
|
|
|
|
|
|
|
|
|
msgDatabase controller.CommonMsgDatabase
|
|
|
|
|
conversationRpcClient *rpcclient.ConversationRpcClient
|
|
|
|
@ -83,89 +61,82 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
|
|
|
|
|
|
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase,
|
|
|
|
|
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
|
|
|
|
|
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true)
|
|
|
|
|
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
var och OnlineHistoryRedisConsumerHandler
|
|
|
|
|
och.msgDatabase = database
|
|
|
|
|
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
|
|
|
|
|
go och.MessagesDistributionHandle()
|
|
|
|
|
for i := 0; i < ChannelNum; i++ {
|
|
|
|
|
och.chArrays[i] = make(chan Cmd2Value, 50)
|
|
|
|
|
go och.Run(i)
|
|
|
|
|
|
|
|
|
|
b := batcher.New[sarama.ConsumerMessage](
|
|
|
|
|
batcher.WithSize(size),
|
|
|
|
|
batcher.WithWorker(worker),
|
|
|
|
|
batcher.WithInterval(interval),
|
|
|
|
|
batcher.WithDataBuffer(mainDataBuffer),
|
|
|
|
|
batcher.WithSyncWait(true),
|
|
|
|
|
batcher.WithBuffer(subChanBuffer),
|
|
|
|
|
)
|
|
|
|
|
b.Sharding = func(key string) int {
|
|
|
|
|
hashCode := stringutil.GetHashCode(key)
|
|
|
|
|
return int(hashCode) % och.redisMessageBatches.Worker()
|
|
|
|
|
}
|
|
|
|
|
b.Key = func(consumerMessage *sarama.ConsumerMessage) string {
|
|
|
|
|
return string(consumerMessage.Key)
|
|
|
|
|
}
|
|
|
|
|
b.Do = och.do
|
|
|
|
|
och.redisMessageBatches = b
|
|
|
|
|
och.conversationRpcClient = conversationRpcClient
|
|
|
|
|
och.groupRpcClient = groupRpcClient
|
|
|
|
|
och.historyConsumerGroup = historyConsumerGroup
|
|
|
|
|
return &och, err
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
|
|
|
|
|
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
|
|
|
|
|
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
|
|
|
|
|
ctx = withAggregationCtx(ctx, ctxMessages)
|
|
|
|
|
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
|
|
|
|
|
"key", val.Key())
|
|
|
|
|
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
|
|
|
|
|
och.categorizeMessageLists(ctxMessages)
|
|
|
|
|
log.ZDebug(ctx, "number of categorized messages", "storageMsgList", len(storageMsgList), "notStorageMsgList",
|
|
|
|
|
len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList",
|
|
|
|
|
len(notStorageNotificationList))
|
|
|
|
|
|
|
|
|
|
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMessages[0].message)
|
|
|
|
|
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMessages[0].message)
|
|
|
|
|
och.handleMsg(ctx, val.Key(), conversationIDMsg, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
|
for cmd := range och.chArrays[channelID] {
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case SourceMessages:
|
|
|
|
|
msgChannelValue := cmd.Value.(MsgChannelValue)
|
|
|
|
|
ctxMsgList := msgChannelValue.ctxMsgList
|
|
|
|
|
ctx := msgChannelValue.ctx
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg arrived channel",
|
|
|
|
|
"channel id",
|
|
|
|
|
channelID,
|
|
|
|
|
"msgList length",
|
|
|
|
|
len(ctxMsgList),
|
|
|
|
|
"uniqueKey",
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
)
|
|
|
|
|
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
|
|
|
|
|
ctxMsgList,
|
|
|
|
|
)
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"msg lens",
|
|
|
|
|
"storageMsgList",
|
|
|
|
|
len(storageMsgList),
|
|
|
|
|
"notStorageMsgList",
|
|
|
|
|
len(notStorageMsgList),
|
|
|
|
|
"storageNotificationList",
|
|
|
|
|
len(storageNotificationList),
|
|
|
|
|
"notStorageNotificationList",
|
|
|
|
|
len(notStorageNotificationList),
|
|
|
|
|
"modifyMsgList",
|
|
|
|
|
len(modifyMsgList),
|
|
|
|
|
)
|
|
|
|
|
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
|
|
|
|
|
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
|
|
|
|
|
och.handleNotification(
|
|
|
|
|
ctx,
|
|
|
|
|
msgChannelValue.uniqueKey,
|
|
|
|
|
conversationIDNotification,
|
|
|
|
|
storageNotificationList,
|
|
|
|
|
notStorageNotificationList,
|
|
|
|
|
)
|
|
|
|
|
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
|
|
|
|
|
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
|
|
|
|
var ctxMessages []*ContextMsg
|
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
|
|
msgFromMQ := &sdkws.MsgData{}
|
|
|
|
|
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var arr []string
|
|
|
|
|
for i, header := range consumerMessages[i].Headers {
|
|
|
|
|
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
|
|
|
|
|
"header", strings.Join(arr, ", "))
|
|
|
|
|
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
|
|
|
|
|
ctxMsg.message = msgFromMQ
|
|
|
|
|
log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key",
|
|
|
|
|
string(consumerMessages[i].Key))
|
|
|
|
|
ctxMessages = append(ctxMessages, ctxMsg)
|
|
|
|
|
}
|
|
|
|
|
return ctxMessages
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get messages/notifications stored message list, not stored and pushed message list.
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
|
|
|
|
|
totalMsgs []*ContextMsg,
|
|
|
|
|
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
|
|
|
|
isStorage := func(msg *sdkws.MsgData) bool {
|
|
|
|
|
options2 := msgprocessor.Options(msg.Options)
|
|
|
|
|
if options2.IsHistory() {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
// if !(!options2.IsSenderSync() && conversationID == msg.MsgData.SendID) {
|
|
|
|
|
// return false
|
|
|
|
|
// }
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs []*ContextMsg) (storageMsgList,
|
|
|
|
|
notStorageMsgList, storageNotificationList, notStorageNotificationList []*ContextMsg) {
|
|
|
|
|
for _, v := range totalMsgs {
|
|
|
|
|
options := msgprocessor.Options(v.message.Options)
|
|
|
|
|
if !options.IsNotNotification() {
|
|
|
|
@ -185,176 +156,106 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
|
|
|
|
|
msgprocessor.WithOfflinePush(false),
|
|
|
|
|
msgprocessor.WithUnreadCount(false),
|
|
|
|
|
)
|
|
|
|
|
storageMsgList = append(storageMsgList, msg)
|
|
|
|
|
ctxMsg := &ContextMsg{
|
|
|
|
|
message: msg,
|
|
|
|
|
ctx: v.ctx,
|
|
|
|
|
}
|
|
|
|
|
storageMsgList = append(storageMsgList, ctxMsg)
|
|
|
|
|
}
|
|
|
|
|
if isStorage(v.message) {
|
|
|
|
|
storageNotificatoinList = append(storageNotificatoinList, v.message)
|
|
|
|
|
if options.IsHistory() {
|
|
|
|
|
storageNotificationList = append(storageNotificationList, v)
|
|
|
|
|
} else {
|
|
|
|
|
notStorageNotificationList = append(notStorageNotificationList, v.message)
|
|
|
|
|
notStorageNotificationList = append(notStorageNotificationList, v)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if isStorage(v.message) {
|
|
|
|
|
storageMsgList = append(storageMsgList, v.message)
|
|
|
|
|
if options.IsHistory() {
|
|
|
|
|
storageMsgList = append(storageMsgList, v)
|
|
|
|
|
} else {
|
|
|
|
|
notStorageMsgList = append(notStorageMsgList, v.message)
|
|
|
|
|
notStorageMsgList = append(notStorageMsgList, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if v.message.ContentType == constant.ReactionMessageModifier ||
|
|
|
|
|
v.message.ContentType == constant.ReactionMessageDeleter {
|
|
|
|
|
modifyMsgList = append(modifyMsgList, v.message)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
key, conversationID string,
|
|
|
|
|
storageList, notStorageList []*sdkws.MsgData,
|
|
|
|
|
) {
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
|
|
|
|
if len(storageList) > 0 {
|
|
|
|
|
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(
|
|
|
|
|
ctx,
|
|
|
|
|
"notification batch insert to redis error",
|
|
|
|
|
err,
|
|
|
|
|
"conversationID",
|
|
|
|
|
conversationID,
|
|
|
|
|
"storageList",
|
|
|
|
|
storageList,
|
|
|
|
|
)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
|
|
|
|
|
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "MsgToMongoMQ error", err)
|
|
|
|
|
}
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, storageList)
|
|
|
|
|
var storageMessageList []*sdkws.MsgData
|
|
|
|
|
for _, msg := range storageList {
|
|
|
|
|
storageMessageList = append(storageMessageList, msg.message)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
|
|
|
|
|
for _, v := range msgs {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v) // nolint: errcheck
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
|
|
|
|
if len(storageList) > 0 {
|
|
|
|
|
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
|
|
|
|
if len(storageMessageList) > 0 {
|
|
|
|
|
msg := storageMessageList[0]
|
|
|
|
|
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
|
|
|
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
|
|
|
|
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
|
|
|
|
|
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if isNewConversation {
|
|
|
|
|
switch storageList[0].SessionType {
|
|
|
|
|
switch msg.SessionType {
|
|
|
|
|
case constant.ReadGroupChatType:
|
|
|
|
|
log.ZInfo(ctx, "group chat first create conversation", "conversationID",
|
|
|
|
|
conversationID)
|
|
|
|
|
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID)
|
|
|
|
|
userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, msg.GroupID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
|
|
|
|
|
conversationID)
|
|
|
|
|
} else {
|
|
|
|
|
if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx,
|
|
|
|
|
storageList[0].GroupID, userIDs); err != nil {
|
|
|
|
|
msg.GroupID, userIDs); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "single chat first create conversation error", err,
|
|
|
|
|
"conversationID", conversationID)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case constant.SingleChatType, constant.NotificationChatType:
|
|
|
|
|
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID,
|
|
|
|
|
storageList[0].SendID, conversationID, storageList[0].SessionType); err != nil {
|
|
|
|
|
if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, msg.RecvID,
|
|
|
|
|
msg.SendID, conversationID, msg.SessionType); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
|
|
|
|
|
"conversationID", conversationID, "sessionType", storageList[0].SessionType)
|
|
|
|
|
"conversationID", conversationID, "sessionType", msg.SessionType)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
log.ZWarn(ctx, "unknown session type", nil, "sessionType",
|
|
|
|
|
storageList[0].SessionType)
|
|
|
|
|
msg.SessionType)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "success incr to next topic")
|
|
|
|
|
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
|
|
|
|
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "MsgToMongoMQ error", err)
|
|
|
|
|
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
|
|
|
|
|
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
|
|
|
|
|
}
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, storageList)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
|
for {
|
|
|
|
|
aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum)
|
|
|
|
|
select {
|
|
|
|
|
case cmd := <-och.msgDistributionCh:
|
|
|
|
|
switch cmd.Cmd {
|
|
|
|
|
case ConsumerMsgs:
|
|
|
|
|
triggerChannelValue := cmd.Value.(TriggerChannelValue)
|
|
|
|
|
ctx := triggerChannelValue.ctx
|
|
|
|
|
consumerMessages := triggerChannelValue.cMsgList
|
|
|
|
|
// Aggregation map[userid]message list
|
|
|
|
|
log.ZDebug(ctx, "batch messages come to distribution center", "length", len(consumerMessages))
|
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
|
|
msgFromMQ := &sdkws.MsgData{}
|
|
|
|
|
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var arr []string
|
|
|
|
|
for i, header := range consumerMessages[i].Headers {
|
|
|
|
|
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
|
|
|
|
|
"header", strings.Join(arr, ", "))
|
|
|
|
|
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
|
|
|
|
|
ctxMsg.message = msgFromMQ
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
ctx,
|
|
|
|
|
"single msg come to distribution center",
|
|
|
|
|
"message",
|
|
|
|
|
msgFromMQ,
|
|
|
|
|
"key",
|
|
|
|
|
string(consumerMessages[i].Key),
|
|
|
|
|
)
|
|
|
|
|
// aggregationMsgs[string(consumerMessages[i].Key)] =
|
|
|
|
|
// append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
|
|
|
|
|
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
|
|
|
|
|
oldM = append(oldM, ctxMsg)
|
|
|
|
|
aggregationMsgs[string(consumerMessages[i].Key)] = oldM
|
|
|
|
|
} else {
|
|
|
|
|
m := make([]*ContextMsg, 0, 100)
|
|
|
|
|
m = append(m, ctxMsg)
|
|
|
|
|
aggregationMsgs[string(consumerMessages[i].Key)] = m
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
|
|
|
|
for uniqueKey, v := range aggregationMsgs {
|
|
|
|
|
if len(v) >= 0 {
|
|
|
|
|
hashCode := stringutil.GetHashCode(uniqueKey)
|
|
|
|
|
channelID := hashCode % ChannelNum
|
|
|
|
|
newCtx := withAggregationCtx(ctx, v)
|
|
|
|
|
log.ZDebug(
|
|
|
|
|
newCtx,
|
|
|
|
|
"generate channelID",
|
|
|
|
|
"hashCode",
|
|
|
|
|
hashCode,
|
|
|
|
|
"channelID",
|
|
|
|
|
channelID,
|
|
|
|
|
"uniqueKey",
|
|
|
|
|
uniqueKey,
|
|
|
|
|
)
|
|
|
|
|
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, key, conversationID string,
|
|
|
|
|
storageList, notStorageList []*ContextMsg) {
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
|
|
|
|
var storageMessageList []*sdkws.MsgData
|
|
|
|
|
for _, msg := range storageList {
|
|
|
|
|
storageMessageList = append(storageMessageList, msg.message)
|
|
|
|
|
}
|
|
|
|
|
if len(storageMessageList) > 0 {
|
|
|
|
|
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
|
|
|
|
|
"storageList", storageMessageList)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
|
|
|
|
|
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
|
|
|
|
|
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
|
|
|
|
|
}
|
|
|
|
|
och.toPushTopic(ctx, key, conversationID, storageList)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) {
|
|
|
|
|
for _, v := range msgs {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -377,106 +278,30 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
|
|
|
|
|
sess sarama.ConsumerGroupSession,
|
|
|
|
|
claim sarama.ConsumerGroupClaim,
|
|
|
|
|
) error { // a instance in the consumer group
|
|
|
|
|
for {
|
|
|
|
|
if sess == nil {
|
|
|
|
|
log.ZWarn(context.Background(), "sess == nil, waiting", nil)
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
} else {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
|
|
|
|
|
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
|
|
|
|
log.ZInfo(context.Background(), "online new session msg come", "highWaterMarkOffset",
|
|
|
|
|
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
split = 1000
|
|
|
|
|
rwLock = new(sync.RWMutex)
|
|
|
|
|
messages = make([]*sarama.ConsumerMessage, 0, 1000)
|
|
|
|
|
ticker = time.NewTicker(time.Millisecond * 100)
|
|
|
|
|
|
|
|
|
|
wg = sync.WaitGroup{}
|
|
|
|
|
running = new(atomic.Bool)
|
|
|
|
|
)
|
|
|
|
|
running.Store(true)
|
|
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
// if the buffer is empty and running is false, return loop.
|
|
|
|
|
if len(messages) == 0 {
|
|
|
|
|
if !running.Load() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rwLock.Lock()
|
|
|
|
|
buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
|
|
|
|
|
buffer = append(buffer, messages...)
|
|
|
|
|
|
|
|
|
|
// reuse slice, set cap to 0
|
|
|
|
|
messages = messages[:0]
|
|
|
|
|
rwLock.Unlock()
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator())
|
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
|
|
|
|
|
for i := 0; i < len(buffer)/split; i++ {
|
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
|
|
ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
|
|
|
|
|
}}
|
|
|
|
|
}
|
|
|
|
|
if (len(buffer) % split) > 0 {
|
|
|
|
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
|
|
|
|
ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
|
|
|
|
|
}}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "timer trigger msg consumer end",
|
|
|
|
|
"length", len(buffer), "time_cost", time.Since(start),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) {
|
|
|
|
|
session.MarkMessage(lastMessage, "")
|
|
|
|
|
session.Commit()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
for running.Load() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case msg, ok := <-claim.Messages():
|
|
|
|
|
if !ok {
|
|
|
|
|
running.Store(false)
|
|
|
|
|
return
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(msg.Value) == 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rwLock.Lock()
|
|
|
|
|
messages = append(messages, msg)
|
|
|
|
|
rwLock.Unlock()
|
|
|
|
|
|
|
|
|
|
sess.MarkMessage(msg, "")
|
|
|
|
|
|
|
|
|
|
case <-sess.Context().Done():
|
|
|
|
|
running.Store(false)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
err := och.redisMessageBatches.Put(context.Background(), msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.ZWarn(context.Background(), "put msg to error", err, "msg", msg)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
case <-session.Context().Done():
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|