diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index fda6ea04d..ec8fbcc42 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -2,11 +2,13 @@ package cache import ( "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" "github.com/dtm-labs/rockscache" + "github.com/go-redis/redis/v8" "time" ) @@ -16,16 +18,16 @@ const ( ) type BlackCache struct { - blackDB *relation.Black + blackDB *table.BlackModel expireTime time.Duration rcClient *rockscache.Client } -func NewBlackCache(blackDB *relation.Black) *BlackCache { +func NewBlackCache(rdb redis.UniversalClient, blackDB *relation.BlackGorm, options rockscache.Options) *BlackCache { return &BlackCache{ - blackDB: nil, - expireTime: 0, - rcClient: nil, + blackDB: blackDB, + expireTime: blackExpireTime, + rcClient: rockscache.NewClient(rdb, options), } } @@ -48,7 +50,7 @@ func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs [ defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs) }() - blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, time.Second*30*60, getBlackIDList) + blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList) if err != nil { return nil, utils.Wrap(err, "") } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 65d128159..8e03279c9 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -2,8 +2,10 @@ package cache import ( "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" + "Open_IM/pkg/utilsv2" "context" "encoding/json" "github.com/dtm-labs/rockscache" @@ -12,17 +14,19 @@ import ( ) const ( - friendExpireTime = time.Second * 60 * 60 * 12 - friendIDsKey = "FRIEND_IDS:" + friendExpireTime = time.Second * 60 * 60 * 12 + friendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + friendKey = "FRIEND_INFO:" ) type FriendCache struct { - friendDB *relation.Friend + friendDB *relation.FriendGorm expireTime time.Duration rcClient *rockscache.Client } -func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.Friend, options rockscache.Options) *FriendCache { +func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCache { return &FriendCache{ friendDB: friendDB, expireTime: friendExpireTime, @@ -30,17 +34,25 @@ func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.Friend, option } } -func (f *FriendCache) getFriendRelationKey(ownerUserID string) string { +func (f *FriendCache) getFriendIDsKey(ownerUserID string) string { return friendIDsKey + ownerUserID } +func (f *FriendCache) getTwoWayFriendsIDsKey(ownerUserID string) string { + return TwoWayFriendsIDsKey + ownerUserID +} + +func (f *FriendCache) getFriendKey(ownerUserID, friendUserID string) string { + return friendKey + ownerUserID + "-" + friendUserID +} + func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { - getFriendIDList := func() (string, error) { - friendIDList, err := f.friendDB.GetFriendIDs(ctx, ownerUserID) + getFriendIDs := func() (string, error) { + friendIDs, err := f.friendDB.GetFriendIDs(ctx, ownerUserID) if err != nil { return "", err } - bytes, err := json.Marshal(friendIDList) + bytes, err := json.Marshal(friendIDs) if err != nil { return "", utils.Wrap(err, "") } @@ -49,11 +61,11 @@ func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (fri defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendIDs", friendIDs) }() - friendIDListStr, err := f.rcClient.Fetch(f.getFriendRelationKey(ownerUserID), f.expireTime, getFriendIDList) + friendIDsStr, err := f.rcClient.Fetch(f.getFriendIDsKey(ownerUserID), f.expireTime, getFriendIDs) if err != nil { return nil, err } - err = json.Unmarshal([]byte(friendIDListStr), &friendIDs) + err = json.Unmarshal([]byte(friendIDsStr), &friendIDs) return friendIDs, utils.Wrap(err, "") } @@ -61,5 +73,57 @@ func (f *FriendCache) DelFriendIDs(ctx context.Context, ownerUserID string) (err defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) }() - return f.rcClient.TagAsDeleted(f.getFriendRelationKey(ownerUserID)) + return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID)) +} + +func (f *FriendCache) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { + friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) + if err != nil { + return nil, err + } + for _, friendID := range friendIDs { + friendFriendID, err := f.GetFriendIDs(ctx, friendID) + if err != nil { + return nil, err + } + if utils.IsContain(ownerUserID, friendFriendID) { + twoWayFriendIDs = append(twoWayFriendIDs, ownerUserID) + } + } + return twoWayFriendIDs, nil +} + +func (f *FriendCache) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) + }() + return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID)) +} + +func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *table.FriendModel, err error) { + getFriend := func() (string, error) { + friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID) + if err != nil { + return "", err + } + bytes, err := json.Marshal(friend) + if err != nil { + return "", utils.Wrap(err, "") + } + return string(bytes), nil + } + friendStr, err := f.rcClient.Fetch(f.getFriendKey(ownerUserID, friendUserID), f.expireTime, getFriend) + if err != nil { + return nil, err + } + friend = &table.FriendModel{} + err = json.Unmarshal([]byte(friendStr), friend) + return friend, utils.Wrap(err, "") +} + +func (f *FriendCache) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID) + }() + return f.rcClient.TagAsDeleted(f.getFriendKey(ownerUserID, friendUserID)) } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 77c96c214..83fa95f8d 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -33,20 +33,18 @@ type GroupCache struct { group *relation.GroupGorm groupMember *relation.GroupMemberGorm groupRequest *relation.GroupRequestGorm - mongoDB *unrelation.SuperGroupMgoDB + mongoDB *unrelation.SuperGroupMongoDriver expireTime time.Duration redisClient *RedisClient rcClient *rockscache.Client //local cache - cacheGroupMtx sync.RWMutex - cacheGroupMemberUserIDs map[string]*localcache.GroupMemberIDsHash } -func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMgoDB, opts rockscache.Options) *GroupCache { +func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCache { return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb), - mongoDB: mongoClient, cacheGroupMemberUserIDs: make(map[string]*localcache.GroupMemberIDsHash, 0), + mongoDB: mongoClient, } } @@ -260,36 +258,6 @@ func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID)) } -// from local map -func (g *GroupCache) LocalGetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) { - remoteHash, err := g.GetGroupMembersHash(ctx, groupID) - if err != nil { - g.cacheGroupMtx.Lock() - defer g.cacheGroupMtx.Unlock() - delete(g.cacheGroupMemberUserIDs, groupID) - return nil, err - } - g.cacheGroupMtx.Lock() - defer g.cacheGroupMtx.Unlock() - if remoteHash == 0 { - delete(g.cacheGroupMemberUserIDs, groupID) - return []string{}, nil - } - localCache, ok := g.cacheGroupMemberUserIDs[groupID] - if ok && localCache.MemberListHash == remoteHash { - return localCache.UserIDs, nil - } - groupMemberIDsRemote, err := g.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return nil, err - } - g.cacheGroupMemberUserIDs[groupID] = &localcache.GroupMemberIDsHash{ - MemberListHash: remoteHash, - UserIDs: groupMemberIDsRemote, - } - return groupMemberIDsRemote, nil -} - // JoinedGroups func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { getJoinedGroupIDList := func() (string, error) { diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 3f7488b15..6f090228a 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -324,9 +324,8 @@ func (r *RedisClient) HandleSignalInfo(operationID string, msg *pbCommon.MsgData return false, nil } case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: - return false, errors.New("signalInfo do not need offlinePush") + return false, nil default: - log2.NewDebug(operationID, utils.GetSelfFuncName(), "req invalid type", string(msg.Content)) return false, nil } if isInviteSignal { diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 7c8526121..d9ba1e2c8 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -2,32 +2,35 @@ package cache import ( "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "encoding/json" "github.com/dtm-labs/rockscache" "github.com/go-redis/redis/v8" + "strconv" "time" ) const ( - UserExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" + userExpireTime = time.Second * 60 * 60 * 12 + userInfoKey = "USER_INFO:" + userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" ) type UserCache struct { - userDB *relation.User + userDB *relation.UserGorm expireTime time.Duration redisClient *RedisClient rcClient *rockscache.Client } -func NewUserCache(rdb redis.UniversalClient, userDB *relation.User, options rockscache.Options) *UserCache { +func NewUserCache(rdb redis.UniversalClient, userDB *relation.UserGorm, options rockscache.Options) *UserCache { return &UserCache{ userDB: userDB, - expireTime: UserExpireTime, + expireTime: userExpireTime, redisClient: NewRedisClient(rdb), rcClient: rockscache.NewClient(rdb, options), } @@ -37,7 +40,11 @@ func (u *UserCache) getUserInfoKey(userID string) string { return userInfoKey + userID } -func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation.User, err error) { +func (u *UserCache) getUserGlobalRecvMsgOptKey(userID string) string { + return userGlobalRecvMsgOptKey + userID +} + +func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *table.UserModel, err error) { getUserInfo := func() (string, error) { userInfo, err := u.userDB.Take(ctx, userID) if err != nil { @@ -52,17 +59,17 @@ func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *r defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "userInfo", *userInfo) }() - userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), time.Second*30*60, getUserInfo) + userInfoStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserInfo) if err != nil { return nil, err } - userInfo = &relation.User{} + userInfo = &table.UserModel{} err = json.Unmarshal([]byte(userInfoStr), userInfo) return userInfo, utils.Wrap(err, "") } -func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation.User, error) { - var users []*relation.User +func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*table.UserModel, error) { + var users []*table.UserModel for _, userID := range userIDs { user, err := GetUserInfoFromCache(ctx, userID) if err != nil { @@ -77,7 +84,7 @@ func (u *UserCache) DelUserInfo(ctx context.Context, userID string) (err error) defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) }() - return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID) + userID) + return u.rcClient.TagAsDeleted(u.getUserInfoKey(userID)) } func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err error) { @@ -88,3 +95,28 @@ func (u *UserCache) DelUsersInfo(ctx context.Context, userIDs []string) (err err } return nil } + +func (u *UserCache) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) { + getUserGlobalRecvMsgOpt := func() (string, error) { + userInfo, err := u.userDB.Take(ctx, userID) + if err != nil { + return "", err + } + return strconv.Itoa(int(userInfo.GlobalRecvMsgOpt)), nil + } + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "opt", opt) + }() + optStr, err := u.rcClient.Fetch(u.getUserInfoKey(userID), u.expireTime, getUserGlobalRecvMsgOpt) + if err != nil { + return 0, err + } + return strconv.Atoi(optStr) +} + +func (u *UserCache) DelUserGlobalRecvMsgOpt(ctx context.Context, userID string) (err error) { + defer func() { + tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) + }() + return u.rcClient.TagAsDeleted(u.getUserGlobalRecvMsgOptKey(userID)) +} diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index f536978c1..e28bf7974 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -180,7 +180,7 @@ type GroupDataBase struct { db *gorm.DB cache *cache.GroupCache - mongoDB *unrelation.SuperGroupMgoDB + mongoDB *unrelation.SuperGroupMongoDriver } func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.Client) GroupDataBaseInterface { @@ -188,19 +188,19 @@ func newGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.C groupMemberDB := relation.NewGroupMemberDB(db) groupRequestDB := relation.NewGroupRequest(db) newDB := *db - superGroupMgoDB := unrelation.NewSuperGroupMgoDB(mgoClient) + SuperGroupMongoDriver := unrelation.NewSuperGroupMongoDriver(mgoClient) database := &GroupDataBase{ groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, db: &newDB, - cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, superGroupMgoDB, rockscache.Options{ + cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, SuperGroupMongoDriver, rockscache.Options{ RandomExpireAdjustment: 0.2, DisableCacheRead: false, DisableCacheDelete: false, StrongConsistency: true, }), - mongoDB: superGroupMgoDB, + mongoDB: SuperGroupMongoDriver, } return database } diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go new file mode 100644 index 000000000..8a190a8fc --- /dev/null +++ b/pkg/common/db/localcache/conversation.go @@ -0,0 +1,27 @@ +package localcache + +import ( + "Open_IM/pkg/proto/conversation" + "context" + "google.golang.org/grpc" + "sync" +) + +type ConversationLocalCache struct { + lock sync.Mutex + SuperGroupRecvMsgNotNotifyUserIDs map[string][]string + rpc *grpc.ClientConn + conversation conversation.ConversationClient +} + +func NewConversationLocalCache(rpc *grpc.ClientConn) ConversationLocalCache { + return ConversationLocalCache{ + SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0), + rpc: rpc, + conversation: conversation.NewConversationClient(rpc), + } +} + +func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string { + return []string{} +} diff --git a/pkg/common/db/relation/chat_log_model.go b/pkg/common/db/relation/chat_log_model.go index 2266c9275..2c702711a 100644 --- a/pkg/common/db/relation/chat_log_model.go +++ b/pkg/common/db/relation/chat_log_model.go @@ -2,6 +2,7 @@ package relation import ( "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/table" pbMsg "Open_IM/pkg/proto/msg" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -10,38 +11,18 @@ import ( "github.com/golang/protobuf/proto" "github.com/jinzhu/copier" "gorm.io/gorm" - "time" ) -type ChatLog struct { - ServerMsgID string `gorm:"column:server_msg_id;primary_key;type:char(64)" json:"serverMsgID"` - ClientMsgID string `gorm:"column:client_msg_id;type:char(64)" json:"clientMsgID"` - SendID string `gorm:"column:send_id;type:char(64);index:send_id,priority:2" json:"sendID"` - RecvID string `gorm:"column:recv_id;type:char(64);index:recv_id,priority:2" json:"recvID"` - SenderPlatformID int32 `gorm:"column:sender_platform_id" json:"senderPlatformID"` - SenderNickname string `gorm:"column:sender_nick_name;type:varchar(255)" json:"senderNickname"` - SenderFaceURL string `gorm:"column:sender_face_url;type:varchar(255);" json:"senderFaceURL"` - SessionType int32 `gorm:"column:session_type;index:session_type,priority:2;index:session_type_alone" json:"sessionType"` - MsgFrom int32 `gorm:"column:msg_from" json:"msgFrom"` - ContentType int32 `gorm:"column:content_type;index:content_type,priority:2;index:content_type_alone" json:"contentType"` - Content string `gorm:"column:content;type:varchar(3000)" json:"content"` - Status int32 `gorm:"column:status" json:"status"` - SendTime time.Time `gorm:"column:send_time;index:sendTime;index:content_type,priority:1;index:session_type,priority:1;index:recv_id,priority:1;index:send_id,priority:1" json:"sendTime"` - CreateTime time.Time `gorm:"column:create_time" json:"createTime"` - Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` - DB *gorm.DB +type ChatLogGorm struct { + DB *gorm.DB } -func (ChatLog) TableName() string { - return "chat_logs" +func NewChatLog(db *gorm.DB) *ChatLogGorm { + return &ChatLogGorm{DB: db} } -func NewChatLog(db *gorm.DB) *ChatLog { - return &ChatLog{DB: db} -} - -func (c *ChatLog) Create(msg pbMsg.MsgDataToMQ) error { - chatLog := new(ChatLog) +func (c *ChatLogGorm) Create(msg pbMsg.MsgDataToMQ) error { + chatLog := new(table.ChatLogModel) copier.Copy(chatLog, msg.MsgData) switch msg.MsgData.SessionType { case constant.GroupChatType, constant.SuperGroupChatType: @@ -66,7 +47,7 @@ func (c *ChatLog) Create(msg pbMsg.MsgDataToMQ) error { return c.DB.Create(chatLog).Error } -func (c *ChatLog) GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLog, error) { +func (c *ChatLogGorm) GetChatLog(chatLog *table.ChatLogModel, pageNumber, showNumber int32, contentTypeList []int32) (int64, []ChatLogModel, error) { mdb := c.DB.Model(chatLog) if chatLog.SendTime.Unix() > 0 { mdb = mdb.Where("send_time > ? and send_time < ?", chatLog.SendTime, chatLog.SendTime.AddDate(0, 0, 1)) @@ -95,7 +76,7 @@ func (c *ChatLog) GetChatLog(chatLog *ChatLog, pageNumber, showNumber int32, con if err := mdb.Count(&count).Error; err != nil { return 0, nil, err } - var chatLogs []ChatLog + var chatLogs []table.ChatLogModel mdb = mdb.Limit(int(showNumber)).Offset(int(showNumber * (pageNumber - 1))) if err := mdb.Find(&chatLogs).Error; err != nil { return 0, nil, err diff --git a/pkg/common/db/table/relation.go b/pkg/common/db/table/relation.go index 1deaf7869..121539cb4 100644 --- a/pkg/common/db/table/relation.go +++ b/pkg/common/db/table/relation.go @@ -1,9 +1,15 @@ package table import ( + "gorm.io/gorm" "time" ) +const ( + FriendModelTableName = "" + ConversationModelTableName = "" +) + type FriendModel struct { OwnerUserID string `gorm:"column:owner_user_id;primary_key;size:64"` FriendUserID string `gorm:"column:friend_user_id;primary_key;size:64"` @@ -128,7 +134,6 @@ type UserModel struct { CreateTime time.Time `gorm:"column:create_time;index:create_time"` AppMangerLevel int32 `gorm:"column:app_manger_level"` GlobalRecvMsgOpt int32 `gorm:"column:global_recv_msg_opt"` - status int32 `gorm:"column:status"` } type BlackModel struct { @@ -139,3 +144,22 @@ type BlackModel struct { OperatorUserID string `gorm:"column:operator_user_id;size:64"` Ex string `gorm:"column:ex;size:1024"` } + +type ChatLogModel struct { + ServerMsgID string `gorm:"column:server_msg_id;primary_key;type:char(64)" json:"serverMsgID"` + ClientMsgID string `gorm:"column:client_msg_id;type:char(64)" json:"clientMsgID"` + SendID string `gorm:"column:send_id;type:char(64);index:send_id,priority:2" json:"sendID"` + RecvID string `gorm:"column:recv_id;type:char(64);index:recv_id,priority:2" json:"recvID"` + SenderPlatformID int32 `gorm:"column:sender_platform_id" json:"senderPlatformID"` + SenderNickname string `gorm:"column:sender_nick_name;type:varchar(255)" json:"senderNickname"` + SenderFaceURL string `gorm:"column:sender_face_url;type:varchar(255);" json:"senderFaceURL"` + SessionType int32 `gorm:"column:session_type;index:session_type,priority:2;index:session_type_alone" json:"sessionType"` + MsgFrom int32 `gorm:"column:msg_from" json:"msgFrom"` + ContentType int32 `gorm:"column:content_type;index:content_type,priority:2;index:content_type_alone" json:"contentType"` + Content string `gorm:"column:content;type:varchar(3000)" json:"content"` + Status int32 `gorm:"column:status" json:"status"` + SendTime time.Time `gorm:"column:send_time;index:sendTime;index:content_type,priority:1;index:session_type,priority:1;index:recv_id,priority:1;index:send_id,priority:1" json:"sendTime"` + CreateTime time.Time `gorm:"column:create_time" json:"createTime"` + Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"` + DB *gorm.DB +} diff --git a/pkg/common/db/table/unrelation.go b/pkg/common/db/table/unrelation.go index e79d4f4bb..911c681f3 100644 --- a/pkg/common/db/table/unrelation.go +++ b/pkg/common/db/table/unrelation.go @@ -1,48 +1,84 @@ package table -type SuperGroup struct { +import ( + "strconv" + "strings" +) + +const ( + CSuperGroup = "super_group" + CUserToSuperGroup = "user_to_super_group" + CTag = "tag" + CSendLog = "send_log" + CWorkMoment = "work_moment" + CExtendMsgSet = "extend_msgs" + + ExtendMsgMaxNum = 100 +) + +type SuperGroupModel struct { GroupID string `bson:"group_id" json:"groupID"` MemberIDList []string `bson:"member_id_list" json:"memberIDList"` } -type UserToSuperGroup struct { +func (SuperGroupModel) TableName() string { + return CSuperGroup +} + +type UserToSuperGroupModel struct { UserID string `bson:"user_id" json:"userID"` GroupIDList []string `bson:"group_id_list" json:"groupIDList"` } -type Tag struct { +func (UserToSuperGroupModel) TableName() string { + return CUserToSuperGroup +} + +type TagModel struct { UserID string `bson:"user_id"` TagID string `bson:"tag_id"` TagName string `bson:"tag_name"` UserList []string `bson:"user_list"` } -type CommonUser struct { +func (TagModel) TableName() string { + return CTag +} + +type CommonUserModel struct { UserID string `bson:"user_id"` UserName string `bson:"user_name"` } -type TagSendLog struct { - UserList []CommonUser `bson:"tag_list"` - SendID string `bson:"send_id"` - SenderPlatformID int32 `bson:"sender_platform_id"` - Content string `bson:"content"` - SendTime int64 `bson:"send_time"` +type TagSendLogModel struct { + UserList []CommonUserModel `bson:"tag_list"` + SendID string `bson:"send_id"` + SenderPlatformID int32 `bson:"sender_platform_id"` + Content string `bson:"content"` + SendTime int64 `bson:"send_time"` +} + +func (TagSendLogModel) TableName() string { + return CSendLog } type WorkMoment struct { - WorkMomentID string `bson:"work_moment_id"` - UserID string `bson:"user_id"` - UserName string `bson:"user_name"` - FaceURL string `bson:"face_url"` - Content string `bson:"content"` - LikeUserList []*CommonUser `bson:"like_user_list"` - AtUserList []*CommonUser `bson:"at_user_list"` - PermissionUserList []*CommonUser `bson:"permission_user_list"` - Comments []*CommonUser `bson:"comments"` - PermissionUserIDList []string `bson:"permission_user_id_list"` - Permission int32 `bson:"permission"` - CreateTime int32 `bson:"create_time"` + WorkMomentID string `bson:"work_moment_id"` + UserID string `bson:"user_id"` + UserName string `bson:"user_name"` + FaceURL string `bson:"face_url"` + Content string `bson:"content"` + LikeUserList []*CommonUserModel `bson:"like_user_list"` + AtUserList []*CommonUserModel `bson:"at_user_list"` + PermissionUserList []*CommonUserModel `bson:"permission_user_list"` + Comments []*CommonUserModel `bson:"comments"` + PermissionUserIDList []string `bson:"permission_user_id_list"` + Permission int32 `bson:"permission"` + CreateTime int32 `bson:"create_time"` +} + +func (WorkMoment) TableName() string { + return CWorkMoment } type Comment struct { @@ -54,3 +90,44 @@ type Comment struct { Content string `bson:"content" json:"content"` CreateTime int32 `bson:"create_time" json:"create_time"` } + +type ExtendMsgSet struct { + SourceID string `bson:"source_id" json:"sourceID"` + SessionType int32 `bson:"session_type" json:"sessionType"` + ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` + ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` + CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time + MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg +} + +type KeyValue struct { + TypeKey string `bson:"type_key" json:"typeKey"` + Value string `bson:"value" json:"value"` + LatestUpdateTime int64 `bson:"latest_update_time" json:"latestUpdateTime"` +} + +type ExtendMsg struct { + ReactionExtensionList map[string]KeyValue `bson:"reaction_extension_list" json:"reactionExtensionList"` + ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` + MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time + AttachedInfo string `bson:"attached_info" json:"attachedInfo"` + Ex string `bson:"ex" json:"ex"` +} + +func (ExtendMsgSet) TableName() string { + return CExtendMsgSet +} + +func (ExtendMsgSet) GetExtendMsgMaxNum() int32 { + return ExtendMsgMaxNum +} + +func (ExtendMsgSet) GetSourceID(ID string, index int32) string { + return ID + ":" + strconv.Itoa(int(index)) +} + +func (e *ExtendMsgSet) SplitSourceIDAndGetIndex() int32 { + l := strings.Split(e.SourceID, ":") + index, _ := strconv.Atoi(l[len(l)-1]) + return int32(index) +} diff --git a/pkg/common/db/unrelation/extend_msg_mongo_model.go b/pkg/common/db/unrelation/extend_msg_mongo_model.go index b9a9040d5..3eb3474f4 100644 --- a/pkg/common/db/unrelation/extend_msg_mongo_model.go +++ b/pkg/common/db/unrelation/extend_msg_mongo_model.go @@ -1,67 +1,29 @@ package unrelation import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/table" server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" "errors" "fmt" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "strconv" - "strings" - "time" - - "go.mongodb.org/mongo-driver/bson" ) -const cExtendMsgSet = "extend_msgs" -const MaxNum = 100 - -type ExtendMsgSet struct { - SourceID string `bson:"source_id" json:"sourceID"` - SessionType int32 `bson:"session_type" json:"sessionType"` - ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` - ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` - CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time - MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg +type ExtendMsgSetMongoDriver struct { + mgoDB *mongo.Database + ExtendMsgSetCollection *mongo.Collection } -type KeyValue struct { - TypeKey string `bson:"type_key" json:"typeKey"` - Value string `bson:"value" json:"value"` - LatestUpdateTime int64 `bson:"latest_update_time" json:"latestUpdateTime"` +func NewExtendMsgSetMongoDriver(mgoDB *mongo.Database) *ExtendMsgSetMongoDriver { + return &ExtendMsgSetMongoDriver{mgoDB: mgoDB, ExtendMsgSetCollection: mgoDB.Collection(table.CExtendMsgSet)} } -type ExtendMsg struct { - ReactionExtensionList map[string]KeyValue `bson:"reaction_extension_list" json:"reactionExtensionList"` - ClientMsgID string `bson:"client_msg_id" json:"clientMsgID"` - MsgFirstModifyTime int64 `bson:"msg_first_modify_time" json:"msgFirstModifyTime"` // this extendMsg create time - AttachedInfo string `bson:"attached_info" json:"attachedInfo"` - Ex string `bson:"ex" json:"ex"` -} - -func GetExtendMsgMaxNum() int32 { - return MaxNum -} - -func GetExtendMsgSourceID(ID string, index int32) string { - return ID + ":" + strconv.Itoa(int(index)) -} - -func SplitSourceIDAndGetIndex(sourceID string) int32 { - l := strings.Split(sourceID, ":") - index, _ := strconv.Atoi(l[len(l)-1]) - return int32(index) -} - -func (d *db.DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.InsertOne(ctx, set) +func (e *ExtendMsgSetMongoDriver) CreateExtendMsgSet(ctx context.Context, set *table.ExtendMsgSet) error { + _, err := e.ExtendMsgSetCollection.InsertOne(ctx, set) return err } @@ -69,9 +31,7 @@ type GetAllExtendMsgSetOpts struct { ExcludeExtendMsgs bool } -func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpts) (sets []*ExtendMsgSet, err error) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) +func (e *ExtendMsgSetMongoDriver) GetAllExtendMsgSet(ctx context.Context, ID string, opts *GetAllExtendMsgSetOpts) (sets []*table.ExtendMsgSet, err error) { regex := fmt.Sprintf("^%s", ID) var findOpts *options.FindOptions if opts != nil { @@ -80,7 +40,7 @@ func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpt findOpts.SetProjection(bson.M{"extend_msgs": 0}) } } - cursor, err := c.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}}, findOpts) + cursor, err := e.ExtendMsgSetCollection.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: regex}}, findOpts) if err != nil { return nil, utils.Wrap(err, "") } @@ -91,20 +51,19 @@ func (d *db.DataBases) GetAllExtendMsgSet(ID string, opts *GetAllExtendMsgSetOpt return sets, nil } -func (d *db.DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64, c *mongo.Collection) (*ExtendMsgSet, error) { - regex := fmt.Sprintf("^%s", sourceID) +func (e *ExtendMsgSetMongoDriver) GetExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, maxMsgUpdateTime int64) (*table.ExtendMsgSet, error) { var err error findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0}) // update newest - find := bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType} + find := bson.M{"source_id": primitive.Regex{Pattern: fmt.Sprintf("^%s", sourceID)}, "session_type": sessionType} if maxMsgUpdateTime > 0 { find["max_msg_update_time"] = maxMsgUpdateTime } - result, err := c.Find(ctx, find, findOpts) + result, err := e.ExtendMsgSetCollection.Find(ctx, find, findOpts) if err != nil { return nil, utils.Wrap(err, "") } - var setList []ExtendMsgSet + var setList []table.ExtendMsgSet if err := result.All(ctx, &setList); err != nil { return nil, utils.Wrap(err, "") } @@ -115,36 +74,32 @@ func (d *db.DataBases) GetExtendMsgSet(ctx context.Context, sourceID string, ses } // first modify msg -func (d *db.DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, 0, c) +func (e *ExtendMsgSetMongoDriver) InsertExtendMsg(ctx context.Context, sourceID string, sessionType int32, msg *table.ExtendMsg) error { + set, err := e.GetExtendMsgSet(ctx, sourceID, sessionType, 0) if err != nil { return utils.Wrap(err, "") } - if set == nil || set.ExtendMsgNum >= GetExtendMsgMaxNum() { + if set == nil || set.ExtendMsgNum >= set.GetExtendMsgMaxNum() { var index int32 if set != nil { - index = SplitSourceIDAndGetIndex(set.SourceID) + index = set.SplitSourceIDAndGetIndex() } - err = d.CreateExtendMsgSet(&ExtendMsgSet{ - SourceID: GetExtendMsgSourceID(sourceID, index), + err = e.CreateExtendMsgSet(ctx, &table.ExtendMsgSet{ + SourceID: set.GetSourceID(sourceID, index), SessionType: sessionType, - ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg}, + ExtendMsgs: map[string]table.ExtendMsg{msg.ClientMsgID: *msg}, ExtendMsgNum: 1, CreateTime: msg.MsgFirstModifyTime, MaxMsgUpdateTime: msg.MsgFirstModifyTime, }) } else { - _, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) + _, err = e.ExtendMsgSetCollection.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) } return utils.Wrap(err, "") } // insert or update -func (d *db.DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) +func (e *ExtendMsgSetMongoDriver) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { var updateBson = bson.M{} for _, v := range reactionExtensionList { updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = v @@ -153,46 +108,42 @@ func (d *db.DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessi opt := &options.UpdateOptions{ Upsert: &upsert, } - set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime, c) + set, err := e.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime) if err != nil { return utils.Wrap(err, "") } if set == nil { return errors.New(fmt.Sprintf("sourceID %s has no set", sourceID)) } - _, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": updateBson}, opt) + _, err = e.ExtendMsgSetCollection.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$set": updateBson}, opt) return utils.Wrap(err, "") } // delete TypeKey -func (d *db.DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) +func (e *ExtendMsgSetMongoDriver) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { var updateBson = bson.M{} for _, v := range reactionExtensionList { updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = "" } - set, err := d.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime, c) + set, err := e.GetExtendMsgSet(ctx, sourceID, sessionType, msgFirstModifyTime) if err != nil { return utils.Wrap(err, "") } if set == nil { return errors.New(fmt.Sprintf("sourceID %s has no set", sourceID)) } - _, err = c.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$unset": updateBson}) + _, err = e.ExtendMsgSetCollection.UpdateOne(ctx, bson.M{"source_id": set.SourceID, "session_type": sessionType}, bson.M{"$unset": updateBson}) return err } -func (d *db.DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) +func (e *ExtendMsgSetMongoDriver) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *table.ExtendMsg, err error) { findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1}) regex := fmt.Sprintf("^%s", sourceID) - result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": maxMsgUpdateTime}}, findOpts) + result, err := e.ExtendMsgSetCollection.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": maxMsgUpdateTime}}, findOpts) if err != nil { return nil, utils.Wrap(err, "") } - var setList []ExtendMsgSet + var setList []table.ExtendMsgSet if err := result.All(ctx, &setList); err != nil { return nil, utils.Wrap(err, "") } diff --git a/pkg/common/db/unrelation/init_mongo.go b/pkg/common/db/unrelation/init_mongo.go index 90e359c29..d21fdad1a 100644 --- a/pkg/common/db/unrelation/init_mongo.go +++ b/pkg/common/db/unrelation/init_mongo.go @@ -2,6 +2,7 @@ package unrelation import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/db/table" "Open_IM/pkg/utils" "context" "fmt" @@ -13,7 +14,7 @@ import ( ) type Mongo struct { - DB *mongo.Client + db *mongo.Client } func (m *Mongo) InitMongo() { @@ -53,22 +54,22 @@ func (m *Mongo) InitMongo() { panic(err.Error() + " mongo.Connect failed " + uri) } } - m.DB = mongoClient + m.db = mongoClient } func (m *Mongo) GetClient() *mongo.Client { - return m.DB + return m.db } func (m *Mongo) CreateTagIndex() { - if err := m.createMongoIndex(cSendLog, false, "send_id", "-send_time"); err != nil { - panic(err.Error() + " index create failed " + cSendLog + " send_id, -send_time") + if err := m.createMongoIndex(table.CSendLog, false, "send_id", "-send_time"); err != nil { + panic(err.Error() + " index create failed " + table.CSendLog + " send_id, -send_time") } - if err := m.createMongoIndex(cTag, false, "user_id", "-create_time"); err != nil { - panic(err.Error() + "index create failed " + cTag + " user_id, -create_time") + if err := m.createMongoIndex(table.CTag, false, "user_id", "-create_time"); err != nil { + panic(err.Error() + "index create failed " + table.CTag + " user_id, -create_time") } - if err := m.createMongoIndex(cTag, true, "tag_id"); err != nil { - panic(err.Error() + "index create failed " + cTag + " tag_id") + if err := m.createMongoIndex(table.CTag, true, "tag_id"); err != nil { + panic(err.Error() + "index create failed " + table.CTag + " tag_id") } } @@ -79,32 +80,37 @@ func (m *Mongo) CreateMsgIndex() { } func (m *Mongo) CreateSuperGroupIndex() { - if err := m.createMongoIndex(cSuperGroup, true, "group_id"); err != nil { - panic(err.Error() + "index create failed " + cTag + " group_id") + if err := m.createMongoIndex(table.CSuperGroup, true, "group_id"); err != nil { + panic(err.Error() + "index create failed " + table.CTag + " group_id") } - if err := m.createMongoIndex(cUserToSuperGroup, true, "user_id"); err != nil { - panic(err.Error() + "index create failed " + cTag + "user_id") + if err := m.createMongoIndex(table.CUserToSuperGroup, true, "user_id"); err != nil { + panic(err.Error() + "index create failed " + table.CTag + "user_id") } } func (m *Mongo) CreateWorkMomentIndex() { - if err := m.createMongoIndex(cWorkMoment, true, "-create_time", "work_moment_id"); err != nil { - panic(err.Error() + "index create failed " + cWorkMoment + " -create_time, work_moment_id") + if err := m.createMongoIndex(table.CWorkMoment, true, "-create_time", "work_moment_id"); err != nil { + panic(err.Error() + "index create failed " + table.CWorkMoment + " -create_time, work_moment_id") } - if err := m.createMongoIndex(cWorkMoment, true, "work_moment_id"); err != nil { - panic(err.Error() + "index create failed " + cWorkMoment + " work_moment_id ") + if err := m.createMongoIndex(table.CWorkMoment, true, "work_moment_id"); err != nil { + panic(err.Error() + "index create failed " + table.CWorkMoment + " work_moment_id ") } - if err := m.createMongoIndex(cWorkMoment, false, "user_id", "-create_time"); err != nil { - panic(err.Error() + "index create failed " + cWorkMoment + "user_id, -create_time") + if err := m.createMongoIndex(table.CWorkMoment, false, "user_id", "-create_time"); err != nil { + panic(err.Error() + "index create failed " + table.CWorkMoment + "user_id, -create_time") + } +} + +func (m *Mongo) CreateExtendMsgSetIndex() { + if err := m.createMongoIndex(table.CExtendMsgSet, true, "-create_time", "work_moment_id"); err != nil { + panic(err.Error() + "index create failed " + table.CWorkMoment + " -create_time, work_moment_id") } } func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error { - db := m.DB.Database(config.Config.Mongo.DBDatabase).Collection(collection) + db := m.db.Database(config.Config.Mongo.DBDatabase).Collection(collection) opts := options.CreateIndexes().SetMaxTime(10 * time.Second) indexView := db.Indexes() keysDoc := bsonx.Doc{} - // create composite indexes for _, key := range keys { if strings.HasPrefix(key, "-") { diff --git a/pkg/common/db/unrelation/office.go b/pkg/common/db/unrelation/office.go index 8bb4e025f..379840450 100644 --- a/pkg/common/db/unrelation/office.go +++ b/pkg/common/db/unrelation/office.go @@ -12,23 +12,19 @@ import ( "time" ) -const cTag = "tag" -const cSendLog = "send_log" -const cWorkMoment = "work_moment" - -type OfficeMgoDB struct { +type OfficeMongoDriver struct { mgoDB *mongo.Database TagCollection *mongo.Collection TagSendLogCollection *mongo.Collection WorkMomentCollection *mongo.Collection } -func NewOfficeMgoDB(mgoDB *mongo.Database) *OfficeMgoDB { - return &OfficeMgoDB{mgoDB: mgoDB, TagCollection: mgoDB.Collection(cTag), TagSendLogCollection: mgoDB.Collection(cSendLog), WorkMomentCollection: mgoDB.Collection(cSendLog)} +func NewOfficeMongoDriver(mgoDB *mongo.Database) *OfficeMongoDriver { + return &OfficeMongoDriver{mgoDB: mgoDB, TagCollection: mgoDB.Collection(table.CTag), TagSendLogCollection: mgoDB.Collection(table.CSendLog), WorkMomentCollection: mgoDB.Collection(table.CSendLog)} } -func (db *OfficeMgoDB) GetUserTags(ctx context.Context, userID string) ([]table.Tag, error) { - var tags []table.Tag +func (db *OfficeMongoDriver) GetUserTags(ctx context.Context, userID string) ([]table.TagModel, error) { + var tags []table.TagModel cursor, err := db.TagCollection.Find(ctx, bson.M{"user_id": userID}) if err != nil { return tags, err @@ -39,9 +35,9 @@ func (db *OfficeMgoDB) GetUserTags(ctx context.Context, userID string) ([]table. return tags, nil } -func (db *OfficeMgoDB) CreateTag(ctx context.Context, userID, tagName string, userList []string) error { +func (db *OfficeMongoDriver) CreateTag(ctx context.Context, userID, tagName string, userList []string) error { tagID := generateTagID(tagName, userID) - tag := table.Tag{ + tag := table.TagModel{ UserID: userID, TagID: tagID, TagName: tagName, @@ -51,19 +47,19 @@ func (db *OfficeMgoDB) CreateTag(ctx context.Context, userID, tagName string, us return err } -func (db *OfficeMgoDB) GetTagByID(ctx context.Context, userID, tagID string) (table.Tag, error) { - var tag table.Tag +func (db *OfficeMongoDriver) GetTagByID(ctx context.Context, userID, tagID string) (table.TagModel, error) { + var tag table.TagModel err := db.TagCollection.FindOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}).Decode(&tag) return tag, err } -func (db *OfficeMgoDB) DeleteTag(ctx context.Context, userID, tagID string) error { +func (db *OfficeMongoDriver) DeleteTag(ctx context.Context, userID, tagID string) error { _, err := db.TagCollection.DeleteOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}) return err } -func (db *OfficeMgoDB) SetTag(ctx context.Context, userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error { - var tag table.Tag +func (db *OfficeMongoDriver) SetTag(ctx context.Context, userID, tagID, newName string, increaseUserIDList []string, reduceUserIDList []string) error { + var tag table.TagModel if err := db.TagCollection.FindOne(ctx, bson.M{"tag_id": tagID, "user_id": userID}).Decode(&tag); err != nil { return err } @@ -95,19 +91,19 @@ func (db *OfficeMgoDB) SetTag(ctx context.Context, userID, tagID, newName string return nil } -func (db *OfficeMgoDB) GetUserIDListByTagID(ctx context.Context, userID, tagID string) ([]string, error) { - var tag table.Tag +func (db *OfficeMongoDriver) GetUserIDListByTagID(ctx context.Context, userID, tagID string) ([]string, error) { + var tag table.TagModel err := db.TagCollection.FindOne(ctx, bson.M{"user_id": userID, "tag_id": tagID}).Decode(&tag) return tag.UserList, err } -func (db *OfficeMgoDB) SaveTagSendLog(ctx context.Context, tagSendLog *table.TagSendLog) error { +func (db *OfficeMongoDriver) SaveTagSendLog(ctx context.Context, tagSendLog *table.TagSendLogModel) error { _, err := db.TagSendLogCollection.InsertOne(ctx, tagSendLog) return err } -func (db *OfficeMgoDB) GetTagSendLogs(ctx context.Context, userID string, showNumber, pageNumber int32) ([]table.TagSendLog, error) { - var tagSendLogs []table.TagSendLog +func (db *OfficeMongoDriver) GetTagSendLogs(ctx context.Context, userID string, showNumber, pageNumber int32) ([]table.TagSendLogModel, error) { + var tagSendLogs []table.TagSendLogModel findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"send_time": -1}) cursor, err := db.TagSendLogCollection.Find(ctx, bson.M{"send_id": userID}, findOpts) if err != nil { @@ -117,7 +113,7 @@ func (db *OfficeMgoDB) GetTagSendLogs(ctx context.Context, userID string, showNu return tagSendLogs, err } -func (db *OfficeMgoDB) CreateOneWorkMoment(ctx context.Context, workMoment *table.WorkMoment) error { +func (db *OfficeMongoDriver) CreateOneWorkMoment(ctx context.Context, workMoment *table.WorkMoment) error { workMomentID := generateWorkMomentID(workMoment.UserID) workMoment.WorkMomentID = workMomentID workMoment.CreateTime = int32(time.Now().Unix()) @@ -125,12 +121,12 @@ func (db *OfficeMgoDB) CreateOneWorkMoment(ctx context.Context, workMoment *tabl return err } -func (db *OfficeMgoDB) DeleteOneWorkMoment(ctx context.Context, workMomentID string) error { +func (db *OfficeMongoDriver) DeleteOneWorkMoment(ctx context.Context, workMomentID string) error { _, err := db.WorkMomentCollection.DeleteOne(ctx, bson.M{"work_moment_id": workMomentID}) return err } -func (db *OfficeMgoDB) DeleteComment(ctx context.Context, workMomentID, contentID, opUserID string) error { +func (db *OfficeMongoDriver) DeleteComment(ctx context.Context, workMomentID, contentID, opUserID string) error { _, err := db.WorkMomentCollection.UpdateOne(ctx, bson.D{{"work_moment_id", workMomentID}, {"$or", bson.A{ bson.D{{"user_id", opUserID}}, @@ -140,13 +136,13 @@ func (db *OfficeMgoDB) DeleteComment(ctx context.Context, workMomentID, contentI return err } -func (db *OfficeMgoDB) GetWorkMomentByID(ctx context.Context, workMomentID string) (*table.WorkMoment, error) { +func (db *OfficeMongoDriver) GetWorkMomentByID(ctx context.Context, workMomentID string) (*table.WorkMoment, error) { workMoment := &table.WorkMoment{} err := db.WorkMomentCollection.FindOne(ctx, bson.M{"work_moment_id": workMomentID}).Decode(workMoment) return workMoment, err } -func (db *OfficeMgoDB) LikeOneWorkMoment(ctx context.Context, likeUserID, userName, workMomentID string) (*table.WorkMoment, bool, error) { +func (db *OfficeMongoDriver) LikeOneWorkMoment(ctx context.Context, likeUserID, userName, workMomentID string) (*table.WorkMoment, bool, error) { workMoment, err := db.GetWorkMomentByID(ctx, workMomentID) if err != nil { return nil, false, err @@ -159,24 +155,24 @@ func (db *OfficeMgoDB) LikeOneWorkMoment(ctx context.Context, likeUserID, userNa } } if !isAlreadyLike { - workMoment.LikeUserList = append(workMoment.LikeUserList, &table.CommonUser{UserID: likeUserID, UserName: userName}) + workMoment.LikeUserList = append(workMoment.LikeUserList, &table.CommonUserModel{UserID: likeUserID, UserName: userName}) } _, err = db.WorkMomentCollection.UpdateOne(ctx, bson.M{"work_moment_id": workMomentID}, bson.M{"$set": bson.M{"like_user_list": workMoment.LikeUserList}}) return workMoment, !isAlreadyLike, err } -func (db *OfficeMgoDB) SetUserWorkMomentsLevel(ctx context.Context, userID string, level int32) error { +func (db *OfficeMongoDriver) SetUserWorkMomentsLevel(ctx context.Context, userID string, level int32) error { return nil } -func (db *OfficeMgoDB) CommentOneWorkMoment(ctx context.Context, comment *table.Comment, workMomentID string) (table.WorkMoment, error) { +func (db *OfficeMongoDriver) CommentOneWorkMoment(ctx context.Context, comment *table.Comment, workMomentID string) (table.WorkMoment, error) { comment.ContentID = generateWorkMomentCommentID(workMomentID) var workMoment table.WorkMoment err := db.WorkMomentCollection.FindOneAndUpdate(ctx, bson.M{"work_moment_id": workMomentID}, bson.M{"$push": bson.M{"comments": comment}}).Decode(&workMoment) return workMoment, err } -func (db *OfficeMgoDB) GetUserSelfWorkMoments(ctx context.Context, userID string, showNumber, pageNumber int32) ([]table.WorkMoment, error) { +func (db *OfficeMongoDriver) GetUserSelfWorkMoments(ctx context.Context, userID string, showNumber, pageNumber int32) ([]table.WorkMoment, error) { var workMomentList []table.WorkMoment findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1}) result, err := db.WorkMomentCollection.Find(ctx, bson.M{"user_id": userID}, findOpts) @@ -187,7 +183,7 @@ func (db *OfficeMgoDB) GetUserSelfWorkMoments(ctx context.Context, userID string return workMomentList, err } -func (db *OfficeMgoDB) GetUserWorkMoments(ctx context.Context, opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]table.WorkMoment, error) { +func (db *OfficeMongoDriver) GetUserWorkMoments(ctx context.Context, opUserID, userID string, showNumber, pageNumber int32, friendIDList []string) ([]table.WorkMoment, error) { var workMomentList []table.WorkMoment findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1}) result, err := db.WorkMomentCollection.Find(ctx, bson.D{ // 等价条件: select * from @@ -205,8 +201,8 @@ func (db *OfficeMgoDB) GetUserWorkMoments(ctx context.Context, opUserID, userID return workMomentList, err } -func (db *OfficeMgoDB) GetUserFriendWorkMoments(ctx context.Context, showNumber, pageNumber int32, userID string, friendIDList []string) ([]WorkMoment, error) { - var workMomentList []WorkMoment +func (db *OfficeMongoDriver) GetUserFriendWorkMoments(ctx context.Context, showNumber, pageNumber int32, userID string, friendIDList []string) ([]table.WorkMoment, error) { + var workMomentList []table.WorkMoment findOpts := options.Find().SetLimit(int64(showNumber)).SetSkip(int64(showNumber) * (int64(pageNumber) - 1)).SetSort(bson.M{"create_time": -1}) var filter bson.D permissionFilter := bson.D{ diff --git a/pkg/common/db/unrelation/super_group.go b/pkg/common/db/unrelation/super_group.go index f4d99a25f..82ec1af47 100644 --- a/pkg/common/db/unrelation/super_group.go +++ b/pkg/common/db/unrelation/super_group.go @@ -11,25 +11,20 @@ import ( "go.mongodb.org/mongo-driver/mongo/readconcern" ) -const ( - cSuperGroup = "super_group" - cUserToSuperGroup = "user_to_super_group" -) - -type SuperGroupMgoDB struct { +type SuperGroupMongoDriver struct { MgoClient *mongo.Client MgoDB *mongo.Database superGroupCollection *mongo.Collection userToSuperGroupCollection *mongo.Collection } -func NewSuperGroupMgoDB(mgoClient *mongo.Client) *SuperGroupMgoDB { +func NewSuperGroupMongoDriver(mgoClient *mongo.Client) *SuperGroupMongoDriver { mgoDB := mgoClient.Database(config.Config.Mongo.DBDatabase) - return &SuperGroupMgoDB{MgoDB: mgoDB, MgoClient: mgoClient, superGroupCollection: mgoDB.Collection(cSuperGroup), userToSuperGroupCollection: mgoDB.Collection(cUserToSuperGroup)} + return &SuperGroupMongoDriver{MgoDB: mgoDB, MgoClient: mgoClient, superGroupCollection: mgoDB.Collection(table.CSuperGroup), userToSuperGroupCollection: mgoDB.Collection(table.CUserToSuperGroup)} } -func (db *SuperGroupMgoDB) CreateSuperGroup(sCtx mongo.SessionContext, groupID string, initMemberIDList []string) error { - superGroup := table.SuperGroup{ +func (db *SuperGroupMongoDriver) CreateSuperGroup(sCtx mongo.SessionContext, groupID string, initMemberIDList []string) error { + superGroup := table.SuperGroupModel{ GroupID: groupID, MemberIDList: initMemberIDList, } @@ -51,13 +46,13 @@ func (db *SuperGroupMgoDB) CreateSuperGroup(sCtx mongo.SessionContext, groupID s } -func (db *SuperGroupMgoDB) GetSuperGroup(ctx context.Context, groupID string) (*table.SuperGroup, error) { - superGroup := table.SuperGroup{} +func (db *SuperGroupMongoDriver) GetSuperGroup(ctx context.Context, groupID string) (*table.SuperGroupModel, error) { + superGroup := table.SuperGroupModel{} err := db.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup) return &superGroup, err } -func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID string, userIDList []string) error { +func (db *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDList []string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { _, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}}) @@ -80,7 +75,7 @@ func (db *SuperGroupMgoDB) AddUserToSuperGroup(ctx context.Context, groupID stri }) } -func (db *SuperGroupMgoDB) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDList []string) error { +func (db *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDList []string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { _, err := db.superGroupCollection.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}}) @@ -97,16 +92,16 @@ func (db *SuperGroupMgoDB) RemoverUserFromSuperGroup(ctx context.Context, groupI }) } -func (db *SuperGroupMgoDB) GetSuperGroupByUserID(ctx context.Context, userID string) (*table.UserToSuperGroup, error) { - var user table.UserToSuperGroup +func (db *SuperGroupMongoDriver) GetSuperGroupByUserID(ctx context.Context, userID string) (*table.UserToSuperGroupModel, error) { + var user table.UserToSuperGroupModel err := db.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) return &user, utils.Wrap(err, "") } -func (db *SuperGroupMgoDB) DeleteSuperGroup(ctx context.Context, groupID string) error { +func (db *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID string) error { opts := options.Session().SetDefaultReadConcern(readconcern.Majority()) return db.MgoDB.Client().UseSessionWithOptions(ctx, opts, func(sCtx mongo.SessionContext) error { - superGroup := &table.SuperGroup{} + superGroup := &table.SuperGroupModel{} _, err := db.superGroupCollection.DeleteOne(sCtx, bson.M{"group_id": groupID}) if err != nil { _ = sCtx.AbortTransaction(ctx) @@ -120,7 +115,7 @@ func (db *SuperGroupMgoDB) DeleteSuperGroup(ctx context.Context, groupID string) }) } -func (db *SuperGroupMgoDB) RemoveGroupFromUser(sCtx context.Context, groupID string, userIDList []string) error { +func (db *SuperGroupMongoDriver) RemoveGroupFromUser(sCtx context.Context, groupID string, userIDList []string) error { _, err := db.userToSuperGroupCollection.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}}) return err } diff --git a/pkg/utils/utils_v2.go b/pkg/utils/utils_v2.go index 342638421..1822f0c89 100644 --- a/pkg/utils/utils_v2.go +++ b/pkg/utils/utils_v2.go @@ -261,6 +261,14 @@ func SortAny[E any](es []E, fn func(a, b E) bool) { }) } +// If true -> a, false -> b +func If[T any](isa bool, a, b T) T { + if isa { + return a + } + return b +} + type sortSlice[E any] struct { ts []E fn func(a, b E) bool