redis consumer split notification 2 msg

test-errcode
wangchuxiao 2 years ago
parent cf39ea7364
commit d7401e085d

@ -26,7 +26,6 @@ type MsgChannelValue struct {
aggregationID string //maybe userID or super groupID
ctx context.Context
ctxMsgList []*ContextMsg
lastSeq uint64
}
type TriggerChannelValue struct {
@ -53,10 +52,6 @@ type OnlineHistoryRedisConsumerHandler struct {
singleMsgSuccessCountMutex sync.Mutex
singleMsgFailedCountMutex sync.Mutex
//producerToPush *kafka.Producer
//producerToModify *kafka.Producer
//producerToMongo *kafka.Producer
msgDatabase controller.MsgDatabase
}
@ -69,9 +64,6 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli
och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i)
}
//och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
//och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
//och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
@ -89,60 +81,93 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
storagePushMsgList := make([]*ContextMsg, 0, 80)
notStoragePushMsgList := make([]*ContextMsg, 0, 80)
notStorageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
storageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80)
notStorageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80)
modifyMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID)
var modifyMsgList []*pbMsg.MsgDataToMQ
//ctx := mcontext.NewCtx("redis consumer")
//mcontext.SetOperationID(ctx, triggerID)
for _, v := range ctxMsgList {
log.ZDebug(ctx, "msg come to storage center", "message", v.message.String())
isHistory := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsHistory)
isSenderSync := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsSenderSync)
if isHistory {
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList = och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList)
och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList)
och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil {
log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList)
}
}
}
}
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
options2 := utils.Options(msg.MsgData.Options)
if options2.IsHistory() {
return true
} else {
if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) {
return false
}
}
return false
}
for _, v := range totalMsgs {
options := utils.Options(v.message.MsgData.Options)
if options.IsNotification() {
// 原通知
notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ)
if options.IsSendMsg() {
// 消息
v.message.MsgData.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false))
storageMsgList = append(storageMsgList, v.message)
}
if isStorage(notificationMsg) {
storageNotificatoinList = append(storageNotificatoinList, notificationMsg)
} else {
notStorageNotificationList = append(notStorageNotificationList, notificationMsg)
}
} else {
if isStorage(v.message) {
storageMsgList = append(storageMsgList, v.message)
storagePushMsgList = append(storagePushMsgList, v)
} else {
if !(!isSenderSync && msgChannelValue.aggregationID == v.message.MsgData.SendID) {
notStoragePushMsgList = append(notStoragePushMsgList, v)
notStorageMsgList = append(notStorageMsgList, v.message)
}
}
if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter {
modifyMsgList = append(modifyMsgList, v.message)
}
}
if len(modifyMsgList) > 0 {
och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, "", modifyMsgList)
return
}
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.BatchInsertChat2Cache)
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.NotificationBatchInsertChat2Cache)
}
log.ZDebug(ctx, "msg storage length", "storageMsgList", len(storageMsgList), "push length", len(notStoragePushMsgList))
if len(storageMsgList) > 0 {
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
func (och *OnlineHistoryRedisConsumerHandler) handle(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ, cacheAndIncr func(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)) {
if len(storageList) > 0 {
lastSeq, err := cacheAndIncr(ctx, aggregationID, storageList)
if err != nil {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMsgList)
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
och.singleMsgFailedCountMutex.Lock()
och.singleMsgFailedCount += uint64(len(storageMsgList))
och.singleMsgFailedCount += uint64(len(storageList))
och.singleMsgFailedCountMutex.Unlock()
} else {
och.singleMsgSuccessCountMutex.Lock()
och.singleMsgSuccessCount += uint64(len(storageMsgList))
och.singleMsgSuccessCount += uint64(len(storageList))
och.singleMsgSuccessCountMutex.Unlock()
och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, "", storageMsgList, lastSeq)
for _, v := range storagePushMsgList {
och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message)
}
for _, v := range notStoragePushMsgList {
och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message)
}
}
} else {
for _, v := range notStoragePushMsgList {
p, o, err := och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message)
if err != nil {
log.ZError(v.ctx, "kafka send failed", err, "msg", v.message.String(), "pid", p, "offset", o)
}
och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
for _, v := range storageList {
och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
}
}
}
if len(notStorageList) > 0 {
for _, v := range notStorageList {
och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
}
}
}

@ -2,6 +2,7 @@ package msgtransfer
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@ -40,12 +41,10 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
return
}
log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
//err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
//err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
if err != nil {
log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)

@ -7,6 +7,7 @@ import (
"path/filepath"
"runtime"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -42,6 +43,7 @@ type CallBackConfig struct {
type NotificationConf struct {
IsSendMsg bool `yaml:"isSendMsg"`
ReliabilityLevel int `yaml:"reliabilityLevel"` // 1 online 2 presistent
UnreadCount bool `yaml:"unreadCount"`
OfflinePush POfflinePush `yaml:"offlinePush"`
}
@ -361,6 +363,12 @@ func GetOptionsByNotification(cfg NotificationConf) utils.Options {
if cfg.OfflinePush.Enable {
opts = utils.WithOptions(opts, utils.WithOfflinePush())
}
switch cfg.ReliabilityLevel {
case constant.UnreliableNotification:
case constant.ReliableNotificationNoMsg:
opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent())
}
opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg))
return opts
}

@ -143,6 +143,7 @@ const (
IsSenderNotificationPush = "senderNotificationPush"
IsReactionFromCache = "reactionFromCache"
IsNotification = "isNotification"
IsSendMsg = "isSendMsg"
//GroupStatus
GroupOk = 0
@ -215,6 +216,12 @@ const (
ReadDiffusion = 1
)
const (
UnreliableNotification = 1
ReliableNotificationNoMsg = 2
ReliableNotificationMsg = 3
)
const (
AtAllString = "AtAllTag"
AtNormal = 0

@ -213,11 +213,19 @@ func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform
return utils.Wrap1(c.rdb.HDel(ctx, key, fields...).Err())
}
func (c *cache) getMessageCacheKey(sourceID string, seq int64) string {
return messageCache + sourceID + "_" + strconv.Itoa(int(seq))
}
func (c *cache) allMessageCacheKey(sourceID string) string {
return messageCache + sourceID + "_*"
}
func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
for _, v := range seqs {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
key := c.getMessageCacheKey(userID, v)
if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
return nil, nil, err
}
@ -243,7 +251,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
pipe := c.rdb.Pipeline()
var failedMsgs []pbMsg.MsgDataToMQ
for _, msg := range msgList {
key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq))
key := c.getMessageCacheKey(userID, msg.MsgData.Seq)
s, err := utils.Pb2String(msg.MsgData)
if err != nil {
return 0, utils.Wrap1(err)
@ -263,7 +271,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error {
pipe := c.rdb.Pipeline()
for _, v := range msgList {
if err := pipe.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil {
if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.MsgData.Seq)).Err(); err != nil {
return utils.Wrap1(err)
}
}
@ -272,8 +280,7 @@ func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgLi
}
func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
key := messageCache + userID + "_" + "*"
vals, err := c.rdb.Keys(ctx, key).Result()
vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result()
if err == redis.Nil {
return nil
}
@ -381,7 +388,7 @@ func (c *cache) DelUserSignalList(ctx context.Context, userID string) error {
func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs {
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
key := c.getMessageCacheKey(userID, seq)
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {
if err == redis.Nil {

@ -35,6 +35,8 @@ type MsgDatabase interface {
DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)
// incrSeq通知seq然后批量插入缓存
NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) (int64, error)
// 删除消息 返回不存在的seqList
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
// 获取群ID或者UserID最新一条在mongo里面的消息
@ -78,9 +80,9 @@ type MsgDatabase interface {
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error
MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error
MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
}
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
@ -187,9 +189,9 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms
return err
}
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error {
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error {
if len(messages) > 0 {
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages})
return err
}
return nil
@ -197,12 +199,16 @@ func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string,
func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) {
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq.MsgData, SourceID: key}
return db.producerToPush.SendMessage(ctx, key, &mqPushMsg)
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg)
if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
}
return partition, offset, err
}
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages})
return err
}
return nil
@ -311,6 +317,10 @@ func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
}
func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) (int64, error) {
return 0, nil
}
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) {
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)

@ -3,6 +3,7 @@ package rpcclient
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@ -95,7 +96,7 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes
func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...utils.OptionsOpt) error {
content, err := json.Marshal(m)
if err != nil {
log.ZError(ctx, "MsgClient Notification json.Marshal failed", err)
log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "sessionType", sessionType, "m", m)
return err
}
var req msg.SendMsgReq

@ -8,6 +8,7 @@ type OptionsOpt func(Options)
func NewOptions(opts ...OptionsOpt) Options {
options := make(map[string]bool, 11)
options[constant.IsNotification] = false
options[constant.IsSendMsg] = false
options[constant.IsHistory] = false
options[constant.IsPersistent] = false
options[constant.IsOfflinePush] = false
@ -31,15 +32,21 @@ func WithOptions(options Options, opts ...OptionsOpt) Options {
return options
}
func WithNotification() OptionsOpt {
func WithNotification(b bool) OptionsOpt {
return func(options Options) {
options[constant.IsNotification] = true
options[constant.IsNotification] = b
}
}
func WithHistory() OptionsOpt {
func WithSendMsg(b bool) OptionsOpt {
return func(options Options) {
options[constant.IsHistory] = true
options[constant.IsSendMsg] = b
}
}
func WithHistory(b bool) OptionsOpt {
return func(options Options) {
options[constant.IsHistory] = b
}
}
@ -109,42 +116,46 @@ func (o Options) IsNotification() bool {
return o.Is(constant.IsNotification)
}
func (o Options) IsHistory(options Options) bool {
func (o Options) IsSendMsg() bool {
return o.Is(constant.IsSendMsg)
}
func (o Options) IsHistory() bool {
return o.Is(constant.IsHistory)
}
func (o Options) IsPersistent(options Options) bool {
func (o Options) IsPersistent() bool {
return o.Is(constant.IsPersistent)
}
func (o Options) IsOfflinePush(options Options) bool {
func (o Options) IsOfflinePush() bool {
return o.Is(constant.IsOfflinePush)
}
func (o Options) IsUnreadCount(options Options) bool {
func (o Options) IsUnreadCount() bool {
return o.Is(constant.IsUnreadCount)
}
func (o Options) IsConversationUpdate(options Options) bool {
func (o Options) IsConversationUpdate() bool {
return o.Is(constant.IsConversationUpdate)
}
func (o Options) IsSenderSync(options Options) bool {
func (o Options) IsSenderSync() bool {
return o.Is(constant.IsSenderSync)
}
func (o Options) IsNotPrivate(options Options) bool {
func (o Options) IsNotPrivate() bool {
return o.Is(constant.IsNotPrivate)
}
func (o Options) IsSenderConversationUpdate(options Options) bool {
func (o Options) IsSenderConversationUpdate() bool {
return o.Is(constant.IsSenderConversationUpdate)
}
func (o Options) IsSenderNotificationPush(options Options) bool {
func (o Options) IsSenderNotificationPush() bool {
return o.Is(constant.IsSenderNotificationPush)
}
func (o Options) IsReactionFromCache(options Options) bool {
func (o Options) IsReactionFromCache() bool {
return o.Is(constant.IsReactionFromCache)
}

Loading…
Cancel
Save