Merge remote-tracking branch 'origin/errcode' into errcode

test-errcode
withchao 2 years ago
commit 9e7bc1b769

@ -10,8 +10,6 @@ import (
"github.com/robfig/cron/v3"
)
const moduleName = "cron"
func StartCronTask() error {
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)

@ -19,18 +19,19 @@ import (
type MsgTool struct {
msgDatabase controller.CommonMsgDatabase
conversationDatabase controller.ConversationDataBase
conversationDatabase controller.ConversationDatabase
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
}
var errSeq = errors.New("cache max seq and mongo max seq is diff > 10")
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool {
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
userDatabase: userDatabase,
groupDatabase: groupDatabase,
conversationDatabase: conversationDatabase,
}
}
@ -51,7 +52,8 @@ func InitMsgTool() (*MsgTool, error) {
msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
userDatabase := controller.NewUserDatabase(userDB, cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()), tx.NewGorm(db))
groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase())
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase)
conversationDatabase := controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), relation.NewConversationGorm(db)), tx.NewGorm(db))
msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase)
return msgTool, nil
}

@ -36,20 +36,20 @@ type ConversationDatabase interface {
}
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
return &ConversationDataBase{
return &conversationDatabase{
conversationDB: conversation,
cache: cache,
tx: tx,
}
}
type ConversationDataBase struct {
type conversationDatabase struct {
conversationDB relationTb.ConversationModelInterface
cache cache.ConversationCache
tx tx.Tx
}
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
@ -95,7 +95,7 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil {
return err
@ -103,7 +103,7 @@ func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context,
return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
@ -114,7 +114,7 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
return c.cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
@ -150,19 +150,19 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
return c.cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
func (c *conversationDatabase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs)
}
func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
func (c *conversationDatabase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
return c.cache.GetConversation(ctx, ownerUserID, conversationID)
}
func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
return c.cache.GetUserAllConversations(ctx, ownerUserID)
}
func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
cache := c.cache.NewCache()
if err := c.tx.Transaction(func(tx any) error {
var conversationIDs []string
@ -208,11 +208,11 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
}
func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
cache := c.cache.NewCache()
conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
if err := c.tx.Transaction(func(tx any) error {
@ -247,18 +247,18 @@ func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context,
return cache.ExecDel(ctx)
}
func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
func (c *conversationDatabase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
return c.cache.GetUserConversationIDs(ctx, userID)
}
func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
func (c *conversationDatabase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return c.cache.GetUserConversationIDsHash(ctx, ownerUserID)
}
func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) {
func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]string, error) {
return c.conversationDB.GetAllConversationIDs(ctx)
}
func (c *ConversationDataBase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) {
return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID)
}

@ -568,7 +568,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
}
return delStruct.getSetMinSeq() + 1, nil
}
log.ZDebug(ctx, "conversationID", conversationID, "index:", index, "docID", msgs.DocID, "len", len(msgs.Msg))
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg))
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
}

Loading…
Cancel
Save