test-errcode
wangchuxiao 2 years ago
parent 70758c306a
commit 1dad0b06d1

@ -136,7 +136,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
return nil, err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConvetstionID(conversationID))
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)

@ -9,7 +9,6 @@ import (
"github.com/robfig/cron/v3"
)
const cronTaskOperationID = "cronTaskOperationID-"
const moduleName = "cron"
func StartCronTask() error {

@ -7,7 +7,6 @@ import (
"math"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
@ -16,7 +15,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
)
type MsgTool struct {
@ -65,169 +63,68 @@ func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
log.ZError(ctx, "GetAllConversationIDs failed", err)
return
}
c.ClearSuperGroupMsg(ctx, conversationIDs)
log.ZInfo(ctx, "============================ start del cron finished ============================")
}
func (c *MsgTool) ClearUsersMsg(ctx context.Context, userIDs []string) {
for _, userID := range userIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserMsgsAndSetMinSeq failed", err, "userID", userID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
}
maxSeqCache, maxSeqMongo, err := c.GetAndFixUserSeqs(ctx, userID)
if err != nil {
continue
}
c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo)
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
c.ClearConversationsMsg(ctx, conversationIDs)
log.ZInfo(ctx, "============================ start del cron finished ============================")
}
func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string) {
for _, groupID := range superGroupIDs {
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID)
continue
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
}
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords)
if err := c.fixAndCheckSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil {
log.ZError(ctx, "fixGroupSeq failed", err, "groupID", groupID, "userID", userIDs)
}
}
}
func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error {
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
return err
}
return c.fixGroupSeq(ctx, groupID, userIDs)
}
func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error {
_, maxSeqMongo, _, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, groupID)
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
if err != nil {
if err == unrelation.ErrMsgNotFound {
return nil
}
return err
}
for _, userID := range userIDs {
if _, err := c.GetAndFixGroupUserSeq(ctx, userID, groupID, maxSeqCache); err != nil {
continue
}
}
if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo); err != nil {
log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion)
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
return errSeq
}
return nil
}
func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) {
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, userID)
func (c *MsgTool) fixAndCheckSeq(ctx context.Context, conversationID string) error {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
if err != nil {
if err != unrelation.ErrMsgNotFound {
log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID)
}
return 0, 0, err
}
log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetMinSeq(ctx, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else {
log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
}
return err
}
return maxSeqCache, maxSeqMongo, nil
}
func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) {
minSeqCache, err = c.msgDatabase.GetMinSeq(ctx, groupID)
minSeq, err := c.msgDatabase.GetMinSeq(ctx, conversationID)
if err != nil {
log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID)
return 0, err
}
if minSeqCache > maxSeqCache {
if err := c.msgDatabase.SetConversationUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil {
log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
} else {
log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache)
return err
}
if minSeq > maxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, conversationID, maxSeq); err != nil {
return err
}
}
return minSeqCache, nil
}
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache, maxSeqMongo int64) error {
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
return errSeq
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
return err
}
return nil
}
func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) {
}
func (c *MsgTool) ShowSuperGroupSeqs(ctx context.Context, groupID string) {
}
func (c *MsgTool) ShowSuperGroupUserSeqs(ctx context.Context, groupID, userID string) {
}
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
userIDs, err := c.userDatabase.GetAllUserID(ctx)
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDs failed", err)
return err
}
for _, userID := range userIDs {
userCurrentMinSeq, err := c.msgDatabase.GetMinSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
userCurrentMaxSeq, err := c.msgDatabase.GetMaxSeq(ctx, userID)
if err != nil && err != redis.Nil {
continue
}
if userCurrentMinSeq > userCurrentMaxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, userID, userCurrentMaxSeq); err != nil {
fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq)
}
fmt.Println("fix", userID, userCurrentMaxSeq)
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
}
fmt.Println("fix users seq success")
groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err != nil {
return err
}
for _, groupID := range groupIDs {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, groupID)
if err != nil {
fmt.Println("GetGroupMaxSeq failed", groupID)
continue
}
userIDs, err := c.groupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
fmt.Println("get groupID", groupID, "failed, try again later")
continue
}
for _, userID := range userIDs {
userMinSeq, err := c.msgDatabase.GetMinSeq(ctx, groupID)
if err != nil && err != redis.Nil {
fmt.Println("GetGroupUserMinSeq failed", groupID, userID)
continue
}
if userMinSeq > maxSeq {
if err = c.msgDatabase.SetMinSeq(ctx, groupID, maxSeq); err != nil {
fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq)
}
fmt.Println("fix", groupID, userID, maxSeq, userMinSeq)
}
for _, conversationID := range conversationIDs {
if err := c.fixAndCheckSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
fmt.Println("fix all seq finished")

@ -90,13 +90,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("init failed")
return
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -133,13 +133,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", testUID1)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -165,13 +165,13 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, conversationID)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -211,7 +211,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
@ -221,7 +221,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -252,7 +252,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("InsertOne failed", conversationID)
}
msgTools.ClearUsersMsg(ctx, []string{conversationID})
msgTools.ClearConversationsMsg(ctx, []string{conversationID})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
@ -262,7 +262,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {
@ -312,7 +312,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, conversationID, maxSeqCache, maxSeqMongo); err != nil {
if maxSeqCache != maxSeqMongo {
t.Error("checkMaxSeqWithMongo failed", conversationID)
}
if minSeqMongo != minSeqCache {

@ -56,6 +56,7 @@ type CommonMsgDatabase interface {
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
@ -678,6 +679,10 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
return
}
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) {
return db.GetMinMaxSeqMongo(ctx, conversationID)
}
func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {

@ -276,7 +276,7 @@ func IsNotification(conversationID string) bool {
return strings.HasPrefix(conversationID, "n_")
}
func GetNotificationConvetstionID(conversationID string) string {
func GetNotificationConversationIDByConversationID(conversationID string) string {
l := strings.Split(conversationID, "_")
if len(l) > 1 {
l[0] = "n"

Loading…
Cancel
Save