diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 77979e153..f52ef6cd4 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -30,10 +30,10 @@ type ConversationCache interface { NewCache() ConversationCache // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) - DelConversationIDs(userIDs []string) ConversationCache + DelConversationIDs(userIDs ...string) ConversationCache // get one conversation from msgCache GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) - DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache + DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache // get one conversation from msgCache GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) @@ -97,7 +97,7 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own }) } -func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache { +func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache { var keys []string for _, userID := range userIDs { keys = append(keys, c.getConversationIDsKey(userID)) @@ -113,7 +113,7 @@ func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserI }) } -func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache { +func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs ...string) ConversationCache { var keys []string for _, conversationID := range convsersationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 89f6c9967..99dfe46b4 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -7,9 +7,11 @@ import ( "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "github.com/dtm-labs/rockscache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -29,6 +31,7 @@ const ( getuiToken = "GETUI_TOKEN" getuiTaskID = "GETUI_TASK_ID" messageCache = "MESSAGE_CACHE:" + messageReadCache = "MESSAGE_READ_CACHE:" signalCache = "SIGNAL_CACHE:" signalListCache = "SIGNAL_LIST_CACHE:" fcmToken = "FCM_TOKEN:" @@ -84,6 +87,9 @@ type MsgModel interface { SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + + GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) + DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { @@ -91,39 +97,11 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel { } type msgCache struct { - rdb redis.UniversalClient -} - -// 兼容老版本调用 -func (c *msgCache) DelKeys() { - for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", - "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { - fName := utils.GetSelfFuncName() - var cursor uint64 - var n int - for { - var keys []string - var err error - keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() - if err != nil { - panic(err.Error()) - } - n += len(keys) - // for each for redis cluster - for _, key := range keys { - if err = c.rdb.Del(context.Background(), key).Err(); err != nil { - log.NewError("", fName, key, err.Error()) - err = c.rdb.Del(context.Background(), key).Err() - if err != nil { - panic(err.Error()) - } - } - } - if cursor == 0 { - break - } - } - } + metaCache + rdb redis.UniversalClient + expireTime time.Duration + rcClient *rockscache.Client + msgDocDatabase unRelationTb.MsgDocModelInterface } func (c *msgCache) getMaxSeqKey(conversationID string) string { @@ -145,7 +123,6 @@ func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { pipe := c.rdb.Pipeline() for _, v := range items { - log.ZDebug(ctx, "getSeqs", "getkey", getkey(v)) if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { return nil, errs.Wrap(err) } @@ -550,3 +527,41 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } + +func (c *msgCache) NewCache() MsgModel { + return &msgCache{ + metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), + expireTime: c.expireTime, + rcClient: c.rcClient, + } +} + +func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string { + return messageReadCache + docID + "_" + strconv.Itoa(int(seq)) +} + +func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) { + key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq) + for i, _key := range keys { + if key == _key { + return i, nil + } + } + return 0, errIndex +} + +func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) { + var keys []string + for _, seq := range seqs { + keys = append(keys, c.getMsgReadCacheKey(docID, seq)) + } + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) { + return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) + }) +} + +func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel { + cache := c.NewCache() + c.AddKeys(c.getMsgReadCacheKey(docID, seq)) + return cache +} diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 07380053a..7479722b8 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -2,10 +2,10 @@ package controller import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "gorm.io/gorm" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) type BlackDatabase interface { @@ -31,12 +31,26 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache // Create 增加黑名单 func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { - return b.black.Create(ctx, blacks) + if err := b.black.Create(ctx, blacks); err != nil { + return err + } + return b.deleteBlackIDsCache(ctx, blacks) } // Delete 删除黑名单 func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) { - return b.black.Delete(ctx, blacks) + if err := b.black.Delete(ctx, blacks); err != nil { + return err + } + return b.deleteBlackIDsCache(ctx, blacks) +} + +func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) { + cache := b.cache.NewCache() + for _, black := range blacks { + cache = cache.DelBlackIDs(ctx, black.OwnerUserID) + } + return cache.ExecDel(ctx) } // FindOwnerBlacks 获取黑名单列表 @@ -46,21 +60,15 @@ func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string, // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) { - _, err = b.black.Take(ctx, userID1, userID2) + userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { - return - } + return } - inUser1Blacks = err == nil - _, err = b.black.Take(ctx, userID2, userID1) + userID2BlackIDs, err := b.cache.GetBlackIDs(ctx, userID2) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { - return - } + return } - inUser2Blacks = err == nil - return inUser1Blacks, inUser2Blacks, nil + return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil } func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) { diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index aa3eec74d..bcdd36eba 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -46,14 +46,14 @@ type ConversationDataBase struct { tx tx.Tx } -func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error { - return c.tx.Transaction(func(tx any) error { +func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID}) if err != nil { return err } - cache := c.cache.NewCache() if len(haveUserIDs) > 0 { _, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap) if err != nil { @@ -71,19 +71,20 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, } temp.OwnerUserID = v conversations = append(conversations, temp) - } + } if len(conversations) > 0 { err = conversationTx.Create(ctx, conversations) if err != nil { return err } - cache = cache.DelConversationIDs(NotUserIDs) + cache = cache.DelConversationIDs(NotUserIDs...) } - // clear cache - log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache) - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { @@ -98,13 +99,17 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } - return nil + var userIDs []string + for _, conversation := range conversations { + userIDs = append(userIDs, conversation.OwnerUserID) + } + return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx) } func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error { - return c.tx.Transaction(func(tx any) error { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) - cache := c.cache.NewCache() for _, conversation := range conversations { for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID}) @@ -126,12 +131,15 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil { return err } - cache = cache.DelConversationIDs([]string{v[0]}) + cache = cache.DelConversationIDs([]string{v[0]}...) } } } - return c.cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return c.cache.ExecDel(ctx) } func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { @@ -147,7 +155,8 @@ func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, owner } func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { - return c.tx.Transaction(func(tx any) error { + cache := c.cache.NewCache() + if err := c.tx.Transaction(func(tx any) error { var conversationIDs []string for _, conversation := range conversations { conversationIDs = append(conversationIDs, conversation.ConversationID) @@ -181,13 +190,14 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs if err != nil { return err } + cache = cache.DelConversationIDs([]string{ownerUserID}...) } - cache := c.cache.NewCache() - if len(notExistConversations) > 0 { - cache = cache.DelConversationIDs([]string{ownerUserID}) - } - return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx) - }) + cache = cache.DelConvsersations(ownerUserID, existConversationIDs...) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { @@ -195,27 +205,36 @@ func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, } func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { + cache := c.cache.NewCache() conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - return c.tx.Transaction(func(tx any) error { - existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID}) + if err := c.tx.Transaction(func(tx any) error { + existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err } notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs) - var conversations []*relationTb.ConversationModel for _, v := range notExistUserIDs { conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) } + cache = cache.DelConversationIDs(notExistUserIDs...) err = c.conversationDB.Create(ctx, conversations) if err != nil { return err } - _, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0}) + _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0}) + if err != nil { + return err + } + for _, v := range existConversationUserIDs { + cache = cache.DelConvsersations(v, conversationID) + } + return nil + }); err != nil { return err - }) - + } + return cache.ExecDel(ctx) } func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index e24fc9165..1c3fc0364 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -55,19 +55,15 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat // ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true) func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) { - friends, err := f.friend.FindUserState(ctx, userID1, userID2) + userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1) if err != nil { - return false, false, err + return } - for _, v := range friends { - if v.OwnerUserID == userID1 && v.FriendUserID == userID2 { - inUser1Friends = true - } - if v.OwnerUserID == userID2 && v.FriendUserID == userID1 { - inUser2Friends = true - } + userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2) + if err != nil { + return } - return + return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil } // 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增 @@ -100,7 +96,8 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse // (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可 func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) { - return f.tx.Transaction(func(tx any) error { + cache := f.cache.NewCache() + if err := f.tx.Transaction(func(tx any) error { //先find 找出重复的 去掉重复的 fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs) if err != nil { @@ -135,8 +132,12 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, return err } newFriendIDs = append(newFriendIDs, ownerUserID) - return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx) - }) + cache = cache.DelFriendIDs(newFriendIDs...) + return nil + }); err != nil { + return nil + } + return cache.ExecDel(ctx) } // 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝 @@ -199,24 +200,18 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest * // 删除好友 外部判断是否好友关系 func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) { - return f.tx.Transaction(func(tx any) error { - if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { - return err - } - return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) - }) - + if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { + return err + } + return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx) } // 更新好友备注 零值也支持 func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) { - return f.tx.Transaction(func(tx any) error { - err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark) - if err != nil { - return err - } - return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) - }) + if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { + return err + } + return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx) } // 获取ownerUserID的好友列表 无结果不返回错误 diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 57c5f1fb2..7f5cc4101 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -113,7 +113,8 @@ func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID strin } func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error { - return g.tx.Transaction(func(tx any) error { + var cache = g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if len(groups) > 0 { if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil { return err @@ -128,7 +129,7 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr return group.GroupID }) m := make(map[string]struct{}) - var cache = g.cache.NewCache() + for _, groupMember := range groupMembers { if _, ok := m[groupMember.GroupID]; !ok { m[groupMember.GroupID] = struct{}{} @@ -137,8 +138,11 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID) } cache = cache.DelGroupsInfo(createGroupIDs...) - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) { @@ -154,16 +158,15 @@ func (g *groupDatabase) SearchGroup(ctx context.Context, keyword string, pageNum } func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil { - return err - } - return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) - }) + if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { + return err + } + return g.cache.DelGroupsInfo(groupID).ExecDel(ctx) } func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error { - return g.tx.Transaction(func(tx any) error { + cache := g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if err := g.groupDB.NewTx(tx).UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil { return err } @@ -174,8 +177,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error if err != nil { return err } - return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx) - }) + cache = cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) { @@ -236,7 +243,8 @@ func (g *groupDatabase) SearchGroupMember(ctx context.Context, keyword string, g } func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error { - return g.tx.Transaction(func(tx any) error { + cache := g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil { return err } @@ -244,19 +252,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil { return err } - return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx) + cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID) } return nil - }) + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx) - }) + if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx) } func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) { @@ -276,7 +285,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string } func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { - return g.tx.Transaction(func(tx any) error { + if err := g.tx.Transaction(func(tx any) error { rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel) if err != nil { return err @@ -291,30 +300,34 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, if rowsAffected != 1 { return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "") } - return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx) } func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error { - return g.tx.Transaction(func(tx any) error { - if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil { - return err - } - return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) - }) + if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { + return err + } + return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx) } func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error { - return g.tx.Transaction(func(tx any) error { - var cache = g.cache.NewCache() + var cache = g.cache.NewCache() + if err := g.tx.Transaction(func(tx any) error { for _, item := range data { if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil { return err } cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID) } - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error { @@ -346,16 +359,15 @@ func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) ( } func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx) } func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { + cache := g.cache.NewCache() + if err := g.ctxTx.Transaction(ctx, func(ctx context.Context) error { if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil { return err } @@ -363,28 +375,27 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er if err != nil { return err } - cache := g.cache.DelSuperGroupMemberIDs(groupID) + cache = cache.DelSuperGroupMemberIDs(groupID) if len(models) > 0 { cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...) } - return cache.ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + return cache.ExecDel(ctx) } func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error { - return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { - return err - } - return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) - }) + if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil { + return err + } + return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx) } diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/msg.go similarity index 95% rename from pkg/common/db/controller/common_msg.go rename to pkg/common/db/controller/msg.go index 819d20811..d544e0595 100644 --- a/pkg/common/db/controller/common_msg.go +++ b/pkg/common/db/controller/msg.go @@ -100,13 +100,12 @@ type commonMsgDatabase struct { msgDocDatabase unRelationTb.MsgDocModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface extendMsgSetModel unRelationTb.ExtendMsgSetModel + msg unRelationTb.MsgDocModel cache cache.MsgModel producer *kafka.Producer producerToMongo *kafka.Producer producerToModify *kafka.Producer producerToPush *kafka.Producer - // model - msg unRelationTb.MsgDocModel } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { @@ -277,7 +276,7 @@ func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID st } func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { - seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) + seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err } @@ -289,37 +288,6 @@ func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID str return unExistSeqs, nil } -func (db *commonMsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) - if err != nil { - return nil, nil, nil, err - } - singleCount := 0 - var hasSeqList []int64 - for i := 0; i < len(doc.Msg); i++ { - msgPb, err := db.unmarshalMsg(&doc.Msg[i]) - if err != nil { - return nil, nil, nil, err - } - if utils.Contain(msgPb.Seq, seqs...) { - indexes = append(indexes, i) - seqMsgs = append(seqMsgs, msgPb) - hasSeqList = append(hasSeqList, msgPb.Seq) - singleCount++ - if singleCount == len(seqs) { - break - } - } - } - for _, i := range seqs { - if utils.Contain(i, hasSeqList...) { - continue - } - unExistSeqs = append(unExistSeqs, i) - } - return seqMsgs, indexes, unExistSeqs, nil -} - func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { @@ -395,12 +363,11 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio } func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) { - beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs) - msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq) + msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) if err != nil { return nil, nil, err } - log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs)) + log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs)) seqMsgs = append(seqMsgs, msgs...) if len(msgs) == 0 { unExistSeqs = seqs @@ -416,7 +383,7 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq } } } - msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs) + msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs) if err != nil { return nil, nil, err } @@ -446,7 +413,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs) for docID, seqs := range m { docID = db.msg.ToNextDoc(docID) - msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) + msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { missedSeqs = append(missedSeqs, seqs...) log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs) @@ -477,7 +444,6 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 { sort.Sort(utils.MsgBySeq(seqMsgs)) } - // missSeqs为依然缺失的 return seqMsgs, nil } diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index 019942cda..72b1a9c49 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -76,40 +76,36 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel // 插入多条 外部保证userID 不重复 且在db中不存在 func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) { - return u.tx.Transaction(func(tx any) error { + if err := u.tx.Transaction(func(tx any) error { err = u.userDB.Create(ctx, users) if err != nil { return err } - var userIDs []string - for _, user := range users { - userIDs = append(userIDs, user.UserID) - } - return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) - }) + return nil + }); err != nil { + return err + } + var userIDs []string + for _, user := range users { + userIDs = append(userIDs, user.UserID) + } + return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx) } // 更新(非零值) 外部保证userID存在 func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) { - return u.tx.Transaction(func(tx any) error { - err = u.userDB.Update(ctx, user) - if err != nil { - return err - } - return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) - }) + if err := u.userDB.Update(ctx, user); err != nil { + return err + } + return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx) } // 更新(零值) 外部保证userID存在 func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) { - return u.tx.Transaction(func(tx any) error { - err = u.userDB.UpdateByMap(ctx, userID, args) - if err != nil { - return err - } - return u.cache.DelUsersInfo(userID).ExecDel(ctx) - }) - + if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil { + return err + } + return u.cache.DelUsersInfo(userID).ExecDel(ctx) } // 获取,如果没找到,不返回错误 diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 6e6916cb1..7d67269d5 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -31,7 +31,8 @@ type MsgDocModelInterface interface { Create(ctx context.Context, model *MsgDocModel) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) + GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) Delete(ctx context.Context, docIDs []string) error @@ -97,13 +98,6 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st return t } -func (m MsgDocModel) GetSeqsBeginEnd(seqs []int64) (int64, int64) { - if len(seqs) == 0 { - return 0, 0 - } - return seqs[len(seqs)-1], seqs[0] -} - func (m MsgDocModel) GetMsgIndex(seq int64) int64 { seqSuffix := seq / singleGocMsgNum var index int64 diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index b0ace47ae..7e738864b 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -57,6 +57,37 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab return doc, err } +func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { + doc, err := m.FindOneByDocID(ctx, docID) + if err != nil { + return nil, nil, nil, err + } + singleCount := 0 + var hasSeqList []int64 + for i := 0; i < len(doc.Msg); i++ { + var msg sdkws.MsgData + if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { + return nil, nil, nil, err + } + if utils.Contain(msg.Seq, seqs...) { + indexes = append(indexes, i) + seqMsgs = append(seqMsgs, &msg) + hasSeqList = append(hasSeqList, msg.Seq) + singleCount++ + if singleCount == len(seqs) { + break + } + } + } + for _, i := range seqs { + if utils.Contain(i, hasSeqList...) { + continue + } + unExistSeqs = append(unExistSeqs, i) + } + return seqMsgs, indexes, unExistSeqs, nil +} + func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) { findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1}) cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts) @@ -134,7 +165,8 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode return err } -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) { +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) { + beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) beginIndex := m.msg.GetMsgIndex(beginSeq) num := endSeq - beginSeq + 1 pipeline := bson.A{ diff --git a/pkg/proto/friend/validate.go b/pkg/proto/friend/validate.go index aea3d09e5..9a864cfa5 100644 --- a/pkg/proto/friend/validate.go +++ b/pkg/proto/friend/validate.go @@ -3,7 +3,6 @@ package friend import "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" func (m *ApplyToAddFriendReq) Check() error { - *m = ApplyToAddFriendReq{} if m.GetToUserID() == "" { return errs.ErrArgs.Wrap("get toUserID is empty") } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 4e28b631c..8f8af972d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -252,6 +252,13 @@ func GetSelfNotificationConversationID(userID string) []string { return []string{"n_" + userID + "_" + userID, "si_" + userID + "_" + userID} } +func GetSeqsBeginEnd(seqs []int64) (int64, int64) { + if len(seqs) == 0 { + return 0, 0 + } + return seqs[len(seqs)-1], seqs[0] +} + type MsgBySeq []*sdkws.MsgData func (s MsgBySeq) Len() int {