diff --git a/go.mod b/go.mod index 23a71e12f..6e5a93873 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module OpenIM -go 1.16 +go 1.18 require ( firebase.google.com/go v3.13.0+incompatible diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 4da679499..27b31c9c3 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -24,7 +24,7 @@ type ModifyMsgConsumerHandler struct { modifyMsgConsumerGroup *kfk.MConsumerGroup extendMsgInterface controller.ExtendMsgInterface - cache cache.Cache + cache cache.MsgCache } func (mmc *ModifyMsgConsumerHandler) Init() { diff --git a/internal/push/fcm/push.go b/internal/push/fcm/push.go index 1a8617c3b..2aa7f7f63 100644 --- a/internal/push/fcm/push.go +++ b/internal/push/fcm/push.go @@ -19,10 +19,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan type Fcm struct { fcmMsgCli *messaging.Client - cache cache.Cache + cache cache.MsgCache } -func NewClient(cache cache.Cache) *Fcm { +func NewClient(cache cache.MsgCache) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/fcm/push_test.go b/internal/push/fcm/push_test.go index 0df2b45da..cc310b30e 100644 --- a/internal/push/fcm/push_test.go +++ b/internal/push/fcm/push_test.go @@ -9,7 +9,7 @@ import ( ) func Test_Push(t *testing.T) { - var redis cache.Cache + var redis cache.MsgCache offlinePusher := NewClient(redis) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{}) assert.Nil(t, err) diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 5010cb97e..461956675 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -38,12 +38,12 @@ const ( ) type Client struct { - cache cache.Cache + cache cache.MsgCache tokenExpireTime int64 taskIDTTL int64 } -func NewClient(cache cache.Cache) *Client { +func NewClient(cache cache.MsgCache) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 080377305..b5b478a12 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -25,7 +25,7 @@ type RPCServer struct { pusher Pusher } -func (r *RPCServer) Init(rpcPort int, cache cache.Cache) { +func (r *RPCServer) Init(rpcPort int, cache cache.MsgCache) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 53b05b572..097257c55 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -25,7 +25,7 @@ import ( ) type Pusher struct { - cache cache.Cache + cache cache.MsgCache client discoveryregistry.SvcDiscoveryRegistry offlinePusher OfflinePusher groupLocalCache localcache.GroupLocalCache @@ -33,7 +33,7 @@ type Pusher struct { successCount int } -func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { +func NewPusher(cache cache.MsgCache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { return &Pusher{ cache: cache, client: client, diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go new file mode 100644 index 000000000..0f7320c26 --- /dev/null +++ b/pkg/common/db/cache/msg.go @@ -0,0 +1,475 @@ +package cache + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/tracelog" + pbChat "Open_IM/pkg/proto/msg" + pbRtc "Open_IM/pkg/proto/rtc" + "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/utils" + "context" + "errors" + "fmt" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "strconv" + "time" + + "github.com/go-redis/redis/v8" +) + +const ( + userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq + appleDeviceToken = "DEVICE_TOKEN" + userMinSeq = "REDIS_USER_MIN_SEQ:" + + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + messageCache = "MESSAGE_CACHE:" + signalCache = "SIGNAL_CACHE:" + signalListCache = "SIGNAL_LIST_CACHE:" + FcmToken = "FCM_TOKEN:" + groupUserMinSeq = "GROUP_USER_MIN_SEQ:" + groupMaxSeq = "GROUP_MAX_SEQ:" + groupMinSeq = "GROUP_MIN_SEQ:" + sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" + userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" + exTypeKeyLocker = "EX_LOCK:" + + uidPidToken = "UID_PID_TOKEN_STATUS:" + + SignalListCache = "SIGNAL_LIST_CACHE:" + + SignalCache = "SIGNAL_CACHE:" +) + +type MsgCache interface { + IncrUserSeq(ctx context.Context, userID string) (int64, error) + GetUserMaxSeq(ctx context.Context, userID string) (int64, error) + SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error + SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) + GetUserMinSeq(ctx context.Context, userID string) (int64, error) + + SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) + GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error + SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error + + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + + GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) + + SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error + DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error + GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) + DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error + CleanUpOneUserAllMsg(ctx context.Context, userID string) error + HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) + GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) + GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) + DelUserSignalList(ctx context.Context, userID string) error + DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error + + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) + + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) + SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) + GetFcmToken(ctx context.Context, account string, platformID int) (string, error) + DelFcmToken(ctx context.Context, account string, platformID int) error + IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error + GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) + GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) + DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error + SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) + GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) + SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error + LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error +} + +func NewMsgCache(client redis.UniversalClient) MsgCache { + return &msgCache{rdb: client} +} + +type msgCache struct { + rdb redis.UniversalClient +} + +func (m *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) +} + +func (m *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userIncrSeq+userID).Int64()) +} + +func (m *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { + return utils.Wrap1(m.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) +} + +func (m *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) +} + +func (m *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, userMinSeq+userID).Int64()) +} + +func (m *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { + key := groupUserMinSeq + "g:" + groupID + "u:" + userID + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (m *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) +} + +func (m *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(m.rdb.Get(ctx, groupMinSeq+groupID).Int64()) +} + +func (m *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + key := groupMaxSeq + groupID + seq, err := m.rdb.Incr(ctx, key).Uint64() + return int64(seq), utils.Wrap1(err) +} + +func (m *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { + key := groupMaxSeq + groupID + return utils.Wrap1(m.rdb.Set(ctx, key, maxSeq, 0).Err()) +} + +func (m *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { + key := groupMinSeq + groupID + return utils.Wrap1(m.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (m *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return utils.Wrap1(m.rdb.HSet(ctx, key, token, flag).Err()) +} + +func (m *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { + key := uidPidToken + userID + ":" + platformID + m, err := m.rdb.HGetAll(ctx, key).Result() + if err != nil { + return nil, utils.Wrap1(err) + } + mm := make(map[string]int) + for k, v := range m { + mm[k] = utils.StringToInt(v) + } + return mm, nil +} + +func (m *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + mm := make(map[string]interface{}) + for k, v := range m { + mm[k] = v + } + return utils.Wrap1(m.rdb.HSet(ctx, key, mm).Err()) +} + +func (m *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { + key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return utils.Wrap1(m.rdb.HDel(ctx, key, fields...).Err()) +} + +func (m *msgCache) GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) { + var errResult error + for _, v := range seqList { + //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + key := messageCache + userID + "_" + strconv.Itoa(int(v)) + result, err := m.rdb.Get(ctx, key).Result() + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + } else { + msg := sdkws.MsgData{} + err = jsonpb.UnmarshalString(result, &msg) + if err != nil { + errResult = err + failedSeqList = append(failedSeqList, v) + } else { + seqMsg = append(seqMsg, &msg) + } + + } + } + return seqMsg, failedSeqList, errResult +} + +func (m *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) { + pipe := m.rdb.Pipeline() + var failedList []pbChat.MsgDataToMQ + for _, msg := range msgList { + key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) + s, err := utils.Pb2String(msg.MsgData) + if err != nil { + return 0, utils.Wrap1(err) + } + err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + if err != nil { + return 0, utils.Wrap1(err) + } + } + if len(failedList) != 0 { + return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, tracelog.GetOperationID(ctx))) + } + _, err := pipe.Exec(ctx) + return 0, err +} + +func (m *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { + for _, msg := range msgList { + if err := m.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(msg.MsgData.Seq))).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { + key := messageCache + userID + "_" + "*" + vals, err := m.rdb.Keys(ctx, key).Result() + if err == redis.Nil { + return nil + } + if err != nil { + return utils.Wrap1(err) + } + for _, v := range vals { + if err := m.rdb.Del(ctx, v).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) HandleSignalInfo(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { + req := &pbRtc.SignalReq{} + if err := proto.Unmarshal(msg.Content, req); err != nil { + return false, utils.Wrap1(err) + } + var inviteeUserIDList []string + var isInviteSignal bool + switch signalInfo := req.Payload.(type) { + case *pbRtc.SignalReq_Invite: + inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList + isInviteSignal = true + case *pbRtc.SignalReq_InviteInGroup: + inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList + isInviteSignal = true + if !utils.Contain(pushToUserID, inviteeUserIDList...) { + return false, nil + } + case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: + return false, utils.Wrap1(errors.New("signalInfo do not need offlinePush")) + default: + return false, nil + } + if isInviteSignal { + for _, userID := range inviteeUserIDList { + timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) + if err != nil { + return false, utils.Wrap1(err) + } + keyList := SignalListCache + userID + err = m.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err() + if err != nil { + return false, utils.Wrap1(err) + } + err = m.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, utils.Wrap1(err) + } + key := SignalCache + msg.ClientMsgID + err = m.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, utils.Wrap1(err) + } + } + } + return true, nil +} + +func (m *msgCache) GetSignalInfoFromCacheByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + bytes, err := m.rdb.Get(ctx, SignalCache+clientMsgID).Bytes() + if err != nil { + return nil, utils.Wrap1(err) + } + req := &pbRtc.SignalReq{} + if err = proto.Unmarshal(bytes, req); err != nil { + return nil, utils.Wrap1(err) + } + invitationInfo = &pbRtc.SignalInviteReq{} + switch req2 := req.Payload.(type) { + case *pbRtc.SignalReq_Invite: + invitationInfo.Invitation = req2.Invite.Invitation + invitationInfo.OpUserID = req2.Invite.OpUserID + case *pbRtc.SignalReq_InviteInGroup: + invitationInfo.Invitation = req2.InviteInGroup.Invitation + invitationInfo.OpUserID = req2.InviteInGroup.OpUserID + } + return invitationInfo, nil +} + +func (m *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *pbRtc.SignalInviteReq, err error) { + key, err := m.rdb.LPop(ctx, SignalListCache+userID).Result() + if err != nil { + return nil, utils.Wrap1(err) + } + invitationInfo, err = m.GetSignalInfoFromCacheByClientMsgID(ctx, key) + if err != nil { + return nil, err + } + return invitationInfo, m.DelUserSignalList(ctx, userID) +} + +func (m *msgCache) DelUserSignalList(ctx context.Context, userID string) error { + return utils.Wrap1(m.rdb.Del(ctx, SignalListCache+userID).Err()) +} + +func (m *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error { + for _, seq := range seqList { + key := messageCache + userID + "_" + strconv.Itoa(int(seq)) + result, err := m.rdb.Get(ctx, key).Result() + if err != nil { + if err == redis.Nil { + continue + } + return utils.Wrap1(err) + } + var msg sdkws.MsgData + if err := jsonpb.UnmarshalString(result, &msg); err != nil { + return err + } + msg.Status = constant.MsgDeleted + s, err := utils.Pb2String(&msg) + if err != nil { + return utils.Wrap1(err) + } + if err := m.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return utils.Wrap1(err) + } + } + return nil +} + +func (m *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetGetuiToken(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiToken).Result()) +} + +func (m *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return utils.Wrap1(m.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, getuiTaskID).Result()) +} + +func (m *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return utils.Wrap1(m.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) +} + +func (m *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + result, err := m.rdb.Get(ctx, sendMsgFailedFlag+id).Int() + return int32(result), utils.Wrap1(err) +} + +func (m *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return utils.Wrap1(m.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) +} + +func (m *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + return utils.Wrap2(m.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) +} + +func (m *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return utils.Wrap1(m.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) +} + +func (m *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + seq, err := m.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() + return int(seq), utils.Wrap1(err) +} + +func (m *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return utils.Wrap1(m.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) +} + +func (m *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + return utils.Wrap2(m.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) +} + +func (m *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return utils.Wrap1(m.rdb.SetNX(ctx, key, 1, time.Minute).Err()) +} + +func (m *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := exTypeKeyLocker + clientMsgID + "_" + TypeKey + return utils.Wrap1(m.rdb.Del(ctx, key).Err()) +} + +func (m *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { + switch sessionType { + case constant.SingleChatType: + return "EX_SINGLE_" + clientMsgID + case constant.GroupChatType: + return "EX_GROUP_" + clientMsgID + case constant.SuperGroupChatType: + return "EX_SUPER_GROUP_" + clientMsgID + case constant.NotificationChatType: + return "EX_NOTIFICATION" + clientMsgID + } + return "" +} + +func (m *msgCache) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { + n, err := m.rdb.Exists(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() + if err != nil { + return false, utils.Wrap(err, "") + } + return n > 0, nil +} + +func (m *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { + return utils.Wrap1(m.rdb.HSet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) +} + +func (m *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { + return utils.Wrap2(m.rdb.Expire(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) +} + +func (m *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { + return utils.Wrap2(m.rdb.HGet(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) +} + +func (m *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + return utils.Wrap2(m.rdb.HGetAll(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) +} + +func (m *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { + return utils.Wrap1(m.rdb.HDel(ctx, m.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) +} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 8869eaf59..46ab5db96 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -59,7 +59,7 @@ type Cache interface { SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error - GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + GetMessageListBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) (int, error) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error CleanUpOneUserAllMsg(ctx context.Context, userID string) error diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 08043e5cd..d9d39f283 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -1,20 +1,21 @@ package controller import ( - "OpenIM/pkg/common/constant" - "OpenIM/pkg/common/db/cache" - unRelationTb "OpenIM/pkg/common/db/table/unrelation" - "OpenIM/pkg/common/db/unrelation" - "OpenIM/pkg/common/log" - "OpenIM/pkg/common/prome" - "OpenIM/pkg/common/tracelog" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/cache" + unRelationTb "Open_IM/pkg/common/db/table/unrelation" + "Open_IM/pkg/common/db/unrelation" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/prome" + "Open_IM/pkg/common/tracelog" + "fmt" "github.com/gogo/protobuf/sortkeys" "sync" "time" - pbMsg "OpenIM/pkg/proto/msg" - "OpenIM/pkg/proto/sdkws" - "OpenIM/pkg/utils" + pbMsg "Open_IM/pkg/proto/msg" + "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/utils" "context" "errors" "github.com/go-redis/redis/v8" @@ -23,98 +24,7 @@ import ( "github.com/golang/protobuf/proto" ) -//type MsgInterface interface { -// // 批量插入消息到db -// BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error -// // 刪除redis中消息缓存 -// DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error -// // incrSeq然后批量插入缓存 -// BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) -// // 删除消息 返回不存在的seqList -// DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) -// // 通过seqList获取db中写扩散消息 -// GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) -// // 通过seqList获取大群在db里面的消息 没找到返回错误 -// GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) -// // 删除用户所有消息/cache/db然后重置seq -// CleanUpUserMsg(ctx context.Context, userID string) error -// // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) -// DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userID []string, remainTime int64) error -// // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) -// DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error -// // 获取用户 seq mongo和redis -// GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) -// // 获取群 seq mongo和redis -// GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) -// // 设置群用户最小seq 直接调用cache -// SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) -// // 设置用户最小seq 直接调用cache -// SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) -// -// MsgToMQ(ctx context.Context, key string, data *pbMsg.MsgDataToMQ) (err error) -//} -// -//func NewMsgController(mgo *mongo.Client, rdb redis.UniversalClient) MsgInterface { -// return &MsgController{} -//} -// -//type MsgController struct { -// database MsgDatabase -//} -// -//func (m *MsgController) BatchInsertChat2DB(ctx context.Context, ID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { -// return m.database.BatchInsertChat2DB(ctx, ID, msgList, currentMaxSeq) -//} -// -//func (m *MsgController) DeleteMessageFromCache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) error { -// return m.database.DeleteMessageFromCache(ctx, sourceID, msgList) -//} -// -//func (m *MsgController) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { -// return m.database.BatchInsertChat2Cache(ctx, sourceID, msgList) -//} -// -//func (m *MsgController) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { -// return m.database.DelMsgBySeqs(ctx, userID, seqs) -//} -// -//func (m *MsgController) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { -// return m.database.GetMsgBySeqs(ctx, userID, seqs) -//} -// -//func (m *MsgController) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { -// return m.database.GetSuperGroupMsgBySeqs(ctx, groupID, seqs) -//} -// -//func (m *MsgController) CleanUpUserMsg(ctx context.Context, userID string) error { -// return m.database.CleanUpUserMsg(ctx, userID) -//} -// -//func (m *MsgController) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { -// return m.database.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, remainTime) -//} -// -//func (m *MsgController) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { -// return m.database.DeleteUserMsgsAndSetMinSeq(ctx, userID, remainTime) -//} -// -//func (m *MsgController) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { -// return m.database.GetUserMinMaxSeqInMongoAndCache(ctx, userID) -//} -// -//func (m *MsgController) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { -// return m.database.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) -//} -// -//func (m *MsgController) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { -// return m.database.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) -//} -// -//func (m *MsgController) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { -// return m.database.SetUserMinSeq(ctx, userID, minSeq) -//} - -type MsgDatabase interface { +type MsgDatabaseInterface interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error // 刪除redis中消息缓存 @@ -140,12 +50,11 @@ type MsgDatabase interface { GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) // 设置群用户最小seq 直接调用cache SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - // - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) // 设置用户最小seq 直接调用cache SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) + SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) @@ -155,105 +64,135 @@ type MsgDatabase interface { GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error - SetSendMsgStatus(ctx context.Context, userID string, status int32) error - GetSendMsgStatus(ctx context.Context, userID string) (int32, error) + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error GetUserMaxSeq(ctx context.Context, userID string) (int64, error) GetUserMinSeq(ctx context.Context, userID string) (int64, error) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) } -type msgDatabase struct { - mgo unRelationTb.MsgDocModelInterface - cache cache.Cache - msg unRelationTb.MsgDocModel + +func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { + return &MsgDatabase{} } -func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - //TODO implement me - panic("implement me") +type MsgDatabase struct { + mgo unRelationTb.MsgDocModelInterface + cache cache.MsgCache + msg unRelationTb.MsgDocModel + ExtendMsg unRelationTb.ExtendMsgSetModelInterface + rdb redis.Client } -func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) reactionExtensionList(reactionExtensionList map[string]*sdkws.KeyValue) map[string]unRelationTb.KeyValueModel { + r := make(map[string]unRelationTb.KeyValueModel) + for key, value := range reactionExtensionList { + r[key] = unRelationTb.KeyValueModel{ + TypeKey: value.TypeKey, + Value: value.Value, + LatestUpdateTime: value.LatestUpdateTime, + } + } + return r } -func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { + return db.cache.JudgeMessageReactionEXISTS(ctx, clientMsgID, sessionType) } -func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { + return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value) } -func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { + return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration) } -func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { + return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey) } -func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType) } -func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { + return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey) } -func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { + return db.ExtendMsg.InsertOrUpdateReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) } -func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { + extendMsgSet, err := db.ExtendMsg.GetExtendMsgSet(ctx, sourceID, sessionType, maxMsgUpdateTime) + if err != nil { + return nil, err + } + extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID] + if !ok { + return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID)) + } + reactionExtensionList := make(map[string]*pbMsg.KeyValueResp) + for key, model := range extendMsg.ReactionExtensionList { + reactionExtensionList[key] = &pbMsg.KeyValueResp{ + KeyValue: &sdkws.KeyValue{ + TypeKey: model.TypeKey, + Value: model.Value, + LatestUpdateTime: model.LatestUpdateTime, + }, + } + } + return &pbMsg.ExtendMsg{ + ReactionExtensionList: reactionExtensionList, + ClientMsgID: extendMsg.ClientMsgID, + MsgFirstModifyTime: extendMsg.MsgFirstModifyTime, + AttachedInfo: extendMsg.AttachedInfo, + Ex: extendMsg.Ex, + }, nil } -func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { + return db.ExtendMsg.DeleteReactionExtendMsgSet(ctx, sourceID, sessionType, clientMsgID, msgFirstModifyTime, db.reactionExtensionList(reactionExtensionList)) } -func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return db.cache.SetSendMsgStatus(ctx, id, status) } -func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + return db.cache.GetSendMsgStatus(ctx, id) } -func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { +func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { //TODO implement me panic("implement me") } -func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { + return db.cache.GetUserMaxSeq(ctx, userID) } -func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - //TODO implement me - panic("implement me") +func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { + return db.cache.GetUserMinSeq(ctx, userID) +} + +func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + return db.cache.GetGroupMaxSeq(ctx, groupID) +} + +func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { + return db.cache.GetGroupMinSeq(ctx, groupID) } -func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase { - return &msgDatabase{} +func (db *MsgDatabase) GetMessageListBySeq(ctx context.Context, userID string, seqs []int64) ([]*sdkws.MsgData, error) { + seqMsg, _, err := db.cache.GetMessageListBySeq(ctx, userID, seqs) + return seqMsg, err } -func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { +func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { //newTime := utils.GetCurrentTimestampByMill() if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { return errors.New("too large") @@ -337,11 +276,11 @@ func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, return nil } -func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { +func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { return db.cache.DeleteMessageFromCache(ctx, userID, msgs) } -func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { +func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) if int64(lenList) > db.msg.GetSingleGocMsgNum() { @@ -393,7 +332,7 @@ func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin return lastMaxSeq, utils.Wrap(err, "") } -func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { sortkeys.Int64s(seqs) docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) lock := sync.Mutex{} @@ -414,7 +353,7 @@ func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i return totalUnExistSeqs, nil } -func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { +func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err @@ -427,7 +366,7 @@ func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s return unExistSeqs, nil } -func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { +func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { doc, err := db.mgo.FindOneByDocID(ctx, docID) if err != nil { return nil, nil, nil, err @@ -458,7 +397,7 @@ func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s return seqMsgs, indexes, unExistSeqs, nil } -func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID) if err != nil { return nil, err @@ -466,7 +405,7 @@ func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return nil, err @@ -474,7 +413,7 @@ func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { +func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} err = proto.Unmarshal(msgInfo.Msg, msgPb) if err != nil { @@ -483,7 +422,7 @@ func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb * return msgPb, nil } -func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { +func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { var hasSeqs []int64 singleCount := 0 m := db.msg.GetDocIDSeqsMap(sourceID, seqs) @@ -524,8 +463,8 @@ func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ return seqMsg, nil } -func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) +func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { + successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) @@ -545,8 +484,8 @@ func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i return successMsgs, nil } -func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) +func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { + successMsgs, failedSeqs, err := db.cache.GetMessageListBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { prome.PromeAdd(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) @@ -566,7 +505,7 @@ func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin return successMsgs, nil } -func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { +func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) if err != nil { return err @@ -575,7 +514,7 @@ func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error return utils.Wrap(err, "") } -func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -603,7 +542,7 @@ func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, return nil } -func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -629,7 +568,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // seq 70 // set minSeq 21 // recursion 删除list并且返回设置的最小seq -func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { +func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index) if err != nil || msgs.DocID == "" { @@ -691,10 +630,10 @@ func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, //log.NewDebug(operationID, sourceID, "continue to", delStruct) // 继续递归 index+1 seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime) - return seq, err + return seq, utils.Wrap(err, "deleteMsg failed") } -func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { +func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID) if err != nil { return 0, 0, 0, 0, err @@ -711,7 +650,7 @@ func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user return } -func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { +func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) if err != nil { return 0, 0, 0, err @@ -723,7 +662,7 @@ func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context return } -func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { +func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return 0, 0, err @@ -745,10 +684,10 @@ func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) ( return } -func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) } -func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return db.cache.SetUserMinSeq(ctx, userID, minSeq) } diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index 16b931c1f..09b29c9bf 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -10,7 +10,7 @@ type PushInterface interface { } type PushDataBase struct { - cache cache.Cache + cache cache.MsgCache } func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 9cc9b6908..213d93200 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -21,6 +21,27 @@ func CopyStructFields(a interface{}, b interface{}, fields ...string) (err error return copier.Copy(a, b) } +func Wrap1(err error) error { + if err != nil { + return Wrap(err, "") + } + return nil +} + +func Wrap2[T any](a T, err error) (T, error) { + if err != nil { + return a, Wrap(err, "") + } + return a, nil +} + +func Wrap3[T any, V any](a T, b V, err error) (T, V, error) { + if err != nil { + return a, b, Wrap(err, "") + } + return a, b, nil +} + func Wrap(err error, message string) error { return errors.Wrap(err, "==> "+printCallerNameAndLine()+message) }