feat: msg doc len changed

pull/819/head
wangchuxiao 2 years ago
parent dad8b406a6
commit 9b242e5121

@ -21,7 +21,7 @@ import (
func main() { func main() {
cronTaskCmd := cmd.NewCronTaskCmd() cronTaskCmd := cmd.NewCronTaskCmd()
if err := cronTaskCmd.Exec(tools.StartCronTask); err != nil { if err := cronTaskCmd.Exec(tools.StartTask); err != nil {
panic(err.Error()) panic(err.Error())
} }
} }

@ -16,12 +16,13 @@ package msgtransfer
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
@ -143,7 +144,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
len(modifyMsgList), len(modifyMsgList),
) )
conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message) conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMsgList[0].message)
conversationIDNotification := msgprocessor.GetNotificationConversationID(ctxMsgList[0].message) conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMsgList[0].message)
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList) och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
och.handleNotification( och.handleNotification(
ctx, ctx,

@ -25,12 +25,13 @@ import (
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
) )
func StartCronTask() error { func StartTask() error {
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
msgTool, err := InitMsgTool() msgTool, err := InitMsgTool()
if err != nil { if err != nil {
return err return err
} }
msgTool.ConvertTools()
c := cron.New() c := cron.New()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)

@ -0,0 +1,29 @@
package tools
import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
)
func (c *MsgTool) ConvertTools() {
ctx := mcontext.NewCtx("convert")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "get all conversation ids failed", err)
return
}
userIDs, err := c.userDatabase.GetAllUserID(ctx, 0, 0)
if err != nil {
log.ZError(ctx, "get all user ids failed", err)
return
}
log.ZDebug(ctx, "all userIDs", "userIDs", userIDs)
for _, userID := range userIDs {
conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID))
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID))
}
log.ZDebug(ctx, "all conversationIDs", "conversationIDs", conversationIDs)
c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}

@ -117,6 +117,7 @@ type CommonMsgDatabase interface {
pageNumber int32, pageNumber int32,
showNumber int32, showNumber int32,
) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error) ) (msgCount int64, userCount int64, groups []*unRelationTb.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
} }
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase { func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
@ -964,3 +965,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbMsg.Searc
} }
return total, totalMsgs, nil return total, totalMsgs, nil
} }
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}

@ -85,7 +85,11 @@ func (u *UserGorm) Page(
// 获取所有用户ID. // 获取所有用户ID.
func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) { func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
if pageNumber != 0 || showNumber != 0 {
return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error)
} else {
return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error) return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
}
} }
func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {

@ -27,7 +27,8 @@ import (
) )
const ( const (
singleGocMsgNum = 5000 singleGocMsgNum = 100
singleGocMsgNum5000 = 5000
Msg = "msg" Msg = "msg"
OldestList = 0 OldestList = 0
NewestList = -1 NewestList = -1
@ -128,6 +129,7 @@ type MsgDocModelInterface interface {
pageNumber int32, pageNumber int32,
showNumber int32, showNumber int32,
) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) ) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
} }
func (MsgDocModel) TableName() string { func (MsgDocModel) TableName() string {
@ -138,6 +140,10 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum return singleGocMsgNum
} }
func (MsgDocModel) GetSingleGocMsgNum5000() int64 {
return singleGocMsgNum5000
}
func (m *MsgDocModel) IsFull() bool { func (m *MsgDocModel) IsFull() bool {
return m.Msg[len(m.Msg)-1].Msg != nil return m.Msg[len(m.Msg)-1].Msg != nil
} }

@ -0,0 +1,65 @@
package unrelation
import (
"context"
"fmt"
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/tools/log"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func (m *MsgMongoDriver) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": regex})
if err != nil {
log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID)
continue
}
var msgDocs []table.MsgDocModel
err = cursor.All(ctx, &msgDocs)
if err != nil {
log.ZError(ctx, "convertAll cursor all failed", err, "conversationID", conversationID)
continue
}
if len(msgDocs) < 1 {
log.ZInfo(ctx, "len(msgs) < 1", "conversationID", conversationID)
continue
}
log.ZInfo(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs))
if _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": regex}); err != nil {
log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID)
continue
}
var newMsgDocs []interface{}
for _, msgDoc := range msgDocs {
if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() {
continue
}
var index int64
for index < int64(len(msgDoc.Msg)) {
msg := msgDoc.Msg[index]
if msg != nil && msg.Msg != nil {
msgDocModel := table.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)}
end := index + m.model.GetSingleGocMsgNum()
if int(end) >= len(msgDoc.Msg) {
msgDocModel.Msg = msgDoc.Msg[index:]
} else {
msgDocModel.Msg = msgDoc.Msg[index:end]
}
index = end
} else {
break
}
}
}
_, err = m.MsgCollection.InsertMany(ctx, newMsgDocs)
if err != nil {
log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID)
} else {
log.ZInfo(ctx, "convertAll insert many success", "conversationID", conversationID)
}
}
}

@ -1,14 +1,15 @@
package msgprocessor package msgprocessor
import ( import (
"sort"
"strings"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"sort"
"strings"
) )
func GetNotificationConversationID(msg *sdkws.MsgData) string { func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string {
switch msg.SessionType { switch msg.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
l := []string{msg.SendID, msg.RecvID} l := []string{msg.SendID, msg.RecvID}
@ -99,6 +100,20 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string {
return "" return ""
} }
func GetNotificationConversationID(sessionType int, ids ...string) string {
sort.Strings(ids)
if len(ids) > 2 || len(ids) < 1 {
return ""
}
switch sessionType {
case constant.SingleChatType:
return "n_" + strings.Join(ids, "_") // single chat
case constant.SuperGroupChatType:
return "n_" + ids[0] // super group chat
}
return ""
}
func IsNotification(conversationID string) bool { func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_") return strings.HasPrefix(conversationID, "n_")
} }

Loading…
Cancel
Save