From 9b242e5121076b69e345a7990ec890f4254ba25a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 7 Aug 2023 19:39:30 +0800 Subject: [PATCH] feat: msg doc len changed --- cmd/openim-crontask/main.go | 2 +- .../msgtransfer/online_history_msg_handler.go | 5 +- internal/tools/cron_task.go | 3 +- internal/tools/msg_doc_convert.go | 29 +++++++++ pkg/common/db/controller/msg.go | 5 ++ pkg/common/db/relation/user_model.go | 6 +- pkg/common/db/table/unrelation/msg.go | 14 ++-- pkg/common/db/unrelation/msg_convert.go | 65 +++++++++++++++++++ pkg/msgprocessor/conversation.go | 21 +++++- 9 files changed, 138 insertions(+), 12 deletions(-) create mode 100644 internal/tools/msg_doc_convert.go create mode 100644 pkg/common/db/unrelation/msg_convert.go diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go index 73deb8c66..6fbb0558a 100644 --- a/cmd/openim-crontask/main.go +++ b/cmd/openim-crontask/main.go @@ -21,7 +21,7 @@ import ( func main() { cronTaskCmd := cmd.NewCronTaskCmd() - if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil { + if err := cronTaskCmd.Exec(tools.StartTask); err != nil { panic(err.Error()) } } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 282f50119..0b3a08d37 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,12 +16,13 @@ package msgtransfer import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "strconv" "strings" "sync" "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" + "github.com/OpenIMSDK/tools/errs" "github.com/Shopify/sarama" @@ -143,7 +144,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { len(modifyMsgList), ) conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) - conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message) + conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message) och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) och.handleNotification( ctx, diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index a7333a5d4..fed0a5f7e 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -25,12 +25,13 @@ import ( "github.com/OpenIMSDK/tools/log" ) -func StartCronTask() error { +func StartTask() error { fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) msgTool, err := InitMsgTool() if err != nil { return err } + msgTool.ConvertTools() c := cron.New() var wg sync.WaitGroup wg.Add(1) diff --git a/internal/tools/msg_doc_convert.go b/internal/tools/msg_doc_convert.go new file mode 100644 index 000000000..5b684508b --- /dev/null +++ b/internal/tools/msg_doc_convert.go @@ -0,0 +1,29 @@ +package tools + +import ( + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" + "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/mcontext" +) + +func (c *MsgTool) ConvertTools() { + ctx := mcontext.NewCtx("convert") + conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) + if err != nil { + log.ZError(ctx, "get all conversation ids failed", err) + return + } + userIDs, err := c.userDatabase.GetAllUserID(ctx, 0, 0) + if err != nil { + log.ZError(ctx, "get all user ids failed", err) + return + } + log.ZDebug(ctx, "all userIDs", "userIDs", userIDs) + for _, userID := range userIDs { + conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID)) + conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID)) + } + log.ZDebug(ctx, "all conversationIDs", "conversationIDs", conversationIDs) + c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs) +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 7c03eb8b6..3cec68bcc 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -117,6 +117,7 @@ type CommonMsgDatabase interface { pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) + ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase { @@ -964,3 +965,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.Searc } return total, totalMsgs, nil } + +func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { + db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) +} diff --git a/pkg/common/db/relation/user_model.go b/pkg/common/db/relation/user_model.go index b7c592eeb..a3a4579ac 100644 --- a/pkg/common/db/relation/user_model.go +++ b/pkg/common/db/relation/user_model.go @@ -85,7 +85,11 @@ func (u *UserGorm) Page( // 获取所有用户ID. func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) { - return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error) + if pageNumber != 0 || showNumber != 0 { + return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error) + } else { + return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error) + } } func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 30e6670b5..c95b211a8 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -27,10 +27,11 @@ import ( ) const ( - singleGocMsgNum = 5000 - Msg = "msg" - OldestList = 0 - NewestList = -1 + singleGocMsgNum = 100 + singleGocMsgNum5000 = 5000 + Msg = "msg" + OldestList = 0 + NewestList = -1 ) type MsgDocModel struct { @@ -128,6 +129,7 @@ type MsgDocModelInterface interface { pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) + ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } func (MsgDocModel) TableName() string { @@ -138,6 +140,10 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 { return singleGocMsgNum } +func (MsgDocModel) GetSingleGocMsgNum5000() int64 { + return singleGocMsgNum5000 +} + func (m *MsgDocModel) IsFull() bool { return m.Msg[len(m.Msg)-1].Msg != nil } diff --git a/pkg/common/db/unrelation/msg_convert.go b/pkg/common/db/unrelation/msg_convert.go new file mode 100644 index 000000000..362bfd6fc --- /dev/null +++ b/pkg/common/db/unrelation/msg_convert.go @@ -0,0 +1,65 @@ +package unrelation + +import ( + "context" + "fmt" + + table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/tools/log" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { + for _, conversationID := range conversationIDs { + regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} + cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex}) + if err != nil { + log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) + continue + } + var msgDocs []table.MsgDocModel + err = cursor.All(ctx, &msgDocs) + if err != nil { + log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID) + continue + } + if len(msgDocs) < 1 { + log.ZInfo(ctx, "len(msgs) < 1", "conversationID", conversationID) + continue + } + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) + if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil { + log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) + continue + } + var newMsgDocs []interface{} + for _, msgDoc := range msgDocs { + if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { + continue + } + var index int64 + for index < int64(len(msgDoc.Msg)) { + msg := msgDoc.Msg[index] + if msg != nil && msg.Msg != nil { + msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} + end := index + m.model.GetSingleGocMsgNum() + if int(end) >= len(msgDoc.Msg) { + msgDocModel.Msg = msgDoc.Msg[index:] + } else { + msgDocModel.Msg = msgDoc.Msg[index:end] + } + index = end + } else { + break + } + } + } + _, err = m.MsgCollection.InsertMany(ctx, newMsgDocs) + if err != nil { + log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID) + } else { + log.ZInfo(ctx, "convertAll insert many success", "conversationID", conversationID) + } + } +} diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index b24794af8..a63b6391a 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -1,14 +1,15 @@ package msgprocessor import ( + "sort" + "strings" + "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/sdkws" "google.golang.org/protobuf/proto" - "sort" - "strings" ) -func GetNotificationConversationID(msg *sdkws.MsgData) string { +func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string { switch msg.SessionType { case constant.SingleChatType: l := []string{msg.SendID, msg.RecvID} @@ -99,6 +100,20 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { return "" } +func GetNotificationConversationID(sessionType int, ids ...string) string { + sort.Strings(ids) + if len(ids) > 2 || len(ids) < 1 { + return "" + } + switch sessionType { + case constant.SingleChatType: + return "n_" + strings.Join(ids, "_") // single chat + case constant.SuperGroupChatType: + return "n_" + ids[0] // super group chat + } + return "" +} + func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") }