From d7401e085d0b1a60cb859d44a2a8dd01ada39e86 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 26 Apr 2023 18:57:41 +0800 Subject: [PATCH] redis consumer split notification 2 msg --- .../msgtransfer/online_history_msg_handler.go | 143 ++++++++++-------- .../online_msg_to_mongo_handler.go | 3 +- pkg/common/config/config.go | 14 +- pkg/common/constant/constant.go | 7 + pkg/common/db/cache/redis.go | 19 ++- pkg/common/db/controller/msg.go | 24 ++- pkg/rpcclient/msg.go | 3 +- pkg/utils/options.go | 39 +++-- 8 files changed, 160 insertions(+), 92 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 1d2455755..2401de4d5 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -26,7 +26,6 @@ type MsgChannelValue struct { aggregationID string //maybe userID or super groupID ctx context.Context ctxMsgList []*ContextMsg - lastSeq uint64 } type TriggerChannelValue struct { @@ -53,10 +52,6 @@ type OnlineHistoryRedisConsumerHandler struct { singleMsgSuccessCountMutex sync.Mutex singleMsgFailedCountMutex sync.Mutex - //producerToPush *kafka.Producer - //producerToModify *kafka.Producer - //producerToMongo *kafka.Producer - msgDatabase controller.MsgDatabase } @@ -69,9 +64,6 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.MsgDatabase) *Onli och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } - //och.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) - //och.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic) - //och.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic) och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) @@ -89,64 +81,97 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - storagePushMsgList := make([]*ContextMsg, 0, 80) - notStoragePushMsgList := make([]*ContextMsg, 0, 80) + notStorageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + storageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80) + notStorageNotificationList := make([]*pbMsg.MsgDataToMQ, 0, 80) + modifyMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID) - var modifyMsgList []*pbMsg.MsgDataToMQ - //ctx := mcontext.NewCtx("redis consumer") - //mcontext.SetOperationID(ctx, triggerID) - for _, v := range ctxMsgList { - log.ZDebug(ctx, "msg come to storage center", "message", v.message.String()) - isHistory := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsHistory) - isSenderSync := utils.GetSwitchFromOptions(v.message.MsgData.Options, constant.IsSenderSync) - if isHistory { - storageMsgList = append(storageMsgList, v.message) - storagePushMsgList = append(storagePushMsgList, v) - } else { - if !(!isSenderSync && msgChannelValue.aggregationID == v.message.MsgData.SendID) { - notStoragePushMsgList = append(notStoragePushMsgList, v) - } - } - if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter { - modifyMsgList = append(modifyMsgList, v.message) - } - } - if len(modifyMsgList) > 0 { - och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, "", modifyMsgList) - } - log.ZDebug(ctx, "msg storage length", "storageMsgList", len(storageMsgList), "push length", len(notStoragePushMsgList)) - if len(storageMsgList) > 0 { - lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList) - if err != nil { - log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMsgList) - och.singleMsgFailedCountMutex.Lock() - och.singleMsgFailedCount += uint64(len(storageMsgList)) - och.singleMsgFailedCountMutex.Unlock() - } else { - och.singleMsgSuccessCountMutex.Lock() - och.singleMsgSuccessCount += uint64(len(storageMsgList)) - och.singleMsgSuccessCountMutex.Unlock() - och.msgDatabase.MsgToMongoMQ(ctx, msgChannelValue.aggregationID, "", storageMsgList, lastSeq) - for _, v := range storagePushMsgList { - och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) - } - for _, v := range notStoragePushMsgList { - och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) - } - } - } else { - for _, v := range notStoragePushMsgList { - p, o, err := och.msgDatabase.MsgToPushMQ(v.ctx, msgChannelValue.aggregationID, v.message) - if err != nil { - log.ZError(v.ctx, "kafka send failed", err, "msg", v.message.String(), "pid", p, "offset", o) - } - } + storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList = och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList) + och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList) + och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList) + if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil { + log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList) } } } } } +// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表, +func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) { + isStorage := func(msg *pbMsg.MsgDataToMQ) bool { + options2 := utils.Options(msg.MsgData.Options) + if options2.IsHistory() { + return true + } else { + if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) { + return false + } + } + return false + } + for _, v := range totalMsgs { + options := utils.Options(v.message.MsgData.Options) + if options.IsNotification() { + // 原通知 + notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ) + if options.IsSendMsg() { + // 消息 + v.message.MsgData.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false)) + storageMsgList = append(storageMsgList, v.message) + } + if isStorage(notificationMsg) { + storageNotificatoinList = append(storageNotificatoinList, notificationMsg) + } else { + notStorageNotificationList = append(notStorageNotificationList, notificationMsg) + } + } else { + if isStorage(v.message) { + storageMsgList = append(storageMsgList, v.message) + } else { + notStorageMsgList = append(notStorageMsgList, v.message) + } + } + if v.message.MsgData.ContentType == constant.ReactionMessageModifier || v.message.MsgData.ContentType == constant.ReactionMessageDeleter { + modifyMsgList = append(modifyMsgList, v.message) + } + } + return +} + +func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { + och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.BatchInsertChat2Cache) +} + +func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { + och.handle(ctx, aggregationID, storageList, notStorageList, och.msgDatabase.NotificationBatchInsertChat2Cache) +} + +func (och *OnlineHistoryRedisConsumerHandler) handle(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ, cacheAndIncr func(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error)) { + if len(storageList) > 0 { + lastSeq, err := cacheAndIncr(ctx, aggregationID, storageList) + if err != nil { + log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) + och.singleMsgFailedCountMutex.Lock() + och.singleMsgFailedCount += uint64(len(storageList)) + och.singleMsgFailedCountMutex.Unlock() + } else { + och.singleMsgSuccessCountMutex.Lock() + och.singleMsgSuccessCount += uint64(len(storageList)) + och.singleMsgSuccessCountMutex.Unlock() + och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq) + for _, v := range storageList { + och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v) + } + } + } + if len(notStorageList) > 0 { + for _, v := range notStorageList { + och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v) + } + } +} + func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { for { aggregationMsgs := make(map[string][]*ContextMsg, ChannelNum) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index d0f4ff001..cec3a5744 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -2,6 +2,7 @@ package msgtransfer import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -40,12 +41,10 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont return } log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq) - //err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq) err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq) if err != nil { log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID) } - //err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID()) err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages) if err != nil { log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 1ac97cc51..7488f68fb 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -7,6 +7,7 @@ import ( "path/filepath" "runtime" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -41,9 +42,10 @@ type CallBackConfig struct { } type NotificationConf struct { - IsSendMsg bool `yaml:"isSendMsg"` - UnreadCount bool `yaml:"unreadCount"` - OfflinePush POfflinePush `yaml:"offlinePush"` + IsSendMsg bool `yaml:"isSendMsg"` + ReliabilityLevel int `yaml:"reliabilityLevel"` // 1 online 2 presistent + UnreadCount bool `yaml:"unreadCount"` + OfflinePush POfflinePush `yaml:"offlinePush"` } type POfflinePush struct { @@ -361,6 +363,12 @@ func GetOptionsByNotification(cfg NotificationConf) utils.Options { if cfg.OfflinePush.Enable { opts = utils.WithOptions(opts, utils.WithOfflinePush()) } + switch cfg.ReliabilityLevel { + case constant.UnreliableNotification: + case constant.ReliableNotificationNoMsg: + opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent()) + } + opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg)) return opts } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index d6efe31b3..3770f4fdd 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -143,6 +143,7 @@ const ( IsSenderNotificationPush = "senderNotificationPush" IsReactionFromCache = "reactionFromCache" IsNotification = "isNotification" + IsSendMsg = "isSendMsg" //GroupStatus GroupOk = 0 @@ -215,6 +216,12 @@ const ( ReadDiffusion = 1 ) +const ( + UnreliableNotification = 1 + ReliableNotificationNoMsg = 2 + ReliableNotificationMsg = 3 +) + const ( AtAllString = "AtAllTag" AtNormal = 0 diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index b15d4d603..a9ff38061 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -213,11 +213,19 @@ func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform return utils.Wrap1(c.rdb.HDel(ctx, key, fields...).Err()) } +func (c *cache) getMessageCacheKey(sourceID string, seq int64) string { + return messageCache + sourceID + "_" + strconv.Itoa(int(seq)) +} + +func (c *cache) allMessageCacheKey(sourceID string) string { + return messageCache + sourceID + "_*" +} + func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { pipe := c.rdb.Pipeline() for _, v := range seqs { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := messageCache + userID + "_" + strconv.Itoa(int(v)) + key := c.getMessageCacheKey(userID, v) if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { return nil, nil, err } @@ -243,7 +251,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] pipe := c.rdb.Pipeline() var failedMsgs []pbMsg.MsgDataToMQ for _, msg := range msgList { - key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + key := c.getMessageCacheKey(userID, msg.MsgData.Seq) s, err := utils.Pb2String(msg.MsgData) if err != nil { return 0, utils.Wrap1(err) @@ -263,7 +271,7 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error { pipe := c.rdb.Pipeline() for _, v := range msgList { - if err := pipe.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil { + if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.MsgData.Seq)).Err(); err != nil { return utils.Wrap1(err) } } @@ -272,8 +280,7 @@ func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgLi } func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { - key := messageCache + userID + "_" + "*" - vals, err := c.rdb.Keys(ctx, key).Result() + vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result() if err == redis.Nil { return nil } @@ -381,7 +388,7 @@ func (c *cache) DelUserSignalList(ctx context.Context, userID string) error { func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { for _, seq := range seqs { - key := messageCache + userID + "_" + strconv.Itoa(int(seq)) + key := c.getMessageCacheKey(userID, seq) result, err := c.rdb.Get(ctx, key).Result() if err != nil { if err == redis.Nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index b4d0bd6a9..b2ccdf115 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -35,6 +35,8 @@ type MsgDatabase interface { DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error // incrSeq然后批量插入缓存 BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) + // incrSeq通知seq然后批量插入缓存 + NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) (int64, error) // 删除消息 返回不存在的seqList DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) // 获取群ID或者UserID最新一条在mongo里面的消息 @@ -78,9 +80,9 @@ type MsgDatabase interface { GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error - MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error + MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) - MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error + MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error } func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { @@ -187,9 +189,9 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms return err } -func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error { if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) + _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages}) return err } return nil @@ -197,12 +199,16 @@ func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) { mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq.MsgData, SourceID: key} - return db.producerToPush.SendMessage(ctx, key, &mqPushMsg) + partition, offset, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg) + if err != nil { + log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) + } + return partition, offset, err } -func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { +func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) + _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages}) return err } return nil @@ -311,6 +317,10 @@ func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string return db.cache.DeleteMessageFromCache(ctx, userID, msgs) } +func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) (int64, error) { + return 0, nil +} + func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 47958518a..d9f36802f 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -3,6 +3,7 @@ package rpcclient import ( "context" "encoding/json" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" @@ -95,7 +96,7 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...utils.OptionsOpt) error { content, err := json.Marshal(m) if err != nil { - log.ZError(ctx, "MsgClient Notification json.Marshal failed", err) + log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "sessionType", sessionType, "m", m) return err } var req msg.SendMsgReq diff --git a/pkg/utils/options.go b/pkg/utils/options.go index 5a551e4f4..be889e09e 100644 --- a/pkg/utils/options.go +++ b/pkg/utils/options.go @@ -8,6 +8,7 @@ type OptionsOpt func(Options) func NewOptions(opts ...OptionsOpt) Options { options := make(map[string]bool, 11) options[constant.IsNotification] = false + options[constant.IsSendMsg] = false options[constant.IsHistory] = false options[constant.IsPersistent] = false options[constant.IsOfflinePush] = false @@ -31,15 +32,21 @@ func WithOptions(options Options, opts ...OptionsOpt) Options { return options } -func WithNotification() OptionsOpt { +func WithNotification(b bool) OptionsOpt { return func(options Options) { - options[constant.IsNotification] = true + options[constant.IsNotification] = b } } -func WithHistory() OptionsOpt { +func WithSendMsg(b bool) OptionsOpt { return func(options Options) { - options[constant.IsHistory] = true + options[constant.IsSendMsg] = b + } +} + +func WithHistory(b bool) OptionsOpt { + return func(options Options) { + options[constant.IsHistory] = b } } @@ -109,42 +116,46 @@ func (o Options) IsNotification() bool { return o.Is(constant.IsNotification) } -func (o Options) IsHistory(options Options) bool { +func (o Options) IsSendMsg() bool { + return o.Is(constant.IsSendMsg) +} + +func (o Options) IsHistory() bool { return o.Is(constant.IsHistory) } -func (o Options) IsPersistent(options Options) bool { +func (o Options) IsPersistent() bool { return o.Is(constant.IsPersistent) } -func (o Options) IsOfflinePush(options Options) bool { +func (o Options) IsOfflinePush() bool { return o.Is(constant.IsOfflinePush) } -func (o Options) IsUnreadCount(options Options) bool { +func (o Options) IsUnreadCount() bool { return o.Is(constant.IsUnreadCount) } -func (o Options) IsConversationUpdate(options Options) bool { +func (o Options) IsConversationUpdate() bool { return o.Is(constant.IsConversationUpdate) } -func (o Options) IsSenderSync(options Options) bool { +func (o Options) IsSenderSync() bool { return o.Is(constant.IsSenderSync) } -func (o Options) IsNotPrivate(options Options) bool { +func (o Options) IsNotPrivate() bool { return o.Is(constant.IsNotPrivate) } -func (o Options) IsSenderConversationUpdate(options Options) bool { +func (o Options) IsSenderConversationUpdate() bool { return o.Is(constant.IsSenderConversationUpdate) } -func (o Options) IsSenderNotificationPush(options Options) bool { +func (o Options) IsSenderNotificationPush() bool { return o.Is(constant.IsSenderNotificationPush) } -func (o Options) IsReactionFromCache(options Options) bool { +func (o Options) IsReactionFromCache() bool { return o.Is(constant.IsReactionFromCache) }