Merge remote-tracking branch 'origin/errcode' into errcode

test-errcode
skiffer-git 2 years ago
commit c318c62542

@ -8,8 +8,8 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqList []uint32, operationID string) { func (c *Check) DeleteMessageNotification(ctx context.Context, userID string, seqs []int64, operationID string) {
DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, SeqList: seqList} DeleteMessageTips := sdkws.DeleteMessageTips{UserID: userID, Seqs: seqs}
c.MessageNotification(ctx, userID, userID, constant.DeleteMessageNotification, &DeleteMessageTips) c.MessageNotification(ctx, userID, userID, constant.DeleteMessageNotification, &DeleteMessageTips)
} }

@ -626,7 +626,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
if group.GroupType == constant.SuperGroup { if group.GroupType == constant.SuperGroup {
return nil, constant.ErrGroupTypeNotSupport.Wrap() return nil, constant.ErrGroupTypeNotSupport.Wrap()
} }
user, err := relation.GetUserByUserID(tracelog.GetOpUserID(ctx)) user, err := s.UserCheck.GetUsersInfo(ctx, tracelog.GetOpUserID(ctx))
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -1,10 +1,10 @@
package cache package cache
import ( import (
"Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"encoding/json"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"time" "time"
@ -26,6 +26,7 @@ type BlackCache interface {
type BlackCacheRedis struct { type BlackCacheRedis struct {
expireTime time.Duration expireTime time.Duration
rcClient *rockscache.Client rcClient *rockscache.Client
black *relation.BlackGorm
} }
func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis { func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis {
@ -40,31 +41,14 @@ func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string {
} }
func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) {
getBlackIDList := func() (string, error) { return GetCache(ctx, b.rcClient, b.getBlackIDsKey(userID), b.expireTime, func(ctx context.Context) ([]string, error) {
blackIDs, err := b.blackDB.GetBlackIDs(ctx, userID) return b.black.FindBlackUserIDs(ctx, userID)
if err != nil { })
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(blackIDs)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "blackIDList", blackIDs)
}()
blackIDListStr, err := b.rcClient.Fetch(blackListCache+userID, b.expireTime, getBlackIDList)
if err != nil {
return nil, utils.Wrap(err, "")
}
err = json.Unmarshal([]byte(blackIDListStr), &blackIDs)
return blackIDs, utils.Wrap(err, "")
} }
func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) { func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}() }()
return b.rcClient.TagAsDeleted(blackListCache + userID) return b.rcClient.TagAsDeleted(b.getBlackIDsKey(userID))
} }

@ -6,10 +6,8 @@ import (
"Open_IM/pkg/common/tracelog" "Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"encoding/json"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"strconv"
"time" "time"
) )
@ -54,6 +52,7 @@ type ConversationCache interface {
type ConversationRedis struct { type ConversationRedis struct {
rcClient *rockscache.Client rcClient *rockscache.Client
expireTime time.Duration
} }
func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error { func (c *ConversationRedis) DelUsersConversationIDs(ctx context.Context, userIDList []string) error {
@ -88,7 +87,7 @@ func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string)
return superGroupRecvMsgNotNotifyUserIDsKey + groupID return superGroupRecvMsgNotNotifyUserIDsKey + groupID
} }
func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string, f func(userID string) ([]string, error)) (conversationIDs []string, err error) { func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) {
//getConversationIDs := func() (string, error) { //getConversationIDs := func() (string, error) {
// conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID)
// if err != nil { // if err != nil {
@ -110,7 +109,7 @@ func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUse
//} //}
//return conversationIDs, nil //return conversationIDs, nil
return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) { return GetCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), conversationExpireTime, func(ctx context.Context) ([]string, error) {
return f(ownerUserID) panic("implement me")
}) })
} }
@ -172,28 +171,10 @@ func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUse
return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err")
} }
func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.Conversation, err error) { func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.ConversationModel, err error) {
getConversation := func() (string, error) { return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
conversation, err := relation.GetConversation(ownerUserID, conversationID) panic("implement me")
if err != nil { })
return "", err
}
bytes, err := json.Marshal(conversation)
if err != nil {
return "", utils.Wrap(err, "conversation Marshal failed")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "conversation", *conversation)
}()
conversationStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
if err != nil {
return nil, err
}
conversation = &relationTb.ConversationModel{}
err = json.Unmarshal([]byte(conversationStr), &conversation)
return conversation, utils.Wrap(err, "Unmarshal failed")
} }
func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) {
@ -237,21 +218,25 @@ func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUs
} }
func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) {
getConversation := func() (string, error) { //getConversation := func() (string, error) {
conversation, err := relation.GetConversation(ownerUserID, conversationID) // conversation, err := relation.GetConversation(ownerUserID, conversationID)
if err != nil { // if err != nil {
return "", err // return "", err
} // }
return strconv.Itoa(int(conversation.RecvMsgOpt)), nil // return strconv.Itoa(int(conversation.RecvMsgOpt)), nil
} //}
defer func() { //defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt) // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID, "opt", opt)
}() //}()
optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation) //optStr, err := c.rcClient.Fetch(c.getConversationKey(ownerUserID, conversationID), c.expireTime, getConversation)
if err != nil { //if err != nil {
return 0, err // return 0, err
} //}
return strconv.Atoi(optStr) //return strconv.Atoi(optStr)
// panic("implement me")
return GetCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (int, error) {
panic("implement me")
})
} }
func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error {
@ -259,17 +244,17 @@ func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID,
} }
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) {
return nil, nil panic("implement me")
} }
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) {
return nil panic("implement me")
} }
func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) {
return panic("implement me")
} }
func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) {
return panic("implement me")
} }

@ -19,34 +19,42 @@ type ExtendMsgSetCache struct {
rcClient *rockscache.Client rcClient *rockscache.Client
} }
func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsg, err error) { func (e *ExtendMsgSetCache) getKey(clientMsgID string) string {
getExtendMsg := func() (string, error) { return extendMsgCache + clientMsgID
extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
if err != nil {
return "", utils.Wrap(err, "GetExtendMsgList failed")
} }
bytes, err := json.Marshal(extendMsg)
if err != nil { func (e *ExtendMsgSetCache) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, firstModifyTime int64) (extendMsg *unrelation.ExtendMsgModel, err error) {
return "", utils.Wrap(err, "Marshal failed") //getExtendMsg := func() (string, error) {
} // extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, firstModifyTime)
return string(bytes), nil // if err != nil {
} // return "", utils.Wrap(err, "GetExtendMsgList failed")
defer func() { // }
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType", // bytes, err := json.Marshal(extendMsg)
sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg) // if err != nil {
}() // return "", utils.Wrap(err, "Marshal failed")
extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg) // }
if err != nil { // return string(bytes), nil
return nil, utils.Wrap(err, "Fetch failed") //}
} //defer func() {
extendMsg = &mongoDB.ExtendMsg{} // tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "sourceID", sourceID, "sessionType",
err = json.Unmarshal([]byte(extendMsgStr), extendMsg) // sessionType, "clientMsgID", clientMsgID, "firstModifyTime", firstModifyTime, "extendMsg", extendMsg)
return extendMsg, utils.Wrap(err, "Unmarshal failed") //}()
//extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg)
//if err != nil {
// return nil, utils.Wrap(err, "Fetch failed")
//}
//extendMsg = &mongoDB.ExtendMsg{}
//err = json.Unmarshal([]byte(extendMsgStr), extendMsg)
//return extendMsg, utils.Wrap(err, "Unmarshal failed")
return GetCache(ctx, e.rcClient, e.getKey(clientMsgID), e.expireTime, func(ctx context.Context) (*unrelation.ExtendMsgModel, error) {
panic("")
})
} }
func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) { func (e *ExtendMsgSetCache) DelExtendMsg(ctx context.Context, clientMsgID string) (err error) {
defer func() { defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "clientMsgID", clientMsgID) tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "clientMsgID", clientMsgID)
}() }()
return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err") return utils.Wrap(e.rcClient.TagAsDeleted(e.getKey(clientMsgID)), "DelExtendMsg err")
} }

@ -56,27 +56,10 @@ func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string
return friendKey + ownerUserID + "-" + friendUserID return friendKey + ownerUserID + "-" + friendUserID
} }
func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error) { func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
getFriendIDs := func() (string, error) { return GetCache(ctx, f.rcClient, f.getFriendIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) {
friendIDs, err := f.friendDB.GetFriendIDs(ctx, ownerUserID) return f.friendDB.FindFriendUserIDs(ctx, ownerUserID)
if err != nil { })
return "", err
}
bytes, err := json.Marshal(friendIDs)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendIDs", friendIDs)
}()
friendIDsStr, err := f.rcClient.Fetch(f.getFriendIDsKey(ownerUserID), f.expireTime, getFriendIDs)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(friendIDsStr), &friendIDs)
return friendIDs, utils.Wrap(err, "")
} }
func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) { func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) {

@ -139,6 +139,10 @@ func NewRedisClient(rdb redis.UniversalClient) *RedisClient {
return &RedisClient{rdb: rdb} return &RedisClient{rdb: rdb}
} }
func (r *RedisClient) GetClient() redis.UniversalClient {
return r.rdb
}
// Perform seq auto-increment operation of user messages // Perform seq auto-increment operation of user messages
func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) { func (r *RedisClient) IncrUserSeq(ctx context.Context, uid string) (int64, error) {
key := userIncrSeq + uid key := userIncrSeq + uid

@ -10,38 +10,36 @@ import (
const scanCount = 3000 const scanCount = 3000
//func (rc *RcClient) DelKeys() {
func (rc *RcClient) DelKeys() { // for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache,
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache, // groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} {
groupMemberInfoCache, groupAllMemberInfoCache, "ALL_FRIEND_INFO_CACHE:"} { // fName := utils.GetSelfFuncName()
fName := utils.GetSelfFuncName() // var cursor uint64
var cursor uint64 // var n int
var n int // for {
for { // var keys []string
var keys []string // var err error
var err error // keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result()
keys, cursor, err = rc.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() // if err != nil {
if err != nil { // panic(err.Error())
panic(err.Error()) // }
} // n += len(keys)
n += len(keys) // // for each for redis cluster
// for each for redis cluster // for _, key := range keys {
for _, key := range keys { // if err = rc.rdb.Del(context.Background(), key).Err(); err != nil {
if err = rc.rdb.Del(context.Background(), key).Err(); err != nil { // log.NewError("", fName, key, err.Error())
log.NewError("", fName, key, err.Error()) // err = rc.rdb.Del(context.Background(), key).Err()
err = rc.rdb.Del(context.Background(), key).Err() // if err != nil {
if err != nil { // panic(err.Error())
panic(err.Error()) // }
} // }
} // }
} // if cursor == 0 {
if cursor == 0 { // break
break // }
} // }
} // }
} //}
}
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) { func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
var t T var t T

@ -76,3 +76,10 @@ func (b *BlackGorm) FindOwnerBlacks(ctx context.Context, ownerUserID string, pag
err = utils.Wrap(b.DB.Model(&relation.BlackModel{}).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&blacks).Error, "") err = utils.Wrap(b.DB.Model(&relation.BlackModel{}).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&blacks).Error, "")
return return
} }
func (b *BlackGorm) FindBlackUserIDs(ctx context.Context, ownerUserID string) (blackUserIDs []string, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "blackUserIDs", blackUserIDs)
}()
return blackUserIDs, utils.Wrap(b.DB.Model(&relation.BlackModel{}).Where("owner_user_id = ?", blackUserIDs).Pluck("block_user_id", &blackUserIDs).Error, "")
}

@ -133,3 +133,10 @@ func (f *FriendGorm) FindInWhoseFriends(ctx context.Context, friendUserID string
err = utils.Wrap(getDBConn(f.DB, tx).Where("friend_user_id = ? ", friendUserID).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&friends).Error, "") err = utils.Wrap(getDBConn(f.DB, tx).Where("friend_user_id = ? ", friendUserID).Limit(int(showNumber)).Offset(int(pageNumber*showNumber)).Find(&friends).Error, "")
return return
} }
func (f *FriendGorm) FindFriendUserIDs(ctx context.Context, ownerUserID string, tx ...any) (friendUserIDs []string, err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserIDs", friendUserIDs)
}()
return friendUserIDs, utils.Wrap(getDBConn(f.DB, tx).Model(&relation.FriendModel{}).Where("owner_user_id = ? ", ownerUserID).Pluck("friend_user_id", &friendUserIDs).Error, "")
}

Loading…
Cancel
Save