Merge remote-tracking branch 'origin/errcode' into errcode

test-errcode
withchao 2 years ago
commit 3533dbd354

@ -13,6 +13,7 @@ groupCreated:
groupInfoSet:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -23,6 +24,7 @@ groupInfoSet:
joinGroupApplication:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -32,6 +34,7 @@ joinGroupApplication:
memberQuit:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -41,6 +44,7 @@ memberQuit:
groupApplicationAccepted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -50,6 +54,7 @@ groupApplicationAccepted:
groupApplicationRejected:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -60,6 +65,7 @@ groupApplicationRejected:
groupOwnerTransferred:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -69,6 +75,7 @@ groupOwnerTransferred:
memberKicked:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -78,6 +85,7 @@ memberKicked:
memberInvited:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -87,6 +95,7 @@ memberInvited:
memberEnter:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -96,6 +105,7 @@ memberEnter:
groupDismissed:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -105,6 +115,7 @@ groupDismissed:
groupMuted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -114,6 +125,7 @@ groupMuted:
groupCancelMuted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -126,6 +138,7 @@ groupCancelMuted:
groupMemberMuted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -135,6 +148,7 @@ groupMemberMuted:
groupMemberCancelMuted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -144,6 +158,7 @@ groupMemberCancelMuted:
groupMemberInfoSet:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -154,6 +169,7 @@ groupMemberInfoSet:
#############################friend#################################
friendApplicationAdded:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: false
@ -163,6 +179,7 @@ friendApplicationAdded:
friendApplicationApproved:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -172,6 +189,7 @@ friendApplicationApproved:
friendApplicationRejected:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -181,6 +199,7 @@ friendApplicationRejected:
friendAdded:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -190,6 +209,7 @@ friendAdded:
friendDeleted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -199,6 +219,7 @@ friendDeleted:
friendRemarkSet:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -208,6 +229,7 @@ friendRemarkSet:
blackAdded:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -217,6 +239,7 @@ blackAdded:
blackDeleted:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -226,6 +249,7 @@ blackDeleted:
friendInfoUpdated:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -236,6 +260,7 @@ friendInfoUpdated:
#####################user#########################
userInfoUpdated:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -246,6 +271,7 @@ userInfoUpdated:
#####################conversation#########################
conversationChanged:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true
@ -255,6 +281,7 @@ conversationChanged:
conversationSetPrivate:
isSendMsg: true
reliabilityLevel: 1
unreadCount: false
offlinePush:
enable: true

@ -235,14 +235,14 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
}
encodedBuf := bufferPool.Get().([]byte)
resultBuf := bufferPool.Get().([]byte)
encodeBuf, err := c.longConnServer.Encode(resp)
encodedBuf, err := c.longConnServer.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
}
_ = c.conn.SetWriteDeadline(writeWait)
if c.isCompress {
var compressErr error
resultBuf, compressErr = c.longConnServer.Compress(encodeBuf)
resultBuf, compressErr = c.longConnServer.Compress(encodedBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
}

@ -90,11 +90,11 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.conversationID, ctxMsgList)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
och.handleMsg(ctx, msgChannelValue.conversationID, storageMsgList, notStorageMsgList)
och.handleNotification(ctx, msgChannelValue.conversationID, storageNotificationList, notStorageNotificationList)
och.handleMsg(ctx, utils.GetChatConversationIDByMsg(ctxMsgList[0].message), storageMsgList, notStorageMsgList)
och.handleNotification(ctx, utils.GetNotificationConversationID(ctxMsgList[0].message), storageNotificationList, notStorageNotificationList)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil {
log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList)
}
@ -104,7 +104,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
isStorage := func(msg *sdkws.MsgData) bool {
options2 := utils.Options(msg.Options)
if options2.IsHistory() {
@ -124,7 +124,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation
if options.IsSendMsg() {
// 消息
if v.message.Options != nil {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false))
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotNotification(true), utils.WithSendMsg(false))
}
storageMsgList = append(storageMsgList, v.message)
}
@ -155,7 +155,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList)
return
}
log.ZDebug(ctx, "success to next topic")
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
och.msgDatabase.MsgToMongoMQ(ctx, conversationID, storageList, lastSeq)
och.toPushTopic(ctx, conversationID, storageList)
}

@ -62,10 +62,14 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
if err != nil {
return nil, err
}
var isSend bool = true
conversationID := utils.GetConversationIDByMsg(req.MsgData)
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
if err != nil {
return nil, err
if utils.MsgIsNotification(req.MsgData) {
isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
if err != nil {
return nil, err
}
}
if isSend {
err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData)

@ -341,7 +341,7 @@ type Notification struct {
////////////////////////user///////////////////////
UserInfoUpdated NotificationConf `yaml:"userInfoUpdated"`
//////////////////////friend///////////////////////
FriendApplication NotificationConf `yaml:"friendApplicationAdded"`
FriendApplicationAdded NotificationConf `yaml:"friendApplicationAdded"`
FriendApplicationApproved NotificationConf `yaml:"friendApplicationApproved"`
FriendApplicationRejected NotificationConf `yaml:"friendApplicationRejected"`
FriendAdded NotificationConf `yaml:"friendAdded"`

@ -142,7 +142,7 @@ const (
IsSenderConversationUpdate = "senderConversationUpdate"
IsSenderNotificationPush = "senderNotificationPush"
IsReactionFromCache = "reactionFromCache"
IsNotification = "isNotification"
IsNotNotification = "isNotNotification"
IsSendMsg = "isSendMsg"
//GroupStatus

@ -30,10 +30,10 @@ type ConversationCache interface {
NewCache() ConversationCache
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
DelConversationIDs(userIDs []string) ConversationCache
DelConversationIDs(userIDs ...string) ConversationCache
// get one conversation from msgCache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache
DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
// get one conversation from msgCache
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
@ -97,7 +97,7 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own
})
}
func (c *ConversationRedisCache) DelConversationIDs(userIDs []string) ConversationCache {
func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) ConversationCache {
var keys []string
for _, userID := range userIDs {
keys = append(keys, c.getConversationIDsKey(userID))
@ -113,7 +113,7 @@ func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserI
})
}
func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs []string) ConversationCache {
func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs ...string) ConversationCache {
var keys []string
for _, conversationID := range convsersationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))

@ -7,9 +7,11 @@ import (
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/dtm-labs/rockscache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -29,6 +31,7 @@ const (
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
messageCache = "MESSAGE_CACHE:"
messageReadCache = "MESSAGE_READ_CACHE:"
signalCache = "SIGNAL_CACHE:"
signalListCache = "SIGNAL_LIST_CACHE:"
fcmToken = "FCM_TOKEN:"
@ -84,6 +87,9 @@ type MsgModel interface {
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel
}
func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
@ -91,39 +97,11 @@ func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
}
type msgCache struct {
rdb redis.UniversalClient
}
// 兼容老版本调用
func (c *msgCache) DelKeys() {
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:",
"GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} {
fName := utils.GetSelfFuncName()
var cursor uint64
var n int
for {
var keys []string
var err error
keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
if err != nil {
panic(err.Error())
}
n += len(keys)
// for each for redis cluster
for _, key := range keys {
if err = c.rdb.Del(context.Background(), key).Err(); err != nil {
log.NewError("", fName, key, err.Error())
err = c.rdb.Del(context.Background(), key).Err()
if err != nil {
panic(err.Error())
}
}
}
if cursor == 0 {
break
}
}
}
metaCache
rdb redis.UniversalClient
expireTime time.Duration
rcClient *rockscache.Client
msgDocDatabase unRelationTb.MsgDocModelInterface
}
func (c *msgCache) getMaxSeqKey(conversationID string) string {
@ -145,7 +123,6 @@ func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey fun
func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
pipe := c.rdb.Pipeline()
for _, v := range items {
log.ZDebug(ctx, "getSeqs", "getkey", getkey(v))
if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
}
@ -550,3 +527,41 @@ func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
}
func (c *msgCache) NewCache() MsgModel {
return &msgCache{
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...),
expireTime: c.expireTime,
rcClient: c.rcClient,
}
}
func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string {
return messageReadCache + docID + "_" + strconv.Itoa(int(seq))
}
func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) {
key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq)
for i, _key := range keys {
if key == _key {
return i, nil
}
}
return 0, errIndex
}
func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMsgReadCacheKey(docID, seq))
}
return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) {
return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
})
}
func (c *msgCache) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel {
cache := c.NewCache()
c.AddKeys(c.getMsgReadCacheKey(docID, seq))
return cache
}

@ -2,10 +2,10 @@ package controller
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"gorm.io/gorm"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
type BlackDatabase interface {
@ -31,12 +31,26 @@ func NewBlackDatabase(black relation.BlackModelInterface, cache cache.BlackCache
// Create 增加黑名单
func (b *blackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) {
return b.black.Create(ctx, blacks)
if err := b.black.Create(ctx, blacks); err != nil {
return err
}
return b.deleteBlackIDsCache(ctx, blacks)
}
// Delete 删除黑名单
func (b *blackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) {
return b.black.Delete(ctx, blacks)
if err := b.black.Delete(ctx, blacks); err != nil {
return err
}
return b.deleteBlackIDsCache(ctx, blacks)
}
func (b *blackDatabase) deleteBlackIDsCache(ctx context.Context, blacks []*relation.BlackModel) (err error) {
cache := b.cache.NewCache()
for _, black := range blacks {
cache = cache.DelBlackIDs(ctx, black.OwnerUserID)
}
return cache.ExecDel(ctx)
}
// FindOwnerBlacks 获取黑名单列表
@ -46,21 +60,15 @@ func (b *blackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string,
// CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true)
func (b *blackDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) {
_, err = b.black.Take(ctx, userID1, userID2)
userID1BlackIDs, err := b.cache.GetBlackIDs(ctx, userID1)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
return
}
return
}
inUser1Blacks = err == nil
_, err = b.black.Take(ctx, userID2, userID1)
userID2BlackIDs, err := b.cache.GetBlackIDs(ctx, userID2)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
return
}
return
}
inUser2Blacks = err == nil
return inUser1Blacks, inUser2Blacks, nil
return utils.IsContain(userID2, userID1BlackIDs), utils.IsContain(userID1, userID2BlackIDs), nil
}
func (b *blackDatabase) FindBlackIDs(ctx context.Context, ownerUserID string) (blackIDs []string, err error) {

@ -46,14 +46,14 @@ type ConversationDataBase struct {
tx tx.Tx
}
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.tx.Transaction(func(tx any) error {
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID})
if err != nil {
return err
}
cache := c.cache.NewCache()
if len(haveUserIDs) > 0 {
_, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
if err != nil {
@ -71,19 +71,20 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
}
temp.OwnerUserID = v
conversations = append(conversations, temp)
}
}
if len(conversations) > 0 {
err = conversationTx.Create(ctx, conversations)
if err != nil {
return err
}
cache = cache.DelConversationIDs(NotUserIDs)
cache = cache.DelConversationIDs(NotUserIDs...)
}
// clear cache
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
@ -98,13 +99,17 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
return nil
var userIDs []string
for _, conversation := range conversations {
userIDs = append(userIDs, conversation.OwnerUserID)
}
return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
cache := c.cache.NewCache()
for _, conversation := range conversations {
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
@ -126,12 +131,15 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]})
cache = cache.DelConversationIDs([]string{v[0]}...)
}
}
}
return c.cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return c.cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
@ -147,7 +155,8 @@ func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, owner
}
func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
var conversationIDs []string
for _, conversation := range conversations {
conversationIDs = append(conversationIDs, conversation.ConversationID)
@ -181,13 +190,14 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
if err != nil {
return err
}
cache = cache.DelConversationIDs([]string{ownerUserID}...)
}
cache := c.cache.NewCache()
if len(notExistConversations) > 0 {
cache = cache.DelConversationIDs([]string{ownerUserID})
}
return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx)
})
cache = cache.DelConvsersations(ownerUserID, existConversationIDs...)
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
@ -195,27 +205,36 @@ func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context,
}
func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
cache := c.cache.NewCache()
conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
return c.tx.Transaction(func(tx any) error {
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID})
if err := c.tx.Transaction(func(tx any) error {
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID})
if err != nil {
return err
}
notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs)
var conversations []*relationTb.ConversationModel
for _, v := range notExistUserIDs {
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation)
}
cache = cache.DelConversationIDs(notExistUserIDs...)
err = c.conversationDB.Create(ctx, conversations)
if err != nil {
return err
}
_, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0})
_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]interface{}{"max_seq": 0})
if err != nil {
return err
}
for _, v := range existConversationUserIDs {
cache = cache.DelConvsersations(v, conversationID)
}
return nil
}); err != nil {
return err
})
}
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {

@ -55,19 +55,15 @@ func NewFriendDatabase(friend relation.FriendModelInterface, friendRequest relat
// ok 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true)
func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Friends bool, inUser2Friends bool, err error) {
friends, err := f.friend.FindUserState(ctx, userID1, userID2)
userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
if err != nil {
return false, false, err
return
}
for _, v := range friends {
if v.OwnerUserID == userID1 && v.FriendUserID == userID2 {
inUser1Friends = true
}
if v.OwnerUserID == userID2 && v.FriendUserID == userID1 {
inUser2Friends = true
}
userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
if err != nil {
return
}
return
return utils.IsContain(userID2, userID1FriendIDs), utils.IsContain(userID1, userID2FriendIDs), nil
}
// 增加或者更新好友申请 如果之前有记录则更新,没有记录则新增
@ -100,7 +96,8 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse
// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可
func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) {
return f.tx.Transaction(func(tx any) error {
cache := f.cache.NewCache()
if err := f.tx.Transaction(func(tx any) error {
//先find 找出重复的 去掉重复的
fs1, err := f.friend.NewTx(tx).FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
@ -135,8 +132,12 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
return err
}
newFriendIDs = append(newFriendIDs, ownerUserID)
return f.cache.DelFriendIDs(newFriendIDs...).ExecDel(ctx)
})
cache = cache.DelFriendIDs(newFriendIDs...)
return nil
}); err != nil {
return nil
}
return cache.ExecDel(ctx)
}
// 拒绝好友申请 (1)检查是否有申请记录且为未处理状态 (没有记录返回错误) (2)修改申请记录 已拒绝
@ -199,24 +200,18 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest *
// 删除好友 外部判断是否好友关系
func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error) {
return f.tx.Transaction(func(tx any) error {
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err
}
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
})
if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil {
return err
}
return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).ExecDel(ctx)
}
// 更新好友备注 零值也支持
func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) {
return f.tx.Transaction(func(tx any) error {
err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark)
if err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
})
if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
// 获取ownerUserID的好友列表 无结果不返回错误

@ -113,7 +113,8 @@ func (g *groupDatabase) FindGroupMemberUserID(ctx context.Context, groupID strin
}
func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error {
return g.tx.Transaction(func(tx any) error {
var cache = g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if len(groups) > 0 {
if err := g.groupDB.NewTx(tx).Create(ctx, groups); err != nil {
return err
@ -128,7 +129,7 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
return group.GroupID
})
m := make(map[string]struct{})
var cache = g.cache.NewCache()
for _, groupMember := range groupMembers {
if _, ok := m[groupMember.GroupID]; !ok {
m[groupMember.GroupID] = struct{}{}
@ -137,8 +138,11 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*relationTb.Gr
cache = cache.DelJoinedGroupID(groupMember.UserID).DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID)
}
cache = cache.DelGroupsInfo(createGroupIDs...)
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
@ -154,16 +158,15 @@ func (g *groupDatabase) SearchGroup(ctx context.Context, keyword string, pageNum
}
func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data map[string]any) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupDB.NewTx(tx).UpdateMap(ctx, groupID, data); err != nil {
return err
}
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
})
if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil {
return err
}
return g.cache.DelGroupsInfo(groupID).ExecDel(ctx)
}
func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error {
return g.tx.Transaction(func(tx any) error {
cache := g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if err := g.groupDB.NewTx(tx).UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil {
return err
}
@ -174,8 +177,12 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string) error
if err != nil {
return err
}
return g.cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID).ExecDel(ctx)
})
cache = cache.DelJoinedGroupID(userIDs...).DelGroupsInfo(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelGroupMembersHash(groupID)
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) {
@ -236,7 +243,8 @@ func (g *groupDatabase) SearchGroupMember(ctx context.Context, keyword string, g
}
func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error {
return g.tx.Transaction(func(tx any) error {
cache := g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
if err := g.groupRequestDB.NewTx(tx).UpdateHandler(ctx, groupID, userID, handledMsg, handleResult); err != nil {
return err
}
@ -244,19 +252,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string,
if err := g.groupMemberDB.NewTx(tx).Create(ctx, []*relationTb.GroupMemberModel{member}); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID).ExecDel(ctx)
cache = cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(member.UserID)
}
return nil
})
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupMemberDB.NewTx(tx).Delete(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
})
if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelGroupMembersHash(groupID).DelGroupMemberIDs(groupID).DelGroupsMemberNum(groupID).DelJoinedGroupID(userIDs...).DelGroupMembersInfo(groupID, userIDs...).ExecDel(ctx)
}
func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
@ -276,7 +285,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
}
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
return g.tx.Transaction(func(tx any) error {
if err := g.tx.Transaction(func(tx any) error {
rowsAffected, err := g.groupMemberDB.NewTx(tx).UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel)
if err != nil {
return err
@ -291,30 +300,34 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string,
if rowsAffected != 1 {
return utils.Wrap(fmt.Errorf("newOwnerUserID %s rowsAffected = %d", newOwnerUserID, rowsAffected), "")
}
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID).ExecDel(ctx)
}
func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
return g.tx.Transaction(func(tx any) error {
if err := g.groupMemberDB.NewTx(tx).Update(ctx, groupID, userID, data); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
})
if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil {
return err
}
return g.cache.DelGroupMembersInfo(groupID, userID).ExecDel(ctx)
}
func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
return g.tx.Transaction(func(tx any) error {
var cache = g.cache.NewCache()
var cache = g.cache.NewCache()
if err := g.tx.Transaction(func(tx any) error {
for _, item := range data {
if err := g.groupMemberDB.NewTx(tx).Update(ctx, item.GroupID, item.UserID, item.Map); err != nil {
return err
}
cache = cache.DelGroupMembersInfo(item.GroupID, item.UserID)
}
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
@ -346,16 +359,15 @@ func (g *groupDatabase) FindJoinSuperGroup(ctx context.Context, userID string) (
}
func (g *groupDatabase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
})
if err := g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(initMemberIDs...).ExecDel(ctx)
}
func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
cache := g.cache.NewCache()
if err := g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.DeleteSuperGroup(ctx, groupID); err != nil {
return err
}
@ -363,28 +375,27 @@ func (g *groupDatabase) DeleteSuperGroup(ctx context.Context, groupID string) er
if err != nil {
return err
}
cache := g.cache.DelSuperGroupMemberIDs(groupID)
cache = cache.DelSuperGroupMemberIDs(groupID)
if len(models) > 0 {
cache = cache.DelJoinedSuperGroupIDs(models[0].MemberIDs...)
}
return cache.ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
return cache.ExecDel(ctx)
}
func (g *groupDatabase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
})
if err := g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
}
func (g *groupDatabase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
})
if err := g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs); err != nil {
return err
}
return g.cache.DelSuperGroupMemberIDs(groupID).DelJoinedSuperGroupIDs(userIDs...).ExecDel(ctx)
}

@ -100,13 +100,12 @@ type commonMsgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
extendMsgSetModel unRelationTb.ExtendMsgSetModel
msg unRelationTb.MsgDocModel
cache cache.MsgModel
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToModify *kafka.Producer
producerToPush *kafka.Producer
// model
msg unRelationTb.MsgDocModel
}
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@ -277,7 +276,7 @@ func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID st
}
func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
@ -289,37 +288,6 @@ func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID str
return unExistSeqs, nil
}
func (db *commonMsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
return nil, nil, nil, err
}
if utils.Contain(msgPb.Seq, seqs...) {
indexes = append(indexes, i)
seqMsgs = append(seqMsgs, msgPb)
hasSeqList = append(hasSeqList, msgPb.Seq)
singleCount++
if singleCount == len(seqs) {
break
}
}
}
for _, i := range seqs {
if utils.Contain(i, hasSeqList...) {
continue
}
unExistSeqs = append(unExistSeqs, i)
}
return seqMsgs, indexes, unExistSeqs, nil
}
func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
@ -345,15 +313,22 @@ func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (m
return msgPb, nil
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, err error) {
seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, conversationID, seqs)
if err != nil {
return nil, err
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
var totalUnExistSeqs []int64
for docID, seqs := range m {
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
totalMsgs = append(totalMsgs, seqMsgs...)
totalUnExistSeqs = append(totalUnExistSeqs, unexistSeqs...)
}
for _, unexistSeq := range unexistSeqs {
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...)
for _, unexistSeq := range totalUnExistSeqs {
totalMsgs = append(totalMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...)
}
return seqMsgs, nil
return totalMsgs, nil
}
func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*sdkws.MsgData, err error) {
@ -372,8 +347,8 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
}
if len(reFetchSeqs) > 0 {
m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
for docID, seq := range m {
msgs, _, err := db.findMsgBySeq(ctx, docID, seq)
for docID, seqs := range m {
msgs, _, err := db.findMsgBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
@ -395,12 +370,11 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio
}
func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) {
beginSeq, endSeq := db.msg.GetSeqsBeginEnd(seqs)
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, beginSeq, endSeq)
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
if err != nil {
return nil, nil, err
}
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "beginSeq", beginSeq, "endSeq", endSeq, "len(msgs)", len(msgs))
log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
seqMsgs = append(seqMsgs, msgs...)
if len(msgs) == 0 {
unExistSeqs = seqs
@ -416,7 +390,7 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq
}
}
}
msgs, _, unExistSeqs, err = db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs)
if err != nil {
return nil, nil, err
}
@ -446,7 +420,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs)
for docID, seqs := range m {
docID = db.msg.ToNextDoc(docID)
msgs, _, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
missedSeqs = append(missedSeqs, seqs...)
log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs)
@ -477,7 +451,6 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
sort.Sort(utils.MsgBySeq(seqMsgs))
}
// missSeqs为依然缺失的
return seqMsgs, nil
}
@ -485,7 +458,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation
var seqs []int64
for i := end; i > end-num; i-- {
if i >= begin {
seqs = append(seqs, i)
seqs = append([]int64{i}, seqs...)
} else {
break
}

@ -76,40 +76,36 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
// 插入多条 外部保证userID 不重复 且在db中不存在
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return u.tx.Transaction(func(tx any) error {
if err := u.tx.Transaction(func(tx any) error {
err = u.userDB.Create(ctx, users)
if err != nil {
return err
}
var userIDs []string
for _, user := range users {
userIDs = append(userIDs, user.UserID)
}
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
})
return nil
}); err != nil {
return err
}
var userIDs []string
for _, user := range users {
userIDs = append(userIDs, user.UserID)
}
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
}
// 更新(非零值) 外部保证userID存在
func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
return u.tx.Transaction(func(tx any) error {
err = u.userDB.Update(ctx, user)
if err != nil {
return err
}
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
})
if err := u.userDB.Update(ctx, user); err != nil {
return err
}
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
}
// 更新(零值) 外部保证userID存在
func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
return u.tx.Transaction(func(tx any) error {
err = u.userDB.UpdateByMap(ctx, userID, args)
if err != nil {
return err
}
return u.cache.DelUsersInfo(userID).ExecDel(ctx)
})
if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
return err
}
return u.cache.DelUsersInfo(userID).ExecDel(ctx)
}
// 获取,如果没找到,不返回错误

@ -31,7 +31,8 @@ type MsgDocModelInterface interface {
Create(ctx context.Context, model *MsgDocModel) error
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) ([]*sdkws.MsgData, error)
GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error)
GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error)
GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error
@ -97,13 +98,6 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
return t
}
func (m MsgDocModel) GetSeqsBeginEnd(seqs []int64) (int64, int64) {
if len(seqs) == 0 {
return 0, 0
}
return seqs[len(seqs)-1], seqs[0]
}
func (m MsgDocModel) GetMsgIndex(seq int64) int64 {
seqSuffix := seq / singleGocMsgNum
var index int64

@ -57,6 +57,37 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab
return doc, err
}
func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
doc, err := m.FindOneByDocID(ctx, docID)
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
var hasSeqList []int64
for i := 0; i < len(doc.Msg); i++ {
var msg sdkws.MsgData
if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil {
return nil, nil, nil, err
}
if utils.Contain(msg.Seq, seqs...) {
indexes = append(indexes, i)
seqMsgs = append(seqMsgs, &msg)
hasSeqList = append(hasSeqList, msg.Seq)
singleCount++
if singleCount == len(seqs) {
break
}
}
}
for _, i := range seqs {
if utils.Contain(i, hasSeqList...) {
continue
}
unExistSeqs = append(unExistSeqs, i)
}
return seqMsgs, indexes, unExistSeqs, nil
}
func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) {
findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1})
cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts)
@ -134,7 +165,8 @@ func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocMode
return err
}
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, beginSeq, endSeq int64) (msgs []*sdkws.MsgData, err error) {
func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) {
beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs)
beginIndex := m.msg.GetMsgIndex(beginSeq)
num := endSeq - beginSeq + 1
pipeline := bson.A{
@ -165,7 +197,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID strin
break
}
}
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg))
log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID)
for _, v := range doc.Msg {
var msg sdkws.MsgData
if err := proto.Unmarshal(v.Msg, &msg); err != nil {

@ -3,7 +3,6 @@ package friend
import "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
func (m *ApplyToAddFriendReq) Check() error {
*m = ApplyToAddFriendReq{}
if m.GetToUserID() == "" {
return errs.ErrArgs.Wrap("get toUserID is empty")
}

@ -41,7 +41,7 @@ func newContentTypeConf() map[int32]config.NotificationConf {
// user
constant.UserInfoUpdatedNotification: config.Config.Notification.UserInfoUpdated,
// friend
constant.FriendApplicationNotification: config.Config.Notification.FriendApplication,
constant.FriendApplicationNotification: config.Config.Notification.FriendApplicationAdded,
constant.FriendApplicationApprovedNotification: config.Config.Notification.FriendApplicationApproved,
constant.FriendApplicationRejectedNotification: config.Config.Notification.FriendApplicationRejected,
constant.FriendAddedNotification: config.Config.Notification.FriendAdded,

@ -7,7 +7,7 @@ type OptionsOpt func(Options)
func NewOptions(opts ...OptionsOpt) Options {
options := make(map[string]bool, 11)
options[constant.IsNotification] = false
options[constant.IsNotNotification] = false
options[constant.IsSendMsg] = false
options[constant.IsHistory] = false
options[constant.IsPersistent] = false
@ -32,9 +32,9 @@ func WithOptions(options Options, opts ...OptionsOpt) Options {
return options
}
func WithNotification(b bool) OptionsOpt {
func WithNotNotification(b bool) OptionsOpt {
return func(options Options) {
options[constant.IsNotification] = b
options[constant.IsNotNotification] = b
}
}
@ -113,7 +113,7 @@ func (o Options) Is(notification string) bool {
}
func (o Options) IsNotNotification() bool {
return o.Is(constant.IsNotification)
return o.Is(constant.IsNotNotification)
}
func (o Options) IsSendMsg() bool {

@ -170,6 +170,43 @@ func GetHashCode(s string) uint32 {
return crc32.ChecksumIEEE([]byte(s))
}
func MsgIsNotification(msg *sdkws.MsgData) bool {
options := Options(msg.Options)
return !options.IsNotNotification()
}
func GetNotificationConversationID(msg *sdkws.MsgData) string {
switch msg.SessionType {
case constant.SingleChatType:
l := []string{msg.SendID, msg.RecvID}
sort.Strings(l)
return "n_" + strings.Join(l, "_")
case constant.GroupChatType:
return "n_" + msg.GroupID
case constant.SuperGroupChatType:
return "n_" + msg.GroupID
case constant.NotificationChatType:
return "n_" + msg.SendID + "_" + msg.RecvID
}
return ""
}
func GetChatConversationIDByMsg(msg *sdkws.MsgData) string {
switch msg.SessionType {
case constant.SingleChatType:
l := []string{msg.SendID, msg.RecvID}
sort.Strings(l)
return "si_" + strings.Join(l, "_")
case constant.GroupChatType:
return "g_" + msg.GroupID
case constant.SuperGroupChatType:
return "sg_" + msg.GroupID
case constant.NotificationChatType:
return "sn_" + msg.SendID + "_" + msg.RecvID
}
return ""
}
func GetConversationIDByMsg(msg *sdkws.MsgData) string {
options := Options(msg.Options)
switch msg.SessionType {
@ -252,6 +289,13 @@ func GetSelfNotificationConversationID(userID string) []string {
return []string{"n_" + userID + "_" + userID, "si_" + userID + "_" + userID}
}
func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
if len(seqs) == 0 {
return 0, 0
}
return seqs[0], seqs[len(seqs)-1]
}
type MsgBySeq []*sdkws.MsgData
func (s MsgBySeq) Len() int {

Loading…
Cancel
Save