From e7e48a8c746d5e03ad088029713276250506ea5c Mon Sep 17 00:00:00 2001 From: cncsmonster Date: Mon, 23 Oct 2023 00:11:45 +0800 Subject: [PATCH] fix: fix lint errors in pkg/common/db/controller --- pkg/common/db/controller/auth.go | 7 +- pkg/common/db/controller/black.go | 4 + pkg/common/db/controller/conversation.go | 19 +- pkg/common/db/controller/friend.go | 51 ++- pkg/common/db/controller/group.go | 26 ++ pkg/common/db/controller/msg.go | 448 ++++++++++++++--------- pkg/common/db/controller/msg_test.go | 1 + pkg/common/db/controller/s3.go | 1 + pkg/common/db/controller/user.go | 20 +- 9 files changed, 373 insertions(+), 204 deletions(-) diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index 17b4a440d..13d06a964 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -69,9 +69,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI } } if len(deleteTokenKey) != 0 { - err := a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) - if err != nil { - return "", err + err2 := a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) + if err2 != nil { + return "", err2 } } claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire) @@ -80,5 +80,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI if err != nil { return "", utils.Wrap(err, "") } + return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken) } diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 70e942a77..38147e4e9 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -55,6 +55,7 @@ func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackMode if err := b.black.Create(ctx, blacks); err != nil { return err } + return b.deleteBlackIDsCache(ctx, blacks) } @@ -63,6 +64,7 @@ func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackMode if err := b.black.Delete(ctx, blacks); err != nil { return err } + return b.deleteBlackIDsCache(ctx, blacks) } @@ -71,6 +73,7 @@ func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relat for _, black := range blacks { cache = cache.DelBlackIDs(ctx, black.OwnerUserID) } + return cache.ExecDel(ctx) } @@ -97,6 +100,7 @@ func (b *blackDatabase) CheckIn( return } log.ZDebug(ctx, "blackIDs", "user1BlackIDs", userID1BlackIDs, "user2BlackIDs", userID2BlackIDs) + return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index c3dd6980e..e68eb25ba 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -99,8 +99,8 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, now := time.Now() for _, v := range NotUserIDs { temp := new(relationtb.ConversationModel) - if err := utils.CopyStructFields(temp, conversation); err != nil { - return err + if err2 := utils.CopyStructFields(temp, conversation); err2 != nil { + return err2 } temp.OwnerUserID = v temp.CreateTime = now @@ -113,10 +113,12 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, } cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...) } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -130,6 +132,7 @@ func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, if _, ok := args["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) } + return cache.ExecDel(ctx) } @@ -137,13 +140,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } - var userIDs []string + userIDs := make([]string, 0, len(conversations)) cache := c.cache.NewCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) } + return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) } @@ -178,10 +182,12 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con } } } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -234,12 +240,15 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs if err != nil { return err } - cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID).DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...) + cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID) + cache = cache.DelConversationNotReceiveMessageUserIDs(utils.Slice(notExistConversations, func(e *relationtb.ConversationModel) string { return e.ConversationID })...) } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -276,10 +285,12 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, for _, v := range existConversationUserIDs { cache = cache.DelConversations(v, conversationID) } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index 7816ef935..f35d6728b 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -16,6 +16,7 @@ package controller import ( "context" + "errors" "time" "gorm.io/gorm" @@ -109,6 +110,7 @@ func (f *friendDatabase) CheckIn( if err != nil { return } + return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil } @@ -121,8 +123,8 @@ func (f *friendDatabase) AddFriendRequest( ) (err error) { return f.tx.Transaction(func(tx any) error { _, err := f.friendRequest.NewTx(tx).Take(ctx, fromUserID, toUserID) - // 有db错误 - if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + // if there is a db error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return err } // 无错误 则更新 @@ -136,12 +138,14 @@ func (f *friendDatabase) AddFriendRequest( if err := f.friendRequest.NewTx(tx).UpdateByMap(ctx, fromUserID, toUserID, m); err != nil { return err } + return nil } // gorm.ErrRecordNotFound 错误,则新增 if err := f.friendRequest.NewTx(tx).Create(ctx, []*relation.FriendRequestModel{{FromUserID: fromUserID, ToUserID: toUserID, ReqMsg: reqMsg, Ex: ex, CreateTime: time.Now(), HandleTime: time.Unix(0, 0)}}); err != nil { return err } + return nil }) } @@ -154,11 +158,11 @@ func (f *friendDatabase) BecomeFriends( addSource int32, ) (err error) { cache := f.cache.NewCache() - if err := f.tx.Transaction(func(tx any) error { - // 先find 找出重复的 去掉重复的 - fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) - if err != nil { - return err + fn := func(tx any) error { + // first,find and drop delete ones + fs1, err2 := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) + if err2 != nil { + return err2 } opUserID := mcontext.GetOperationID(ctx) for _, v := range friendUserIDs { @@ -168,13 +172,13 @@ func (f *friendDatabase) BecomeFriends( return e.FriendUserID }) - err = f.friend.NewTx(tx).Create(ctx, fs11) - if err != nil { - return err + err2 = f.friend.NewTx(tx).Create(ctx, fs11) + if err2 != nil { + return err2 } - fs2, err := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs) - if err != nil { - return err + fs2, err2 := f.friend.NewTx(tx).FindReversalFriends(ctx, ownerUserID, friendUserIDs) + if err2 != nil { + return err2 } var newFriendIDs []string for _, v := range friendUserIDs { @@ -184,16 +188,20 @@ func (f *friendDatabase) BecomeFriends( fs22 := utils.DistinctAny(fs2, func(e *relation.FriendModel) string { return e.OwnerUserID }) - err = f.friend.NewTx(tx).Create(ctx, fs22) - if err != nil { - return err + err2 = f.friend.NewTx(tx).Create(ctx, fs22) + if err2 != nil { + return err2 } newFriendIDs = append(newFriendIDs, ownerUserID) cache = cache.DelFriendIDs(newFriendIDs...) + return nil - }); err != nil { - return nil } + err = f.tx.Transaction(fn) + if err != nil { + return err + } + return cache.ExecDel(ctx) } @@ -216,6 +224,7 @@ func (f *friendDatabase) RefuseFriendRequest( if err != nil { return err } + return nil } @@ -251,7 +260,7 @@ func (f *friendDatabase) AgreeFriendRequest( if err != nil { return err } - } else if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + } else if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return err } @@ -290,6 +299,7 @@ func (f *friendDatabase) AgreeFriendRequest( return err } } + return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).ExecDel(ctx) }) } @@ -299,6 +309,7 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { return err } + return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) } @@ -307,6 +318,7 @@ func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { return err } + return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) } @@ -359,6 +371,7 @@ func (f *friendDatabase) FindFriendsWithError( if len(friends) != len(friendUserIDs) { err = errs.ErrRecordNotFound.Wrap() } + return } diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 194f3e8b2..0788429a8 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -102,6 +102,7 @@ func NewGroupDatabase( cache: cache, mongoDB: superGroup, } + return database } @@ -109,6 +110,7 @@ func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.D rcOptions := rockscache.NewDefaultOptions() rcOptions.StrongConsistency = true rcOptions.RandomExpireAdjustment = 0.2 + return NewGroupDatabase( relation.NewGroupDB(db), relation.NewGroupMemberDB(db), @@ -151,6 +153,7 @@ func (g *groupDatabase) FindGroupMemberNum(ctx context.Context, groupID string) if err != nil { return 0, err } + return uint32(num), nil } @@ -184,10 +187,12 @@ func (g *groupDatabase) CreateGroup( cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID) } cache = cache.DelGroupsInfo(createGroupIDs...) + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -211,6 +216,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { return err } + return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) } @@ -231,10 +237,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete cache = cache.DelJoinedGroupID(userIDs...).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID) } cache = cache.DelGroupsInfo(groupID) + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -276,6 +284,7 @@ func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, } res = append(res, v) } + return res, nil } if len(roleLevels) == 0 { @@ -286,8 +295,10 @@ func (g *groupDatabase) FindGroupMember(ctx context.Context, groupIDs []string, } totalGroupMembers = append(totalGroupMembers, groupMembers...) } + return totalGroupMembers, nil } + return g.groupMemberDB.Find(ctx, groupIDs, userIDs, roleLevels) } @@ -307,6 +318,7 @@ func (g *groupDatabase) PageGetJoinGroup( } totalGroupMembers = append(totalGroupMembers, groupMembers...) } + return uint32(len(groupIDs)), totalGroupMembers, nil } @@ -327,6 +339,7 @@ func (g *groupDatabase) PageGetGroupMember( if err != nil { return 0, nil, err } + return uint32(len(groupMemberIDs)), members, nil } @@ -378,6 +391,7 @@ func (g *groupDatabase) HandlerGroupRequest( return err } } + return nil }) } @@ -386,6 +400,7 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { return err } + return g.cache.DelGroupMembersHash(groupID). DelGroupMemberIDs(groupID). DelGroupsMemberNum(groupID). @@ -410,6 +425,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string } m[groupID] = uint32(num) } + return m, nil } @@ -429,6 +445,7 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, if rowsAffected != 1 { return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") } + return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).DelGroupMembersHash(groupID).ExecDel(ctx) }) } @@ -442,6 +459,7 @@ func (g *groupDatabase) UpdateGroupMember( if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { return err } + return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) } @@ -454,10 +472,12 @@ func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relation } cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID) } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -469,6 +489,7 @@ func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*rela return err } } + return db.Create(ctx, requests) }) } @@ -504,6 +525,7 @@ func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, in if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { return err } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) } @@ -521,10 +543,12 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er if len(models) > 0 { cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...) } + return nil }); err != nil { return err } + return cache.ExecDel(ctx) } @@ -532,6 +556,7 @@ func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID stri if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { return err } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } @@ -539,6 +564,7 @@ func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID stri if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { return err } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index af678f92c..1bbf4cdf6 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -135,6 +135,7 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) cacheModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(database) CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel) + return CommonMsgDatabase } @@ -150,14 +151,17 @@ type commonMsgDatabase struct { func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { _, _, err := db.producer.SendMessage(ctx, key, msg2mq) + return err } func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error { if len(messages) > 0 { _, _, err := db.producerToModify.SendMessage(ctx, key, &pbmsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages}) + return err } + return nil } @@ -165,26 +169,26 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) if err != nil { log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) + return 0, 0, err } + return partition, offset, nil } func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { if len(messages) > 0 { _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) + return err } + return nil } -func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { - if len(fields) == 0 { - return nil - } - num := db.msg.GetSingleGocMsgNum() +func checkTypeForBatchInsertBlock(fields []any, key int8, firstSeq int64) error { // num = 100 - for i, field := range fields { // 检查类型 + for i, field := range fields { // check type var ok bool switch key { case updateKeyMsg: @@ -202,80 +206,106 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI return errs.ErrInternalServer.Wrap("field type is invalid") } } - // 返回值为true表示数据库存在该文档,false表示数据库不存在该文档 - updateMsgModel := func(seq int64, i int) (bool, error) { - var ( - res *mongo.UpdateResult - err error - ) - docID := db.msg.GetDocID(conversationID, seq) - index := db.msg.GetMsgIndex(seq) - field := fields[i] + + return nil +} + +func (db *commonMsgDatabase) updateMsgModelForBatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, seq int64, i int) (bool, error) { + var ( + res *mongo.UpdateResult + err error + ) + docID := db.msg.GetDocID(conversationID, seq) + index := db.msg.GetMsgIndex(seq) + field := fields[i] + switch key { + case updateKeyMsg: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) + case updateKeyRevoke: + res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) + } + if err != nil { + return false, err + } + + return res.MatchedCount > 0, nil +} + +func (db *commonMsgDatabase) newDocForBatchInsertBlock(conversationID string, fields []any, key int8, seq, firstSeq, num int64, i int) (unrelationtb.MsgDocModel, int) { + doc := unrelationtb.MsgDocModel{ + DocID: db.msg.GetDocID(conversationID, seq), + Msg: make([]*unrelationtb.MsgInfoModel, num), + } + var insert int // number of inserted + for j := i; j < len(fields); j++ { + seq = firstSeq + int64(j) + if db.msg.GetDocID(conversationID, seq) != doc.DocID { + break + } + insert++ switch key { case updateKeyMsg: - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) + doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ + Msg: fields[j].(*unrelationtb.MsgDataModel), + } case updateKeyRevoke: - res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) + doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ + Revoke: fields[j].(*unrelationtb.RevokeModel), + } } - if err != nil { - return false, err + } + for i, model := range doc.Msg { + if model == nil { + model = &unrelationtb.MsgInfoModel{} + doc.Msg[i] = model + } + if model.DelList == nil { + doc.Msg[i].DelList = []string{} } - return res.MatchedCount > 0, nil + } + + return doc, insert +} + +func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { + if len(fields) == 0 { + return nil + } + num := db.msg.GetSingleGocMsgNum() + // num = 100 + err := checkTypeForBatchInsertBlock(fields, key, firstSeq) + if err != nil { + return err } tryUpdate := true for i := 0; i < len(fields); i++ { - seq := firstSeq + int64(i) // 当前seq + seq := firstSeq + int64(i) // current seq + // try update if tryUpdate { - matched, err := updateMsgModel(seq, i) + matched, err := db.updateMsgModelForBatchInsertBlock(ctx, conversationID, fields, key, seq, i) if err != nil { return err } if matched { - continue // 匹配到了,继续下一个(不一定修改) - } - } - doc := unrelationtb.MsgDocModel{ - DocID: db.msg.GetDocID(conversationID, seq), - Msg: make([]*unrelationtb.MsgInfoModel, num), - } - var insert int // 插入的数量 - for j := i; j < len(fields); j++ { - seq = firstSeq + int64(j) - if db.msg.GetDocID(conversationID, seq) != doc.DocID { - break - } - insert++ - switch key { - case updateKeyMsg: - doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ - Msg: fields[j].(*unrelationtb.MsgDataModel), - } - case updateKeyRevoke: - doc.Msg[db.msg.GetMsgIndex(seq)] = &unrelationtb.MsgInfoModel{ - Revoke: fields[j].(*unrelationtb.RevokeModel), - } - } - } - for i, model := range doc.Msg { - if model == nil { - model = &unrelationtb.MsgInfoModel{} - doc.Msg[i] = model - } - if model.DelList == nil { - doc.Msg[i].DelList = []string{} + continue // if matched,skip } } + doc, insert := db.newDocForBatchInsertBlock(conversationID, fields, key, seq, firstSeq, num, i) + // insert doc into db if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { - i-- // 存在并发,重试当前数据 - tryUpdate = true // 以修改模式 + i-- // exists concurrent, + tryUpdate = true // try update + continue } + return err } - tryUpdate = false // 当前以插入成功,下一块优先插入模式 - i += insert - 1 // 跳过已插入的数据 + tryUpdate = false // if insert success,change to insert mode + i += insert - 1 // skip inserted data } + return nil } @@ -322,6 +352,7 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio Ex: msg.Ex, } } + return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) } @@ -338,9 +369,11 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes) + return err } } + return nil } @@ -354,8 +387,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { prome.Inc(prome.SeqGetFailedCounter) + return 0, false, err } prome.Inc(prome.SeqGetSuccessCounter) @@ -366,7 +400,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa if lenList < 1 { return 0, false, errors.New("too short as 0") } - if errs.Unwrap(err) == redis.Nil { + if errors.Is(err, redis.Nil) { isNew = true } lastMaxSeq := currentMaxSeq @@ -396,6 +430,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa } else { prome.Inc(prome.SeqSetSuccessCounter) } + return lastMaxSeq, isNew, utils.Wrap(err, "") } @@ -410,6 +445,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) } } + return totalMsgs, nil } @@ -420,6 +456,7 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID msg.Msg.IsRead = true } } + return msgs, err } @@ -438,16 +475,76 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) } } + return seqMsgs, nil } +func (db *commonMsgDatabase) getCacheMsgForGetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin int64, seqs []int64) ([]*sdkws.MsgData, []int64, error) { + newBegin := seqs[0] + newEnd := seqs[len(seqs)-1] + log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) + cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + if err != nil { + if !errors.Is(err, redis.Nil) { + prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) + } + } + var successMsgs []*sdkws.MsgData + if len(cachedMsgs) == 0 { + return successMsgs, failedSeqs, err + } + // if len(cachedMsgs) > 0 + delSeqs, err2 := db.cache.GetUserDelList(ctx, userID, conversationID) + if err2 != nil && !errors.Is(err2, redis.Nil) { + return nil, nil, err2 + } + var cacheDelNum int + for _, msg := range cachedMsgs { + if !utils.Contain(msg.Seq, delSeqs...) { + successMsgs = append(successMsgs, msg) + } else { + cacheDelNum += 1 + } + } + log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum) + var reGetSeqsCache []int64 + for i := 1; i <= cacheDelNum; { + newSeq := newBegin - int64(i) + if newSeq >= begin { + if !utils.Contain(newSeq, delSeqs...) { + log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq) + reGetSeqsCache = append(reGetSeqsCache, newSeq) + i++ + } + } else { + break + } + } + if len(reGetSeqsCache) > 0 { + log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache) + cachedMsgs, failedSeqs2, err2 := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) + if err2 != nil { + if !errors.Is(err2, redis.Nil) { + prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2)) + log.ZError(ctx, "get message from redis exception", err2, "conversationID", conversationID, "seqs", reGetSeqsCache) + } + } + failedSeqs = append(failedSeqs, failedSeqs2...) + successMsgs = append(successMsgs, cachedMsgs...) + } + + return successMsgs, failedSeqs, err +} + func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { + // 从缓存中获取最小和最大序列号,并根据给定的范围值进行调整 userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } minSeq, err := db.cache.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } if userMinSeq > minSeq { @@ -455,18 +552,25 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin } if minSeq > end { log.ZInfo(ctx, "minSeq > end", "minSeq", minSeq, "end", end) + return 0, 0, nil, nil } maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } + + // log out debug info log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) + + // adjust maxSeq according to userMaxSeq if userMaxSeq != 0 { if userMaxSeq < maxSeq { maxSeq = userMaxSeq } } + + // adjust begin and end according to minSeq and maxSeq if begin < minSeq { begin = minSeq } @@ -476,6 +580,8 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin if end < begin { return 0, 0, nil, errs.ErrArgs.Wrap("seq end < begin") } + + // get seqs to search var seqs []int64 for i := end; i > end-num; i-- { if i >= begin { @@ -487,67 +593,24 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin if len(seqs) == 0 { return 0, 0, nil, nil } - newBegin := seqs[0] - newEnd := seqs[len(seqs)-1] - log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) - cachedMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) + + // get info from cache,and filter deleted msg + successMsgs, failedSeqs, err := db.getCacheMsgForGetMsgBySeqsRange(ctx, userID, conversationID, begin, seqs) if err != nil { - if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) - log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) - } - } - var successMsgs []*sdkws.MsgData - if len(cachedMsgs) > 0 { - delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { - return 0, 0, nil, err - } - var cacheDelNum int - for _, msg := range cachedMsgs { - if !utils.Contain(msg.Seq, delSeqs...) { - successMsgs = append(successMsgs, msg) - } else { - cacheDelNum += 1 - } - } - log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum) - var reGetSeqsCache []int64 - for i := 1; i <= cacheDelNum; { - newSeq := newBegin - int64(i) - if newSeq >= begin { - if !utils.Contain(newSeq, delSeqs...) { - log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq) - reGetSeqsCache = append(reGetSeqsCache, newSeq) - i++ - } - } else { - break - } - } - if len(reGetSeqsCache) > 0 { - log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache) - cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) - if err != nil { - if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2)) - log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache) - } - } - failedSeqs = append(failedSeqs, failedSeqs2...) - successMsgs = append(successMsgs, cachedMsgs...) - } + return 0, 0, nil, err } + // log out debug info log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs) if len(failedSeqs) != 0 { log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) } - // get from cache or db + // if not found in cache,find in mongo prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return 0, 0, nil, err } prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) @@ -559,15 +622,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } minSeq, err := db.cache.GetMinSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && !errors.Is(err, redis.Nil) { return 0, 0, nil, err } if userMinSeq < minSeq { @@ -581,7 +644,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co } successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { - if err != redis.Nil { + if !errors.Is(err, redis.Nil) { prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } @@ -607,11 +670,13 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) + return 0, 0, nil, err } prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) successMsgs = append(successMsgs, mongoMsgs...) } + return minSeq, maxSeq, successMsgs, nil } @@ -632,61 +697,74 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID) } } + return db.cache.SetMinSeq(ctx, conversationID, minSeq) } -func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { +func processMsgDocModel(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, userID, conversationID string, index int64, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, over bool) { + if len(msgDocModel.Msg) > 0 { + i := 0 + for _, msg := range msgDocModel.Msg { + i++ + if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { + if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) { + seqs = append(seqs, msg.Msg.Seq) + } + } else { + log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) + over = true + + return seqs, over + } + } + } + + return seqs, over +} + +func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { var index int64 + + // refresh msg list for { - // from oldest 2 newest - msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) - if err != nil || msgDocModel.DocID == "" { - if err != nil { - if err == unrelation.ErrMsgListNotExist { + // from oldest to newest + msgDocModel, err2 := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) + if err2 != nil || msgDocModel.DocID == "" { + if err2 != nil { + if errors.Is(err2, unrelation.ErrMsgListNotExist) { log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index) } else { - log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) + log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err2, "conversationID", conversationID, "index", index) } } - // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 + // If there is an error or no message document is found, delete the message physically and return the sequence number, then end the recursion. break } index++ //&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() - if len(msgDocModel.Msg) > 0 { - i := 0 - var over bool - for _, msg := range msgDocModel.Msg { - i++ - if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { - if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !utils.Contain(userID, msg.DelList...) { - seqs = append(seqs, msg.Msg.Seq) - } - } else { - log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) - over = true - break - } - } - if over { - break - } + curSeqs, over := processMsgDocModel(ctx, msgDocModel, userID, conversationID, index, destructTime, lastMsgDestructTime) + seqs = append(seqs, curSeqs...) + if over { + break } } - + // Log the result of the function call. log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) - if len(seqs) > 0 { - userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) - if err != nil && errs.Unwrap(err) != redis.Nil { + if len(seqs) == 0 { + return seqs, nil + } + // if len(seqs) > 0 + userMinSeq := seqs[len(seqs)-1] + 1 + currentUserMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + if err != nil && !errors.Is(err, redis.Nil) { + return nil, err + } + if currentUserMinSeq < userMinSeq { + if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err } - if currentUserMinSeq < userMinSeq { - if err := db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { - return nil, err - } - } } + return seqs, nil } @@ -709,45 +787,58 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { - if err == unrelation.ErrMsgListNotExist { + if errors.Is(err, unrelation.ErrMsgListNotExist) { log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) } else { log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) } } - // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 + // get error or miss content, delete physically and return minSeq,delMongoMsgsPhysical(delStruct.delDocIDList), end recursion err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) if err != nil { return 0, err } + return delStruct.getSetMinSeq() + 1, nil } + log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) } - if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() { - log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) - delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) - delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq + fullAndExpired := msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() + if fullAndExpired { + handleFullAndExpiredForDeleteMsgRecursion(ctx, msgDocModel, delStruct) } else { - var delMsgIndexs []int - for i, MsgInfoModel := range msgDocModel.Msg { - if MsgInfoModel != nil && MsgInfoModel.Msg != nil { - if utils.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { - delMsgIndexs = append(delMsgIndexs, i) - } + handleNotFullAndExpiredForDeleteMsgRecursion(ctx, msgDocModel, remainTime, index, conversationID, delStruct, db) + } + seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) + + return seq, err +} + +func handleFullAndExpiredForDeleteMsgRecursion(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, delStruct *delMsgRecursionStruct) { + log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) + delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) + delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq +} + +func handleNotFullAndExpiredForDeleteMsgRecursion(ctx context.Context, msgDocModel *unrelationtb.MsgDocModel, remainTime, index int64, conversationID string, delStruct *delMsgRecursionStruct, db *commonMsgDatabase) { + var delMsgIndexs []int + for i, MsgInfoModel := range msgDocModel.Msg { + if MsgInfoModel != nil && MsgInfoModel.Msg != nil { + if utils.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { + delMsgIndexs = append(delMsgIndexs, i) } } - if len(delMsgIndexs) > 0 { - if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { - log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) - } - delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) + } + if len(delMsgIndexs) > 0 { + err2 := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs) + if err2 != nil { + log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err2, "conversationID", conversationID, "index", index) } + delStruct.minSeq = msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq } - seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) - return seq, err } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { @@ -763,13 +854,15 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve return err } } + return nil } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { cachedMsgs, _, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) - if err != nil && errs.Unwrap(err) != redis.Nil { + if err != nil && errors.Is(err, redis.Nil) { log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs) + return err } if len(cachedMsgs) > 0 { @@ -789,6 +882,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st } } } + return nil } @@ -800,11 +894,12 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u for _, conversationID := range conversationIDs { maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID) if err != nil { - if err == redis.Nil { + if errors.Is(err, redis.Nil) { log.ZInfo(ctx, "max seq is nil", "conversationID", conversationID) } else { log.ZError(ctx, "get max seq failed", err, "conversationID", conversationID) } + continue } if err := db.cache.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { @@ -898,6 +993,7 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context if err != nil { return } + return } @@ -916,6 +1012,7 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation return } maxSeqMongo = newestMsgMongo.Msg.Seq + return } @@ -943,7 +1040,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount( } func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) { - var totalMsgs []*sdkws.MsgData + totalMsgs := make([]*sdkws.MsgData, 0) total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) if err != nil { return 0, nil, err @@ -954,6 +1051,7 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc } totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) } + return total, totalMsgs, nil } diff --git a/pkg/common/db/controller/msg_test.go b/pkg/common/db/controller/msg_test.go index 80e2db122..15448674b 100644 --- a/pkg/common/db/controller/msg_test.go +++ b/pkg/common/db/controller/msg_test.go @@ -162,6 +162,7 @@ func GetDB() *commonMsgDatabase { if err != nil { panic(err) } + return &commonMsgDatabase{ msgDocDatabase: unrelation.NewMsgMongoDriver(mongo.GetDatabase()), } diff --git a/pkg/common/db/controller/s3.go b/pkg/common/db/controller/s3.go index 6ef3e73b3..f848f15a4 100644 --- a/pkg/common/db/controller/s3.go +++ b/pkg/common/db/controller/s3.go @@ -89,5 +89,6 @@ func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Dur if err != nil { return time.Time{}, "", err } + return expireTime, rawURL, nil } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 9c6fdc5c4..d4a120f1c 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -90,6 +90,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel if len(miss) > 0 { _ = u.userDB.Create(ctx, miss) } + return nil } @@ -102,30 +103,35 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use if len(users) != len(userIDs) { err = errs.ErrRecordNotFound.Wrap("userID not found") } + return } // Find Get the information of the specified user. If the userID is not found, no error will be returned. func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) { users, err = u.cache.GetUsersInfo(ctx, userIDs) + return } // Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db. func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { - if err := u.tx.Transaction(func(tx any) error { + err = u.tx.Transaction(func(tx any) error { err = u.userDB.Create(ctx, users) if err != nil { return err } + return nil - }); err != nil { + }) + if err != nil { return err } - var userIDs []string + userIDs := make([]string, 0, len(users)) for _, user := range users { userIDs = append(userIDs, user.UserID) } + return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) } @@ -134,6 +140,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er if err := u.userDB.Update(ctx, user); err != nil { return err } + return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) } @@ -142,6 +149,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[ if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { return err } + return u.cache.DelUsersInfo(userID).ExecDel(ctx) } @@ -162,6 +170,7 @@ func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo if len(users) > 0 { return true, nil } + return false, nil } @@ -183,12 +192,14 @@ func (u *userDatabase) CountRangeEverydayTotal(ctx context.Context, start time.T // SubscribeUsersStatus Subscribe or unsubscribe a user's presence status. func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs) + return err } // UnsubscribeUsersStatus unsubscribe a user's presence status. func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error { err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs) + return err } @@ -198,6 +209,7 @@ func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ( if err != nil { return nil, err } + return list, nil } @@ -207,12 +219,14 @@ func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([] if err != nil { return nil, err } + return list, nil } // GetUserStatus get user status. func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { onlineStatusList, err := u.cache.GetUserStatus(ctx, userIDs) + return onlineStatusList, err }