fix: fix lint errors in pkg/common/db/controller

pull/1263/head
cncsmonster 2 years ago
parent 7cb011fd5e
commit e7e48a8c74

@ -69,9 +69,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
} }
} }
if len(deleteTokenKey) != 0 { if len(deleteTokenKey) != 0 {
err := a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) err2 := a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
if err != nil { if err2 != nil {
return "", err return "", err2
} }
} }
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire) claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
@ -80,5 +80,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
if err != nil { if err != nil {
return "", utils.Wrap(err, "") return "", utils.Wrap(err, "")
} }
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
} }

@ -55,6 +55,7 @@ func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackMode
if err := b.black.Create(ctx, blacks); err != nil { if err := b.black.Create(ctx, blacks); err != nil {
return err return err
} }
return b.deleteBlackIDsCache(ctx, blacks) return b.deleteBlackIDsCache(ctx, blacks)
} }
@ -63,6 +64,7 @@ func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackMode
if err := b.black.Delete(ctx, blacks); err != nil { if err := b.black.Delete(ctx, blacks); err != nil {
return err return err
} }
return b.deleteBlackIDsCache(ctx, blacks) return b.deleteBlackIDsCache(ctx, blacks)
} }
@ -71,6 +73,7 @@ func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relat
for _, black := range blacks { for _, black := range blacks {
cache = cache.DelBlackIDs(ctx, black.OwnerUserID) cache = cache.DelBlackIDs(ctx, black.OwnerUserID)
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -97,6 +100,7 @@ func (b *blackDatabase) CheckIn(
return return
} }
log.ZDebug(ctx, "blackIDs", "user1BlackIDs", userID1BlackIDs, "user2BlackIDs", userID2BlackIDs) log.ZDebug(ctx, "blackIDs", "user1BlackIDs", userID1BlackIDs, "user2BlackIDs", userID2BlackIDs)
return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil
} }

@ -99,8 +99,8 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
now := time.Now() now := time.Now()
for _, v := range NotUserIDs { for _, v := range NotUserIDs {
temp := new(relationtb.ConversationModel) temp := new(relationtb.ConversationModel)
if err := utils.CopyStructFields(temp, conversation); err != nil { if err2 := utils.CopyStructFields(temp, conversation); err2 != nil {
return err return err2
} }
temp.OwnerUserID = v temp.OwnerUserID = v
temp.CreateTime = now temp.CreateTime = now
@ -113,10 +113,12 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
} }
cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...) cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...)
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -130,6 +132,7 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context,
if _, ok := args["recv_msg_opt"]; ok { if _, ok := args["recv_msg_opt"]; ok {
cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID)
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -137,13 +140,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if err := c.conversationDB.Create(ctx, conversations); err != nil { if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err return err
} }
var userIDs []string userIDs := make([]string, 0, len(conversations))
cache := c.cache.NewCache() cache := c.cache.NewCache()
for _, conversation := range conversations { for _, conversation := range conversations {
cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID) userIDs = append(userIDs, conversation.OwnerUserID)
} }
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
} }
@ -178,10 +182,12 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con
} }
} }
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -234,12 +240,15 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
if err != nil { if err != nil {
return err return err
} }
cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID).DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...) cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID)
cache = cache.DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...)
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -276,10 +285,12 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
for _, v := range existConversationUserIDs { for _, v := range existConversationUserIDs {
cache = cache.DelConversations(v, conversationID) cache = cache.DelConversations(v, conversationID)
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }

@ -16,6 +16,7 @@ package controller
import ( import (
"context" "context"
"errors"
"time" "time"
"gorm.io/gorm" "gorm.io/gorm"
@ -109,6 +110,7 @@ func (f *friendDatabase) CheckIn(
if err != nil { if err != nil {
return return
} }
return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil
} }
@ -121,8 +123,8 @@ func (f *friendDatabase) AddFriendRequest(
) (err error) { ) (err error) {
return f.tx.Transaction(func(tx any) error { return f.tx.Transaction(func(tx any) error {
_, err := f.friendRequest.NewTx(tx).Take(ctx, fromUserID, toUserID) _, err := f.friendRequest.NewTx(tx).Take(ctx, fromUserID, toUserID)
// 有db错误 // if there is a db error
if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return err return err
} }
// 无错误 则更新 // 无错误 则更新
@ -136,12 +138,14 @@ func (f *friendDatabase) AddFriendRequest(
if err := f.friendRequest.NewTx(tx).UpdateByMap(ctx, fromUserID, toUserID, m); err != nil { if err := f.friendRequest.NewTx(tx).UpdateByMap(ctx, fromUserID, toUserID, m); err != nil {
return err return err
} }
return nil return nil
} }
// gorm.ErrRecordNotFound 错误,则新增 // gorm.ErrRecordNotFound 错误,则新增
if err := f.friendRequest.NewTx(tx).Create(ctx, []*relation.FriendRequestModel{{FromUserID: fromUserID, ToUserID: toUserID, ReqMsg: reqMsg, Ex: ex, CreateTime: time.Now(), HandleTime: time.Unix(0, 0)}}); err != nil { if err := f.friendRequest.NewTx(tx).Create(ctx, []*relation.FriendRequestModel{{FromUserID: fromUserID, ToUserID: toUserID, ReqMsg: reqMsg, Ex: ex, CreateTime: time.Now(), HandleTime: time.Unix(0, 0)}}); err != nil {
return err return err
} }
return nil return nil
}) })
} }
@ -154,11 +158,11 @@ func (f *friendDatabase) BecomeFriends(
addSource int32, addSource int32,
) (err error) { ) (err error) {
cache := f.cache.NewCache() cache := f.cache.NewCache()
if err := f.tx.Transaction(func(tx any) error { fn := func(tx any) error {
// 先find 找出重复的 去掉重复的 // first,find and drop delete ones
fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) fs1, err2 := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil { if err2 != nil {
return err return err2
} }
opUserID := mcontext.GetOperationID(ctx) opUserID := mcontext.GetOperationID(ctx)
for _, v := range friendUserIDs { for _, v := range friendUserIDs {
@ -168,13 +172,13 @@ func (f *friendDatabase) BecomeFriends(
return e.FriendUserID return e.FriendUserID
}) })
err = f.friend.NewTx(tx).Create(ctx, fs11) err2 = f.friend.NewTx(tx).Create(ctx, fs11)
if err != nil { if err2 != nil {
return err return err2
} }
fs2, err := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs) fs2, err2 := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs)
if err != nil { if err2 != nil {
return err return err2
} }
var newFriendIDs []string var newFriendIDs []string
for _, v := range friendUserIDs { for _, v := range friendUserIDs {
@ -184,16 +188,20 @@ func (f *friendDatabase) BecomeFriends(
fs22 := utils.DistinctAny(fs2, func(e *relation.FriendModel) string { fs22 := utils.DistinctAny(fs2, func(e *relation.FriendModel) string {
return e.OwnerUserID return e.OwnerUserID
}) })
err = f.friend.NewTx(tx).Create(ctx, fs22) err2 = f.friend.NewTx(tx).Create(ctx, fs22)
if err != nil { if err2 != nil {
return err return err2
} }
newFriendIDs = append(newFriendIDs, ownerUserID) newFriendIDs = append(newFriendIDs, ownerUserID)
cache = cache.DelFriendIDs(newFriendIDs...) cache = cache.DelFriendIDs(newFriendIDs...)
return nil return nil
}); err != nil {
return nil
} }
err = f.tx.Transaction(fn)
if err != nil {
return err
}
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -216,6 +224,7 @@ func (f *friendDatabase) RefuseFriendRequest(
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
@ -251,7 +260,7 @@ func (f *friendDatabase) AgreeFriendRequest(
if err != nil { if err != nil {
return err return err
} }
} else if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { } else if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return err return err
} }
@ -290,6 +299,7 @@ func (f *friendDatabase) AgreeFriendRequest(
return err return err
} }
} }
return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).ExecDel(ctx) return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).ExecDel(ctx)
}) })
} }
@ -299,6 +309,7 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err return err
} }
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
} }
@ -307,6 +318,7 @@ func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs
if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
return err return err
} }
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
} }
@ -359,6 +371,7 @@ func (f *friendDatabase) FindFriendsWithError(
if len(friends) != len(friendUserIDs) { if len(friends) != len(friendUserIDs) {
err = errs.ErrRecordNotFound.Wrap() err = errs.ErrRecordNotFound.Wrap()
} }
return return
} }

@ -102,6 +102,7 @@ func NewGroupDatabase(
cache: cache, cache: cache,
mongoDB: superGroup, mongoDB: superGroup,
} }
return database return database
} }
@ -109,6 +110,7 @@ func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.D
rcOptions := rockscache.NewDefaultOptions() rcOptions := rockscache.NewDefaultOptions()
rcOptions.StrongConsistency = true rcOptions.StrongConsistency = true
rcOptions.RandomExpireAdjustment = 0.2 rcOptions.RandomExpireAdjustment = 0.2
return NewGroupDatabase( return NewGroupDatabase(
relation.NewGroupDB(db), relation.NewGroupDB(db),
relation.NewGroupMemberDB(db), relation.NewGroupMemberDB(db),
@ -151,6 +153,7 @@ func (g *groupDatabase) FindGroupMemberNum(ctx context.Context, groupID string)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return uint32(num), nil return uint32(num), nil
} }
@ -184,10 +187,12 @@ func (g *groupDatabase) CreateGroup(
cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID) cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID)
} }
cache = cache.DelGroupsInfo(createGroupIDs...) cache = cache.DelGroupsInfo(createGroupIDs...)
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -211,6 +216,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma
if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil {
return err return err
} }
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
} }
@ -231,10 +237,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete
cache = cache.DelJoinedGroupID(userIDs...).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID) cache = cache.DelJoinedGroupID(userIDs...).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID)
} }
cache = cache.DelGroupsInfo(groupID) cache = cache.DelGroupsInfo(groupID)
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -276,6 +284,7 @@ func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string,
} }
res = append(res, v) res = append(res, v)
} }
return res, nil return res, nil
} }
if len(roleLevels) == 0 { if len(roleLevels) == 0 {
@ -286,8 +295,10 @@ func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string,
} }
totalGroupMembers = append(totalGroupMembers, groupMembers...) totalGroupMembers = append(totalGroupMembers, groupMembers...)
} }
return totalGroupMembers, nil return totalGroupMembers, nil
} }
return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels) return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels)
} }
@ -307,6 +318,7 @@ func (g *groupDatabase) PageGetJoinGroup(
} }
totalGroupMembers = append(totalGroupMembers, groupMembers...) totalGroupMembers = append(totalGroupMembers, groupMembers...)
} }
return uint32(len(groupIDs)), totalGroupMembers, nil return uint32(len(groupIDs)), totalGroupMembers, nil
} }
@ -327,6 +339,7 @@ func (g *groupDatabase) PageGetGroupMember(
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
return uint32(len(groupMemberIDs)), members, nil return uint32(len(groupMemberIDs)), members, nil
} }
@ -378,6 +391,7 @@ func (g *groupDatabase) HandlerGroupRequest(
return err return err
} }
} }
return nil return nil
}) })
} }
@ -386,6 +400,7 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u
if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil {
return err return err
} }
return g.cache.DelGroupMembersHash(groupID). return g.cache.DelGroupMembersHash(groupID).
DelGroupMemberIDs(groupID). DelGroupMemberIDs(groupID).
DelGroupsMemberNum(groupID). DelGroupsMemberNum(groupID).
@ -410,6 +425,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
} }
m[groupID] = uint32(num) m[groupID] = uint32(num)
} }
return m, nil return m, nil
} }
@ -429,6 +445,7 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string,
if rowsAffected != 1 { if rowsAffected != 1 {
return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
} }
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).DelGroupMembersHash(groupID).ExecDel(ctx) return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).DelGroupMembersHash(groupID).ExecDel(ctx)
}) })
} }
@ -442,6 +459,7 @@ func (g *groupDatabase) UpdateGroupMember(
if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil {
return err return err
} }
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
} }
@ -454,10 +472,12 @@ func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relation
} }
cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID) cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID)
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -469,6 +489,7 @@ func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*rela
return err return err
} }
} }
return db.Create(ctx, requests) return db.Create(ctx, requests)
}) })
} }
@ -504,6 +525,7 @@ func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, in
if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
return err return err
} }
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
} }
@ -521,10 +543,12 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er
if len(models) > 0 { if len(models) > 0 {
cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...) cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...)
} }
return nil return nil
}); err != nil { }); err != nil {
return err return err
} }
return cache.ExecDel(ctx) return cache.ExecDel(ctx)
} }
@ -532,6 +556,7 @@ func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID stri
if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
return err return err
} }
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
} }
@ -539,6 +564,7 @@ func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID stri
if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
return err return err
} }
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
} }

@ -135,6 +135,7 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database)
cacheModel := cache.NewMsgCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(database) msgDocModel := unrelation.NewMsgMongoDriver(database)
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel) CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel)
return CommonMsgDatabase return CommonMsgDatabase
} }
@ -150,14 +151,17 @@ type commonMsgDatabase struct {
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
_, _, err := db.producer.SendMessage(ctx, key, msg2mq) _, _, err := db.producer.SendMessage(ctx, key, msg2mq)
return err return err
} }
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error { func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
if len(messages) > 0 { if len(messages) > 0 {
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbmsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages}) _, _, err := db.producerToModify.SendMessage(ctx, key, &pbmsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
return err return err
} }
return nil return nil
} }
@ -165,26 +169,26 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
if err != nil { if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
return 0, 0, err return 0, 0, err
} }
return partition, offset, nil return partition, offset, nil
} }
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 { if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
return err return err
} }
return nil
}
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 {
return nil return nil
} }
num := db.msg.GetSingleGocMsgNum()
func checkTypeForBatchInsertBlock(fields []any, key int8, firstSeq int64) error {
// num = 100 // num = 100
for i, field := range fields { // 检查类型 for i, field := range fields { // check type
var ok bool var ok bool
switch key { switch key {
case updateKeyMsg: case updateKeyMsg:
@ -202,8 +206,11 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return errs.ErrInternalServer.Wrap("field type is invalid") return errs.ErrInternalServer.Wrap("field type is invalid")
} }
} }
// 返回值为true表示数据库存在该文档false表示数据库不存在该文档
updateMsgModel := func(seq int64, i int) (bool, error) { return nil
}
func (db *commonMsgDatabase) updateMsgModelForBatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, seq int64, i int) (bool, error) {
var ( var (
res *mongo.UpdateResult res *mongo.UpdateResult
err error err error
@ -220,25 +227,16 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
if err != nil { if err != nil {
return false, err return false, err
} }
return res.MatchedCount > 0, nil return res.MatchedCount > 0, nil
} }
tryUpdate := true
for i := 0; i < len(fields); i++ { func (db *commonMsgDatabase) newDocForBatchInsertBlock(conversationID string, fields []any, key int8, seq, firstSeq, num int64, i int) (unrelationtb.MsgDocModel, int) {
seq := firstSeq + int64(i) // 当前seq
if tryUpdate {
matched, err := updateMsgModel(seq, i)
if err != nil {
return err
}
if matched {
continue // 匹配到了,继续下一个(不一定修改)
}
}
doc := unrelationtb.MsgDocModel{ doc := unrelationtb.MsgDocModel{
DocID: db.msg.GetDocID(conversationID, seq), DocID: db.msg.GetDocID(conversationID, seq),
Msg: make([]*unrelationtb.MsgInfoModel, num), Msg: make([]*unrelationtb.MsgInfoModel, num),
} }
var insert int // 插入的数量 var insert int // number of inserted
for j := i; j < len(fields); j++ { for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j) seq = firstSeq + int64(j)
if db.msg.GetDocID(conversationID, seq) != doc.DocID { if db.msg.GetDocID(conversationID, seq) != doc.DocID {
@ -265,17 +263,49 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
doc.Msg[i].DelList = []string{} doc.Msg[i].DelList = []string{}
} }
} }
return doc, insert
}
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 {
return nil
}
num := db.msg.GetSingleGocMsgNum()
// num = 100
err := checkTypeForBatchInsertBlock(fields, key, firstSeq)
if err != nil {
return err
}
tryUpdate := true
for i := 0; i < len(fields); i++ {
seq := firstSeq + int64(i) // current seq
// try update
if tryUpdate {
matched, err := db.updateMsgModelForBatchInsertBlock(ctx, conversationID, fields, key, seq, i)
if err != nil {
return err
}
if matched {
continue // if matched,skip
}
}
doc, insert := db.newDocForBatchInsertBlock(conversationID, fields, key, seq, firstSeq, num, i)
// insert doc into db
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) { if mongo.IsDuplicateKeyError(err) {
i-- // 存在并发,重试当前数据 i-- // exists concurrent,
tryUpdate = true // 以修改模式 tryUpdate = true // try update
continue continue
} }
return err return err
} }
tryUpdate = false // 当前以插入成功,下一块优先插入模式 tryUpdate = false // if insert success,change to insert mode
i += insert - 1 // 跳过已插入的数据 i += insert - 1 // skip inserted data
} }
return nil return nil
} }
@ -322,6 +352,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
Ex: msg.Ex, Ex: msg.Ex,
} }
} }
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
} }
@ -338,9 +369,11 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes)
if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil {
log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes) log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes)
return err return err
} }
} }
return nil return nil
} }
@ -354,8 +387,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
prome.Inc(prome.SeqGetFailedCounter) prome.Inc(prome.SeqGetFailedCounter)
return 0, false, err return 0, false, err
} }
prome.Inc(prome.SeqGetSuccessCounter) prome.Inc(prome.SeqGetSuccessCounter)
@ -366,7 +400,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
if lenList < 1 { if lenList < 1 {
return 0, false, errors.New("too short as 0") return 0, false, errors.New("too short as 0")
} }
if errs.Unwrap(err) == redis.Nil { if errors.Is(err, redis.Nil) {
isNew = true isNew = true
} }
lastMaxSeq := currentMaxSeq lastMaxSeq := currentMaxSeq
@ -396,6 +430,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} else { } else {
prome.Inc(prome.SeqSetSuccessCounter) prome.Inc(prome.SeqSetSuccessCounter)
} }
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, utils.Wrap(err, "")
} }
@ -410,6 +445,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
} }
} }
return totalMsgs, nil return totalMsgs, nil
} }
@ -420,6 +456,7 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID
msg.Msg.IsRead = true msg.Msg.IsRead = true
} }
} }
return msgs, err return msgs, err
} }
@ -438,16 +475,76 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg))
} }
} }
return seqMsgs, nil return seqMsgs, nil
} }
func (db *commonMsgDatabase) getCacheMsgForGetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin int64, seqs []int64) ([]*sdkws.MsgData, []int64, error) {
newBegin := seqs[0]
newEnd := seqs[len(seqs)-1]
log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd)
cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
if !errors.Is(err, redis.Nil) {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
}
}
var successMsgs []*sdkws.MsgData
if len(cachedMsgs) == 0 {
return successMsgs, failedSeqs, err
}
// if len(cachedMsgs) > 0
delSeqs, err2 := db.cache.GetUserDelList(ctx, userID, conversationID)
if err2 != nil && !errors.Is(err2, redis.Nil) {
return nil, nil, err2
}
var cacheDelNum int
for _, msg := range cachedMsgs {
if !utils.Contain(msg.Seq, delSeqs...) {
successMsgs = append(successMsgs, msg)
} else {
cacheDelNum += 1
}
}
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum)
var reGetSeqsCache []int64
for i := 1; i <= cacheDelNum; {
newSeq := newBegin - int64(i)
if newSeq >= begin {
if !utils.Contain(newSeq, delSeqs...) {
log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq)
reGetSeqsCache = append(reGetSeqsCache, newSeq)
i++
}
} else {
break
}
}
if len(reGetSeqsCache) > 0 {
log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache)
cachedMsgs, failedSeqs2, err2 := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
if err2 != nil {
if !errors.Is(err2, redis.Nil) {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2))
log.ZError(ctx, "get message from redis exception", err2, "conversationID", conversationID, "seqs", reGetSeqsCache)
}
}
failedSeqs = append(failedSeqs, failedSeqs2...)
successMsgs = append(successMsgs, cachedMsgs...)
}
return successMsgs, failedSeqs, err
}
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) {
// 从缓存中获取最小和最大序列号,并根据给定的范围值进行调整
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.cache.GetMinSeq(ctx, conversationID) minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
if userMinSeq > minSeq { if userMinSeq > minSeq {
@ -455,18 +552,25 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
} }
if minSeq > end { if minSeq > end {
log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end) log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end)
return 0, 0, nil, nil return 0, 0, nil, nil
} }
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
// log out debug info
log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq)
// adjust maxSeq according to userMaxSeq
if userMaxSeq != 0 { if userMaxSeq != 0 {
if userMaxSeq < maxSeq { if userMaxSeq < maxSeq {
maxSeq = userMaxSeq maxSeq = userMaxSeq
} }
} }
// adjust begin and end according to minSeq and maxSeq
if begin < minSeq { if begin < minSeq {
begin = minSeq begin = minSeq
} }
@ -476,6 +580,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
if end < begin { if end < begin {
return 0, 0, nil, errs.ErrArgs.Wrap("seq end < begin") return 0, 0, nil, errs.ErrArgs.Wrap("seq end < begin")
} }
// get seqs to search
var seqs []int64 var seqs []int64
for i := end; i > end-num; i-- { for i := end; i > end-num; i-- {
if i >= begin { if i >= begin {
@ -487,67 +593,24 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
if len(seqs) == 0 { if len(seqs) == 0 {
return 0, 0, nil, nil return 0, 0, nil, nil
} }
newBegin := seqs[0]
newEnd := seqs[len(seqs)-1] // get info from cache,and filter deleted msg
log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) successMsgs, failedSeqs, err := db.getCacheMsgForGetMsgBySeqsRange(ctx, userID, conversationID, begin, seqs)
cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil { if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs)
}
}
var successMsgs []*sdkws.MsgData
if len(cachedMsgs) > 0 {
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
var cacheDelNum int // log out debug info
for _, msg := range cachedMsgs {
if !utils.Contain(msg.Seq, delSeqs...) {
successMsgs = append(successMsgs, msg)
} else {
cacheDelNum += 1
}
}
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum)
var reGetSeqsCache []int64
for i := 1; i <= cacheDelNum; {
newSeq := newBegin - int64(i)
if newSeq >= begin {
if !utils.Contain(newSeq, delSeqs...) {
log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq)
reGetSeqsCache = append(reGetSeqsCache, newSeq)
i++
}
} else {
break
}
}
if len(reGetSeqsCache) > 0 {
log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache)
cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2))
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache)
}
}
failedSeqs = append(failedSeqs, failedSeqs2...)
successMsgs = append(successMsgs, cachedMsgs...)
}
}
log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs) log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
if len(failedSeqs) != 0 { if len(failedSeqs) != 0 {
log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs)
} }
// get from cache or db // if not found in cache,find in mongo
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 { if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
if err != nil { if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return 0, 0, nil, err return 0, 0, nil, err
} }
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
@ -559,15 +622,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
minSeq, err := db.cache.GetMinSeq(ctx, conversationID) minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return 0, 0, nil, err return 0, 0, nil, err
} }
if userMinSeq < minSeq { if userMinSeq < minSeq {
@ -581,7 +644,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
} }
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs)
if err != nil { if err != nil {
if err != redis.Nil { if !errors.Is(err, redis.Nil) {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
} }
@ -607,11 +670,13 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
if err != nil { if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return 0, 0, nil, err return 0, 0, nil, err
} }
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...) successMsgs = append(successMsgs, mongoMsgs...)
} }
return minSeq, maxSeq, successMsgs, nil return minSeq, maxSeq, successMsgs, nil
} }
@ -632,30 +697,13 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID) log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID)
} }
} }
return db.cache.SetMinSeq(ctx, conversationID, minSeq) return db.cache.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { func processMsgDocModel(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, userID, conversationID string, index int64, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, over bool) {
var index int64
for {
// from oldest 2 newest
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
break
}
index++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
if len(msgDocModel.Msg) > 0 { if len(msgDocModel.Msg) > 0 {
i := 0 i := 0
var over bool
for _, msg := range msgDocModel.Msg { for _, msg := range msgDocModel.Msg {
i++ i++
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
@ -665,20 +713,50 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
} else { } else {
log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
over = true over = true
break
return seqs, over
} }
} }
if over { }
return seqs, over
}
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
var index int64
// refresh msg list
for {
// from oldest to newest
msgDocModel, err2 := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err2 != nil || msgDocModel.DocID == "" {
if err2 != nil {
if errors.Is(err2, unrelation.ErrMsgListNotExist) {
log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err2, "conversationID", conversationID, "index", index)
}
}
// If there is an error or no message document is found, delete the message physically and return the sequence number, then end the recursion.
break break
} }
index++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
curSeqs, over := processMsgDocModel(ctx, msgDocModel, userID, conversationID, index, destructTime, lastMsgDestructTime)
seqs = append(seqs, curSeqs...)
if over {
break
} }
} }
// Log the result of the function call.
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
if len(seqs) > 0 { if len(seqs) == 0 {
return seqs, nil
}
// if len(seqs) > 0
userMinSeq := seqs[len(seqs)-1] + 1 userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && !errors.Is(err, redis.Nil) {
return nil, err return nil, err
} }
if currentUserMinSeq < userMinSeq { if currentUserMinSeq < userMinSeq {
@ -686,7 +764,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
return nil, err return nil, err
} }
} }
}
return seqs, nil return seqs, nil
} }
@ -709,28 +787,43 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" { if err != nil || msgDocModel.DocID == "" {
if err != nil { if err != nil {
if err == unrelation.ErrMsgListNotExist { if errors.Is(err, unrelation.ErrMsgListNotExist) {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else { } else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
} }
} }
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 // get error or miss content, delete physically and return minSeq,delMongoMsgsPhysical(delStruct.delDocIDList), end recursion
err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return delStruct.getSetMinSeq() + 1, nil return delStruct.getSetMinSeq() + 1, nil
} }
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() { if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
} }
if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() { fullAndExpired := msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill()
if fullAndExpired {
handleFullAndExpiredForDeleteMsgRecursion(ctx, msgDocModel, delStruct)
} else {
handleNotFullAndExpiredForDeleteMsgRecursion(ctx, msgDocModel, remainTime, index, conversationID, delStruct, db)
}
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
}
func handleFullAndExpiredForDeleteMsgRecursion(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, delStruct *delMsgRecursionStruct) {
log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID)
delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID)
delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq
} else { }
func handleNotFullAndExpiredForDeleteMsgRecursion(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, remainTime, index int64, conversationID string, delStruct *delMsgRecursionStruct, db *commonMsgDatabase) {
var delMsgIndexs []int var delMsgIndexs []int
for i, MsgInfoModel := range msgDocModel.Msg { for i, MsgInfoModel := range msgDocModel.Msg {
if MsgInfoModel != nil && MsgInfoModel.Msg != nil { if MsgInfoModel != nil && MsgInfoModel.Msg != nil {
@ -740,15 +833,13 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
} }
} }
if len(delMsgIndexs) > 0 { if len(delMsgIndexs) > 0 {
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { err2 := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs)
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) if err2 != nil {
log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err2, "conversationID", conversationID, "index", index)
} }
delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) delStruct.minSeq = msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq
} }
} }
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
}
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil { if err := db.cache.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
@ -763,13 +854,15 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
return err return err
} }
} }
return nil return nil
} }
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errors.Is(err, redis.Nil) {
log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
return err return err
} }
if len(cachedMsgs) > 0 { if len(cachedMsgs) > 0 {
@ -789,6 +882,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st
} }
} }
} }
return nil return nil
} }
@ -800,11 +894,12 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
if err == redis.Nil { if errors.Is(err, redis.Nil) {
log.ZInfo(ctx, "max seq is nil", "conversationID", conversationID) log.ZInfo(ctx, "max seq is nil", "conversationID", conversationID)
} else { } else {
log.ZError(ctx, "get max seq failed", err, "conversationID", conversationID) log.ZError(ctx, "get max seq failed", err, "conversationID", conversationID)
} }
continue continue
} }
if err := db.cache.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { if err := db.cache.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
@ -898,6 +993,7 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
if err != nil { if err != nil {
return return
} }
return return
} }
@ -916,6 +1012,7 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
return return
} }
maxSeqMongo = newestMsgMongo.Msg.Seq maxSeqMongo = newestMsgMongo.Msg.Seq
return return
} }
@ -943,7 +1040,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
} }
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) {
var totalMsgs []*sdkws.MsgData totalMsgs := make([]*sdkws.MsgData, 0)
total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
@ -954,6 +1051,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
} }
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
} }
return total, totalMsgs, nil return total, totalMsgs, nil
} }

@ -162,6 +162,7 @@ func GetDB() *commonMsgDatabase {
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &commonMsgDatabase{ return &commonMsgDatabase{
msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()),
} }

@ -89,5 +89,6 @@ func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Dur
if err != nil { if err != nil {
return time.Time{}, "", err return time.Time{}, "", err
} }
return expireTime, rawURL, nil return expireTime, rawURL, nil
} }

@ -90,6 +90,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel
if len(miss) > 0 { if len(miss) > 0 {
_ = u.userDB.Create(ctx, miss) _ = u.userDB.Create(ctx, miss)
} }
return nil return nil
} }
@ -102,30 +103,35 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use
if len(users) != len(userIDs) { if len(users) != len(userIDs) {
err = errs.ErrRecordNotFound.Wrap("userID not found") err = errs.ErrRecordNotFound.Wrap("userID not found")
} }
return return
} }
// Find Get the information of the specified user. If the userID is not found, no error will be returned. // Find Get the information of the specified user. If the userID is not found, no error will be returned.
func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
users, err = u.cache.GetUsersInfo(ctx, userIDs) users, err = u.cache.GetUsersInfo(ctx, userIDs)
return return
} }
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db. // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db.
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
if err := u.tx.Transaction(func(tx any) error { err = u.tx.Transaction(func(tx any) error {
err = u.userDB.Create(ctx, users) err = u.userDB.Create(ctx, users)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
}); err != nil { })
if err != nil {
return err return err
} }
var userIDs []string userIDs := make([]string, 0, len(users))
for _, user := range users { for _, user := range users {
userIDs = append(userIDs, user.UserID) userIDs = append(userIDs, user.UserID)
} }
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
} }
@ -134,6 +140,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er
if err := u.userDB.Update(ctx, user); err != nil { if err := u.userDB.Update(ctx, user); err != nil {
return err return err
} }
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
} }
@ -142,6 +149,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[
if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
return err return err
} }
return u.cache.DelUsersInfo(userID).ExecDel(ctx) return u.cache.DelUsersInfo(userID).ExecDel(ctx)
} }
@ -162,6 +170,7 @@ func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo
if len(users) > 0 { if len(users) > 0 {
return true, nil return true, nil
} }
return false, nil return false, nil
} }
@ -183,12 +192,14 @@ func (u *userDatabase) CountRangeEverydayTotal(ctx context.Context, start time.T
// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status. // SubscribeUsersStatus Subscribe or unsubscribe a user's presence status.
func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs) err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
return err return err
} }
// UnsubscribeUsersStatus unsubscribe a user's presence status. // UnsubscribeUsersStatus unsubscribe a user's presence status.
func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs) err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
return err return err
} }
@ -198,6 +209,7 @@ func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) (
if err != nil { if err != nil {
return nil, err return nil, err
} }
return list, nil return list, nil
} }
@ -207,12 +219,14 @@ func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]
if err != nil { if err != nil {
return nil, err return nil, err
} }
return list, nil return list, nil
} }
// GetUserStatus get user status. // GetUserStatus get user status.
func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) {
onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs) onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs)
return onlineStatusList, err return onlineStatusList, err
} }

Loading…
Cancel
Save