Merge remote-tracking branch 'origin/v3dev' into v3dev

test-v3dev1
withchao 2 years ago
commit 5fd464979d

@ -1,6 +1,7 @@
package tools package tools
import ( import (
"context"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
@ -9,12 +10,14 @@ import (
) )
func (c *MsgTool) ConversationsDestructMsgs() { func (c *MsgTool) ConversationsDestructMsgs() {
log.ZInfo(context.Background(), "start msg destruct cron task")
ctx := mcontext.NewCtx(utils.GetSelfFuncName()) ctx := mcontext.NewCtx(utils.GetSelfFuncName())
conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
if err != nil { if err != nil {
log.ZError(ctx, "get conversation id need destruct failed", err) log.ZError(ctx, "get conversation id need destruct failed", err)
return return
} }
log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
for _, conversation := range conversations { for _, conversation := range conversations {
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime) log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime)
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)

@ -11,7 +11,6 @@ import (
) )
func StartCronTask() error { func StartCronTask() error {
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
msgTool, err := InitMsgTool() msgTool, err := InitMsgTool()
if err != nil { if err != nil {
@ -20,11 +19,13 @@ func StartCronTask() error {
c := cron.New() c := cron.New()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
_, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq)
if err != nil { if err != nil {
fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime) fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime)
panic(err) panic(err)
} }
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
if err != nil { if err != nil {
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)

@ -2,7 +2,6 @@ package tools
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math" "math"
"time" "time"
@ -17,9 +16,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )
@ -32,8 +33,6 @@ type MsgTool struct {
msgNotificationSender *notification.MsgNotificationSender msgNotificationSender *notification.MsgNotificationSender
} }
var errSeq = errors.New("cache max seq and mongo max seq is diff > 10")
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool { groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *notification.MsgNotificationSender) *MsgTool {
return &MsgTool{ return &MsgTool{
@ -103,12 +102,12 @@ func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []s
} }
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error { func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
maxSeqMongo, _, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID) minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
if err != nil { if err != nil {
return err return err
} }
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
return errSeq log.ZError(ctx, "cache max seq and mongo max seq is diff > 10", nil, "maxSeqMongo", maxSeqMongo, "minSeqMongo", minSeqMongo, "maxSeqCache", maxSeqCache, "conversationID", conversationID)
} }
return nil return nil
} }
@ -116,6 +115,9 @@ func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID strin
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error { func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID) maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
if err != nil { if err != nil {
if errs.Unwrap(err) == redis.Nil {
return nil
}
return err return err
} }
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil { if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {

@ -74,7 +74,7 @@ type CommonMsgDatabase interface {
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)
@ -843,7 +843,7 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
return return
} }
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error) { func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
return db.GetMinMaxSeqMongo(ctx, conversationID) return db.GetMinMaxSeqMongo(ctx, conversationID)
} }

@ -82,10 +82,7 @@ func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversat
} }
func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) { func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) {
var conversations []*relation.ConversationModel return nil, nil
err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "")
hasReadSeqs = make(map[string]int64, len(conversations))
return hasReadSeqs, err
} }
func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) (conversations []*relation.ConversationModel, err error) { func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
@ -93,5 +90,5 @@ func (c *ConversationGorm) GetConversationsByConversationID(ctx context.Context,
} }
func (c *ConversationGorm) GetConversationIDsNeedDestruct(ctx context.Context) (conversations []*relation.ConversationModel, err error) { func (c *ConversationGorm) GetConversationIDsNeedDestruct(ctx context.Context) (conversations []*relation.ConversationModel, err error) {
return conversations, utils.Wrap(c.db(ctx).Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").Error, "") return conversations, utils.Wrap(c.db(ctx).Where("is_msg_destruct = 1 && UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) && msg_destruct_time != 0").Find(&conversations).Error, "")
} }

Loading…
Cancel
Save