diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index a126bc9d0..f97efdf15 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -120,19 +120,27 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs [] for _, v := range totalMsgs { options := utils.Options(v.message.Options) if !options.IsNotNotification() { - // 原通知 - notificationMsg := proto.Clone(v.message).(*sdkws.MsgData) + // clone msg from notificationMsg if options.IsSendMsg() { + msg := proto.Clone(v.message).(*sdkws.MsgData) // 消息 if v.message.Options != nil { - v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotNotification(true), utils.WithSendMsg(false)) + msg.Options = utils.NewMsgOptions() } - storageMsgList = append(storageMsgList, v.message) + if options.IsOfflinePush() { + v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithOfflinePush(false)) + msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true)) + } + if options.IsUnreadCount() { + v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithUnreadCount(false)) + msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true)) + } + storageMsgList = append(storageMsgList, msg) } - if isStorage(notificationMsg) { - storageNotificatoinList = append(storageNotificatoinList, notificationMsg) + if isStorage(v.message) { + storageNotificatoinList = append(storageNotificatoinList, v.message) } else { - notStorageNotificationList = append(notStorageNotificationList, notificationMsg) + notStorageNotificationList = append(notStorageNotificationList, v.message) } } else { if isStorage(v.message) { diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 788fe2cd8..b2215ec6c 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -65,8 +65,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) var isSend bool = true conversationID := utils.GetConversationIDByMsg(req.MsgData) - - if utils.MsgIsNotification(req.MsgData) { + if !utils.IsNotification(conversationID) { isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req) if err != nil { return nil, err diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 8dadbc0bb..fbada0991 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -358,10 +358,10 @@ type Notification struct { func GetOptionsByNotification(cfg NotificationConf) utils.Options { opts := utils.NewOptions() if cfg.UnreadCount { - opts = utils.WithOptions(opts, utils.WithUnreadCount()) + opts = utils.WithOptions(opts, utils.WithUnreadCount(true)) } if cfg.OfflinePush.Enable { - opts = utils.WithOptions(opts, utils.WithOfflinePush()) + opts = utils.WithOptions(opts, utils.WithOfflinePush(true)) } switch cfg.ReliabilityLevel { case constant.UnreliableNotification: diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index f8087705c..5866b5b82 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -16,7 +16,7 @@ import ( const ( scanCount = 3000 maxRetryTimes = 5 - retryInterval = time.Second * 1 + retryInterval = time.Millisecond * 100 ) var errIndex = errors.New("err index") @@ -49,7 +49,7 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context) error { if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil { if retryTimes >= m.maxRetryTimes { err = errs.ErrInternalServer.Wrap(fmt.Sprintf("delete cache error: %v, keys: %v, retry times %d, please check redis server", err, m.keys, retryTimes)) - log.ZWarn(ctx, "delete cache failed", err, "keys", m.keys) + log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", m.keys) return err } retryTimes++ @@ -106,11 +106,11 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin return t, nil } if v == "" { - return t, errs.ErrRecordNotFound.Wrap("msgCache is not found") + return t, errs.ErrRecordNotFound.Wrap("cache is not found") } err = json.Unmarshal([]byte(v), &t) if err != nil { - log.ZError(ctx, "msgCache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) + log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) return t, utils.Wrap(err, "") } return t, nil diff --git a/pkg/utils/options.go b/pkg/utils/options.go index 4381d8132..a3b265e14 100644 --- a/pkg/utils/options.go +++ b/pkg/utils/options.go @@ -25,6 +25,12 @@ func NewOptions(opts ...OptionsOpt) Options { return options } +func NewMsgOptions() Options { + options := make(map[string]bool, 11) + options[constant.IsOfflinePush] = false + return make(map[string]bool) +} + func WithOptions(options Options, opts ...OptionsOpt) Options { for _, opt := range opts { opt(options) @@ -56,15 +62,15 @@ func WithPersistent() OptionsOpt { } } -func WithOfflinePush() OptionsOpt { +func WithOfflinePush(b bool) OptionsOpt { return func(options Options) { - options[constant.IsOfflinePush] = true + options[constant.IsOfflinePush] = b } } -func WithUnreadCount() OptionsOpt { +func WithUnreadCount(b bool) OptionsOpt { return func(options Options) { - options[constant.IsUnreadCount] = true + options[constant.IsUnreadCount] = b } }