From 5615e47d99ec2ca6e351f733e70ab3a27aabfaaa Mon Sep 17 00:00:00 2001 From: WangchuXiao Date: Wed, 9 Aug 2023 10:37:56 +0800 Subject: [PATCH] new feature data convert (#819) * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * new feature: add batch send msg * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * fix bug: multiple gateway kick user * MsgDestructTime * fix bug: msg destruct sql * fix bug: msg destruct * fix bug: msg destruct * fix bug: msg destruct sql * fix bug: msg destruct sql * fix bug: msg destruct sql * fix bug: msg destruct sql * debug: print stack * debug: print stack * debug: print stack * fix bug: msg destruct sql Signed-off-by: wangchuxiao * fix bug: msg notification self 2 self push twice * fix bug: heartbeat get self notification * fix bug: init grpc conn in one process * fix bug: grpc conn Signed-off-by: wangchuxiao * fix bug: zk client recreate node when reconn * fix bug: set friend mark args error * fix bug: rpc client intercepter called twice Signed-off-by: wangchuxiao * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * test: document msg num set 100 * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * new feat: sync designated model * merge code * merge code * fix bug: repeat add friend not effect * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed * feat: msg doc len changed --------- Signed-off-by: wangchuxiao Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: wangchuxiao-dev --- cmd/openim-crontask/main.go | 2 +- go.work.sum | 5 +- .../msgtransfer/online_history_msg_handler.go | 2 +- internal/rpc/user/user.go | 11 ++- internal/tools/cron_task.go | 3 +- internal/tools/msg_doc_convert.go | 32 +++++++++ pkg/common/db/controller/msg.go | 5 ++ pkg/common/db/relation/user_model.go | 6 +- pkg/common/db/table/unrelation/msg.go | 14 ++-- pkg/common/db/unrelation/msg_convert.go | 67 +++++++++++++++++++ pkg/msgprocessor/conversation.go | 26 ++++++- 11 files changed, 153 insertions(+), 20 deletions(-) create mode 100644 internal/tools/msg_doc_convert.go create mode 100644 pkg/common/db/unrelation/msg_convert.go diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go index 73deb8c66..6fbb0558a 100644 --- a/cmd/openim-crontask/main.go +++ b/cmd/openim-crontask/main.go @@ -21,7 +21,7 @@ import ( func main() { cronTaskCmd := cmd.NewCronTaskCmd() - if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil { + if err := cronTaskCmd.Exec(tools.StartTask); err != nil { panic(err.Error()) } } diff --git a/go.work.sum b/go.work.sum index 2a89bc247..49bef9646 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,9 +1,6 @@ -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8422ebddb..efaf28cc3 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -145,7 +145,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { len(modifyMsgList), ) conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) - conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message) + conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message) och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) och.handleNotification( ctx, diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index ab45a2b2f..3d954cd9a 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -20,16 +20,12 @@ import ( "strings" "time" - "github.com/OpenIMSDK/tools/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" - "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/sdkws" - pbuser "github.com/OpenIMSDK/protocol/user" - registry "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" @@ -40,10 +36,11 @@ import ( tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" + registry "github.com/OpenIMSDK/tools/discoveryregistry" - "google.golang.org/grpc" - + pbuser "github.com/OpenIMSDK/protocol/user" "github.com/OpenIMSDK/tools/utils" + "google.golang.org/grpc" ) type userServer struct { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index a1756a2f8..0bc8b00a3 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -26,12 +26,13 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" ) -func StartCronTask() error { +func StartTask() error { fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) msgTool, err := InitMsgTool() if err != nil { return err } + msgTool.ConvertTools() c := cron.New() var wg sync.WaitGroup wg.Add(1) diff --git a/internal/tools/msg_doc_convert.go b/internal/tools/msg_doc_convert.go new file mode 100644 index 000000000..537b7bf2c --- /dev/null +++ b/internal/tools/msg_doc_convert.go @@ -0,0 +1,32 @@ +package tools + +import ( + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" + "github.com/OpenIMSDK/protocol/constant" + "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/mcontext" +) + +func (c *MsgTool) ConvertTools() { + ctx := mcontext.NewCtx("convert") + conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) + if err != nil { + log.ZError(ctx, "get all conversation ids failed", err) + return + } + for _, conversationID := range conversationIDs { + conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationIDByConversationID(conversationID)) + } + userIDs, err := c.userDatabase.GetAllUserID(ctx, 0, 0) + if err != nil { + log.ZError(ctx, "get all user ids failed", err) + return + } + log.ZDebug(ctx, "all userIDs", "len userIDs", len(userIDs)) + for _, userID := range userIDs { + conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID)) + conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID)) + } + log.ZDebug(ctx, "all conversationIDs", "len userIDs", len(conversationIDs)) + c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs) +} diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index e2f1447ab..418fe7a4a 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -118,6 +118,7 @@ type CommonMsgDatabase interface { pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) + ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase { @@ -965,3 +966,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.Searc } return total, totalMsgs, nil } + +func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { + db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) +} diff --git a/pkg/common/db/relation/user_model.go b/pkg/common/db/relation/user_model.go index f97abdefe..758a62037 100644 --- a/pkg/common/db/relation/user_model.go +++ b/pkg/common/db/relation/user_model.go @@ -86,7 +86,11 @@ func (u *UserGorm) Page( // 获取所有用户ID. func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) { - return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error) + if pageNumber == 0 || showNumber == 0 { + return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error) + } else { + return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error) + } } func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 30e6670b5..c95b211a8 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -27,10 +27,11 @@ import ( ) const ( - singleGocMsgNum = 5000 - Msg = "msg" - OldestList = 0 - NewestList = -1 + singleGocMsgNum = 100 + singleGocMsgNum5000 = 5000 + Msg = "msg" + OldestList = 0 + NewestList = -1 ) type MsgDocModel struct { @@ -128,6 +129,7 @@ type MsgDocModelInterface interface { pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) + ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) } func (MsgDocModel) TableName() string { @@ -138,6 +140,10 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 { return singleGocMsgNum } +func (MsgDocModel) GetSingleGocMsgNum5000() int64 { + return singleGocMsgNum5000 +} + func (m *MsgDocModel) IsFull() bool { return m.Msg[len(m.Msg)-1].Msg != nil } diff --git a/pkg/common/db/unrelation/msg_convert.go b/pkg/common/db/unrelation/msg_convert.go new file mode 100644 index 000000000..134020dfd --- /dev/null +++ b/pkg/common/db/unrelation/msg_convert.go @@ -0,0 +1,67 @@ +package unrelation + +import ( + "context" + "fmt" + + table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/tools/log" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { + for _, conversationID := range conversationIDs { + regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} + cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex}) + if err != nil { + log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) + continue + } + var msgDocs []table.MsgDocModel + err = cursor.All(ctx, &msgDocs) + if err != nil { + log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID) + continue + } + if len(msgDocs) < 1 { + continue + } + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) + if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { + if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil { + log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) + continue + } + var newMsgDocs []interface{} + for _, msgDoc := range msgDocs { + if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { + continue + } + var index int64 + for index < int64(len(msgDoc.Msg)) { + msg := msgDoc.Msg[index] + if msg != nil && msg.Msg != nil { + msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} + end := index + m.model.GetSingleGocMsgNum() + if int(end) >= len(msgDoc.Msg) { + msgDocModel.Msg = msgDoc.Msg[index:] + } else { + msgDocModel.Msg = msgDoc.Msg[index:end] + } + newMsgDocs = append(newMsgDocs, msgDocModel) + index = end + } else { + break + } + } + } + _, err = m.MsgCollection.InsertMany(ctx, newMsgDocs) + if err != nil { + log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + } else { + log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) + } + } + } +} diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index c0d97dc44..ca77438ea 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -23,7 +23,7 @@ import ( "google.golang.org/protobuf/proto" ) -func GetNotificationConversationID(msg *sdkws.MsgData) string { +func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string { switch msg.SessionType { case constant.SingleChatType: l := []string{msg.SendID, msg.RecvID} @@ -114,6 +114,30 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { return "" } +func GetNotificationConversationIDByConversationID(conversationID string) string { + l := strings.Split(conversationID, "_") + if len(l) > 1 { + l[0] = "n" + return strings.Join(l, "_") + } else { + return "" + } +} + +func GetNotificationConversationID(sessionType int, ids ...string) string { + sort.Strings(ids) + if len(ids) > 2 || len(ids) < 1 { + return "" + } + switch sessionType { + case constant.SingleChatType: + return "n_" + strings.Join(ids, "_") // single chat + case constant.SuperGroupChatType: + return "n_" + ids[0] // super group chat + } + return "" +} + func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") }