diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index b7a6e0706..a3fd3f489 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -20,7 +20,7 @@ func (m *msgServer) DelSuperGroupMsg(ctx context.Context, req *msg.DelSuperGroup if err := tokenverify.CheckAdmin(ctx); err != nil { return nil, err } - if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, []string{req.UserID}, 0); err != nil { + if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, req.GroupID, 0); err != nil { return nil, err } return resp, nil @@ -31,7 +31,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.Cl if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { return nil, err } - if err := m.MsgDatabase.CleanUpUserMsg(ctx, req.UserID); err != nil { + if err := m.MsgDatabase.CleanUpConversationMsgs(ctx, req.UserID); err != nil { return nil, err } return resp, nil diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 3196d6682..3b7192c18 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -96,7 +96,7 @@ func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string log.ZError(ctx, "ClearSuperGroupMsg failed", err, "groupID", groupID) continue } - if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { + if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, groupID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "groupID", groupID, "userID", userIDs, "DBRetainChatRecords", config.Config.Mongo.DBRetainChatRecords) } if err := c.fixGroupSeq(ctx, groupID, userIDs); err != nil { @@ -114,7 +114,7 @@ func (c *MsgTool) FixGroupSeq(ctx context.Context, groupID string) error { } func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []string) error { - _, maxSeqMongo, maxSeqCache, err := c.msgDatabase.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) + _, maxSeqMongo, _, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, groupID) if err != nil { if err == unrelation.ErrMsgNotFound { return nil @@ -126,14 +126,14 @@ func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []str continue } } - if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil { + if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo); err != nil { log.ZWarn(ctx, "cache max seq and mongo max seq is diff > 10", err, "groupID", groupID, "maxSeqCache", maxSeqCache, "maxSeqMongo", maxSeqMongo, "constant.WriteDiffusion", constant.WriteDiffusion) } return nil } func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqCache, maxSeqMongo int64, err error) { - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, userID) + minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgDatabase.GetConversationMinMaxSeqInMongoAndCache(ctx, userID) if err != nil { if err != unrelation.ErrMsgNotFound { log.ZError(ctx, "GetUserMinMaxSeqInMongoAndCache failed", err, "userID", userID) @@ -142,7 +142,7 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC } log.ZDebug(ctx, "userID", userID, "minSeqMongo", minSeqMongo, "maxSeqMongo", maxSeqMongo, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) if minSeqCache > maxSeqCache { - if err := c.msgDatabase.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil { + if err := c.msgDatabase.SetMinSeq(ctx, userID, maxSeqCache); err != nil { log.ZError(ctx, "SetUserMinSeq failed", err, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) } else { log.ZInfo(ctx, "SetUserMinSeq success", "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) @@ -152,13 +152,13 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC } func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, groupID string, maxSeqCache int64) (minSeqCache int64, err error) { - minSeqCache, err = c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID) + minSeqCache, err = c.msgDatabase.GetMinSeq(ctx, groupID) if err != nil { log.ZError(ctx, "GetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID) return 0, err } if minSeqCache > maxSeqCache { - if err := c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { + if err := c.msgDatabase.SetConversationUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { log.ZError(ctx, "SetGroupUserMinSeq failed", err, "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) } else { log.ZInfo(ctx, "SetGroupUserMinSeq success", "groupID", groupID, "userID", userID, "minSeqCache", minSeqCache, "maxSeqCache", maxSeqCache) @@ -192,16 +192,16 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error { return err } for _, userID := range userIDs { - userCurrentMinSeq, err := c.msgDatabase.GetUserMinSeq(ctx, userID) + userCurrentMinSeq, err := c.msgDatabase.GetMinSeq(ctx, userID) if err != nil && err != redis.Nil { continue } - userCurrentMaxSeq, err := c.msgDatabase.GetUserMaxSeq(ctx, userID) + userCurrentMaxSeq, err := c.msgDatabase.GetMaxSeq(ctx, userID) if err != nil && err != redis.Nil { continue } if userCurrentMinSeq > userCurrentMaxSeq { - if err = c.msgDatabase.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { + if err = c.msgDatabase.SetMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq) } fmt.Println("fix", userID, userCurrentMaxSeq) @@ -213,7 +213,7 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error { return err } for _, groupID := range groupIDs { - maxSeq, err := c.msgDatabase.GetGroupMaxSeq(ctx, groupID) + maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, groupID) if err != nil { fmt.Println("GetGroupMaxSeq failed", groupID) continue @@ -224,13 +224,13 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) error { continue } for _, userID := range userIDs { - userMinSeq, err := c.msgDatabase.GetGroupUserMinSeq(ctx, groupID, userID) + userMinSeq, err := c.msgDatabase.GetMinSeq(ctx, groupID) if err != nil && err != redis.Nil { fmt.Println("GetGroupUserMinSeq failed", groupID, userID) continue } if userMinSeq > maxSeq { - if err = c.msgDatabase.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil { + if err = c.msgDatabase.SetMinSeq(ctx, groupID, maxSeq); err != nil { fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq) } fmt.Println("fix", groupID, userID, maxSeq, userMinSeq) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 9bed3ebc6..9b25afc7c 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -75,8 +75,10 @@ type MsgDatabase interface { SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) - SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) + GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) + SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error + SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error @@ -312,7 +314,6 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID if lenList < 1 { return 0, errors.New("too short as 0") } - // judge sessionType to get seq lastMaxSeq := currentMaxSeq for _, m := range msgs { currentMaxSeq++ @@ -581,7 +582,7 @@ func (db *msgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, c if minSeq == 0 { return nil } - return db.cache.SetUserMinSeq(ctx, map[string]int64{conversationID: minSeq}) + return db.cache.SetMinSeq(ctx, conversationID, minSeq) } // this is struct for recursion @@ -604,9 +605,9 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st if err != nil || msgs.DocID == "" { if err != nil { if err == unrelation.ErrMsgListNotExist { - log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", conversationID, "index:", index, err.Error()) + log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) } else { - //log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) + log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) } } // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 @@ -616,7 +617,7 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st } return delStruct.getSetMinSeq() + 1, nil } - //log.NewDebug(operationID, "ID:", conversationID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) + log.ZDebug(ctx, "conversationID", conversationID, "index:", index, "docID", msgs.DocID, "len", len(msgs.Msg)) if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) } @@ -625,17 +626,17 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st lastMsgPb := &sdkws.MsgData{} err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID) return 0, utils.Wrap(err, "proto.Unmarshal failed") } delStruct.minSeq = lastMsgPb.Seq } else { var hasMarkDelFlag bool - for _, msg := range msgs.Msg { + for i, msg := range msgs.Msg { msgPb := &sdkws.MsgData{} err = proto.Unmarshal(msg.Msg, msgPb) if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) + log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID) return 0, utils.Wrap(err, "proto.Unmarshal failed") } if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { @@ -666,16 +667,16 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID st func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID) if err != nil { - return 0, 0, 0, 0, err + return } // from cache - minSeqCache, err = db.cache.GetUserMinSeq(ctx, conversationID) + minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID) if err != nil { - return 0, 0, 0, 0, err + return } - maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, conversationID) + maxSeqCache, err = db.cache.GetMaxSeq(ctx, conversationID) if err != nil { - return 0, 0, 0, 0, err + return } return } @@ -683,20 +684,20 @@ func (db *msgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Conte func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) if err != nil { - return 0, 0, err + return } msgPb, err := db.unmarshalMsg(oldestMsgMongo) if err != nil { - return 0, 0, err + return } minSeqMongo = msgPb.Seq newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { - return 0, 0, err + return } msgPb, err = db.unmarshalMsg(newestMsgMongo) if err != nil { - return 0, 0, err + return } maxSeqMongo = msgPb.Seq return @@ -720,9 +721,15 @@ func (db *msgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) func (db *msgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { return db.cache.GetMinSeq(ctx, conversationID) } -func (db *msgDatabase) GetUserMinSeq(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.cache.GetUserMinSeq(ctx, conversationIDs) +func (db *msgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) +} +func (db *msgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) { + return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs) +} +func (db *msgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error { + return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq) } -func (db *msgDatabase) SetUserMinSeq(ctx context.Context, seqs map[string]int64) (err error) { - return db.cache.SetUserMinSeq(ctx, seqs) +func (db *msgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) { + return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs) } diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index f8feeaa8d..8b4229fcd 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -112,6 +112,24 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { return "" } +func GetNotificationConversationIDBySessionType(sessionType int, ids ...string) string { + sort.Strings(ids) + if len(ids) > 2 || len(ids) < 1 { + return "" + } + switch sessionType { + case constant.SingleChatType: + return "n_" + strings.Join(ids, "_") // single chat + case constant.GroupChatType: + return "n_" + ids[0] // group chat + case constant.SuperGroupChatType: + return "n_" + ids[0] // super group chat + case constant.NotificationChatType: + return "n_" + ids[0] // server notification chat + } + return "" +} + func IsNotification(conversationID string) bool { return strings.HasPrefix(conversationID, "n_") }