diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 17a310fb3..282f50119 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,6 +16,7 @@ package msgtransfer import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "strconv" "strings" "sync" @@ -141,8 +142,8 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { "modifyMsgList", len(modifyMsgList), ) - conversationIDMsg := utils.GetChatConversationIDByMsg(ctxMsgList[0].message) - conversationIDNotification := utils.GetNotificationConversationID(ctxMsgList[0].message) + conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) + conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message) och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) och.handleNotification( ctx, @@ -172,7 +173,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList( totalMsgs []*ContextMsg, ) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) { isStorage := func(msg *sdkws.MsgData) bool { - options2 := utils.Options(msg.Options) + options2 := msgprocessor.Options(msg.Options) if options2.IsHistory() { return true } else { @@ -183,28 +184,28 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList( } } for _, v := range totalMsgs { - options := utils.Options(v.message.Options) + options := msgprocessor.Options(v.message.Options) if !options.IsNotNotification() { // clone msg from notificationMsg if options.IsSendMsg() { msg := proto.Clone(v.message).(*sdkws.MsgData) // 消息 if v.message.Options != nil { - msg.Options = utils.NewMsgOptions() + msg.Options = msgprocessor.NewMsgOptions() } if options.IsOfflinePush() { - v.message.Options = utils.WithOptions( - utils.Options(v.message.Options), - utils.WithOfflinePush(false), + v.message.Options = msgprocessor.WithOptions( + v.message.Options, + msgprocessor.WithOfflinePush(false), ) - msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true)) + msg.Options = msgprocessor.WithOptions(msg.Options, msgprocessor.WithOfflinePush(true)) } if options.IsUnreadCount() { - v.message.Options = utils.WithOptions( - utils.Options(v.message.Options), - utils.WithUnreadCount(false), + v.message.Options = msgprocessor.WithOptions( + v.message.Options, + msgprocessor.WithUnreadCount(false), ) - msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true)) + msg.Options = msgprocessor.WithOptions(msg.Options, msgprocessor.WithUnreadCount(true)) } storageMsgList = append(storageMsgList, msg) } diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 1fc592b6d..e3fd99b9f 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -17,6 +17,7 @@ package config import ( _ "embed" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "os" "path/filepath" "runtime" @@ -24,7 +25,6 @@ import ( "gopkg.in/yaml.v3" "github.com/OpenIMSDK/protocol/constant" - "github.com/OpenIMSDK/tools/utils" ) //go:embed version @@ -42,20 +42,20 @@ const ( DefaultFolderPath = "../config/" ) -func GetOptionsByNotification(cfg NotificationConf) utils.Options { - opts := utils.NewOptions() +func GetOptionsByNotification(cfg NotificationConf) msgprocessor.Options { + opts := msgprocessor.NewOptions() if cfg.UnreadCount { - opts = utils.WithOptions(opts, utils.WithUnreadCount(true)) + opts = msgprocessor.WithOptions(opts, msgprocessor.WithUnreadCount(true)) } if cfg.OfflinePush.Enable { - opts = utils.WithOptions(opts, utils.WithOfflinePush(true)) + opts = msgprocessor.WithOptions(opts, msgprocessor.WithOfflinePush(true)) } switch cfg.ReliabilityLevel { case constant.UnreliableNotification: case constant.ReliableNotificationNoMsg: - opts = utils.WithOptions(opts, utils.WithHistory(true), utils.WithPersistent()) + opts = msgprocessor.WithOptions(opts, msgprocessor.WithHistory(true), msgprocessor.WithPersistent()) } - opts = utils.WithOptions(opts, utils.WithSendMsg(cfg.IsSendMsg)) + opts = msgprocessor.WithOptions(opts, msgprocessor.WithSendMsg(cfg.IsSendMsg)) return opts } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index e208567d0..953396fbe 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "strconv" "time" @@ -374,7 +375,7 @@ func (c *msgCache) GetMessagesBySeq( failedSeqs = append(failedSeqs, seqs[i]) } else { msg := sdkws.MsgData{} - err = utils.String2Pb(cmd.Val(), &msg) + err = msgprocessor.String2Pb(cmd.Val(), &msg) if err == nil { if msg.Status != constant.MsgDeleted { seqMsgs = append(seqMsgs, &msg) @@ -394,7 +395,7 @@ func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, var failedMsgs []*sdkws.MsgData for _, msg := range msgs { key := c.getMessageCacheKey(conversationID, msg.Seq) - s, err := utils.Pb2String(msg) + s, err := msgprocessor.Pb2String(msg) if err != nil { return 0, errs.Wrap(err) } @@ -535,7 +536,7 @@ func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []in return err } msg.Status = constant.MsgDeleted - s, err := utils.Pb2String(&msg) + s, err := msgprocessor.Pb2String(&msg) if err != nil { return errs.Wrap(err) } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index cb6ac41f9..2829f963e 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -16,6 +16,7 @@ package controller import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" @@ -236,7 +237,7 @@ func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { cache := c.cache.NewCache() - conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + conversationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) if err := c.tx.Transaction(func(tx any) error { existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 3f8fc8fa2..d78aa1ff4 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,6 +16,7 @@ package startrpc import ( "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "net" "strconv" "time"