// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"fmt"
"math"
"math/rand"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type MsgTool struct {
msgDatabase controller . CommonMsgDatabase
conversationDatabase controller . ConversationDatabase
userDatabase controller . UserDatabase
groupDatabase controller . GroupDatabase
msgNotificationSender * notification . MsgNotificationSender
}
func NewMsgTool ( msgDatabase controller . CommonMsgDatabase , userDatabase controller . UserDatabase ,
groupDatabase controller . GroupDatabase , conversationDatabase controller . ConversationDatabase , msgNotificationSender * notification . MsgNotificationSender ,
) * MsgTool {
return & MsgTool {
msgDatabase : msgDatabase ,
userDatabase : userDatabase ,
groupDatabase : groupDatabase ,
conversationDatabase : conversationDatabase ,
msgNotificationSender : msgNotificationSender ,
}
}
func InitMsgTool ( ) ( * MsgTool , error ) {
rdb , err := cache . NewRedis ( )
if err != nil {
return nil , err
}
mongo , err := unrelation . NewMongo ( )
if err != nil {
return nil , err
}
discov , err := kdisc . NewDiscoveryRegister ( config . Config . Envs . Discovery )
if err != nil {
return nil , err
}
discov . AddOption ( mw . GrpcClient ( ) , grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) , grpc . WithDefaultServiceConfig ( fmt . Sprintf ( ` { "LoadBalancingPolicy": "%s"} ` , "round_robin" ) ) )
userDB , err := mgo . NewUserMongo ( mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
msgDatabase , err := controller . InitCommonMsgDatabase ( rdb , mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
userMongoDB := unrelation . NewUserMongoDriver ( mongo . GetDatabase ( ) )
ctxTx := tx . NewMongo ( mongo . GetClient ( ) )
userDatabase := controller . NewUserDatabase (
userDB ,
cache . NewUserCacheRedis ( rdb , userDB , cache . GetDefaultOpt ( ) ) ,
ctxTx ,
userMongoDB ,
)
groupDB , err := mgo . NewGroupMongo ( mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
groupMemberDB , err := mgo . NewGroupMember ( mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
groupRequestDB , err := mgo . NewGroupRequestMgo ( mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
conversationDB , err := mgo . NewConversationMongo ( mongo . GetDatabase ( ) )
if err != nil {
return nil , err
}
groupDatabase := controller . NewGroupDatabase ( rdb , groupDB , groupMemberDB , groupRequestDB , ctxTx , nil )
conversationDatabase := controller . NewConversationDatabase (
conversationDB ,
cache . NewConversationRedis ( rdb , cache . GetDefaultOpt ( ) , conversationDB ) ,
ctxTx ,
)
msgRpcClient := rpcclient . NewMessageRpcClient ( discov )
msgNotificationSender := notification . NewMsgNotificationSender ( rpcclient . WithRpcClient ( & msgRpcClient ) )
msgTool := NewMsgTool ( msgDatabase , userDatabase , groupDatabase , conversationDatabase , msgNotificationSender )
return msgTool , nil
}
//func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// log.ZInfo(ctx, "============================ start del cron task ============================")
// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
// if err != nil {
// log.ZError(ctx, "GetAllConversationIDs failed", err)
// return
// }
// for _, conversationID := range conversationIDs {
// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
// }
// c.ClearConversationsMsg(ctx, conversationIDs)
// log.ZInfo(ctx, "============================ start del cron finished ============================")
//}
func ( c * MsgTool ) AllConversationClearMsgAndFixSeq ( ) {
ctx := mcontext . NewCtx ( utils . GetSelfFuncName ( ) )
log . ZInfo ( ctx , "============================ start del cron task ============================" )
num , err := c . conversationDatabase . GetAllConversationIDsNumber ( ctx )
if err != nil {
log . ZError ( ctx , "GetAllConversationIDsNumber failed" , err )
return
}
const batchNum = 50
log . ZDebug ( ctx , "GetAllConversationIDsNumber" , "num" , num )
if num == 0 {
return
}
count := int ( num / batchNum + num / batchNum / 2 )
if count < 1 {
count = 1
}
maxPage := 1 + num / batchNum
if num % batchNum != 0 {
maxPage ++
}
for i := 0 ; i < count ; i ++ {
pageNumber := rand . Int63 ( ) % maxPage
pagination := & sdkws . RequestPagination {
PageNumber : int32 ( pageNumber ) ,
ShowNumber : batchNum ,
}
conversationIDs , err := c . conversationDatabase . PageConversationIDs ( ctx , pagination )
if err != nil {
log . ZError ( ctx , "PageConversationIDs failed" , err , "pageNumber" , pageNumber )
continue
}
log . ZDebug ( ctx , "PageConversationIDs failed" , "pageNumber" , pageNumber , "conversationIDsNum" , len ( conversationIDs ) , "conversationIDs" , conversationIDs )
if len ( conversationIDs ) == 0 {
continue
}
c . ClearConversationsMsg ( ctx , conversationIDs )
}
log . ZInfo ( ctx , "============================ start del cron finished ============================" )
}
func ( c * MsgTool ) ClearConversationsMsg ( ctx context . Context , conversationIDs [ ] string ) {
for _ , conversationID := range conversationIDs {
if err := c . msgDatabase . DeleteConversationMsgsAndSetMinSeq ( ctx , conversationID , int64 ( config . Config . RetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
log . ZError ( ctx , "DeleteUserSuperGroupMsgsAndSetMinSeq failed" , err , "conversationID" , conversationID , "DBRetainChatRecords" , config . Config . RetainChatRecords )
}
if err := c . checkMaxSeq ( ctx , conversationID ) ; err != nil {
log . ZError ( ctx , "fixSeq failed" , err , "conversationID" , conversationID )
}
}
}
func ( c * MsgTool ) checkMaxSeqWithMongo ( ctx context . Context , conversationID string , maxSeqCache int64 ) error {
minSeqMongo , maxSeqMongo , err := c . msgDatabase . GetMongoMaxAndMinSeq ( ctx , conversationID )
if err != nil {
return err
}
if math . Abs ( float64 ( maxSeqMongo - maxSeqCache ) ) > 10 {
err = fmt . Errorf ( "cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s" , maxSeqMongo , minSeqMongo , maxSeqCache , conversationID )
return errs . Wrap ( err )
}
return nil
}
func ( c * MsgTool ) checkMaxSeq ( ctx context . Context , conversationID string ) error {
maxSeq , err := c . msgDatabase . GetMaxSeq ( ctx , conversationID )
if err != nil {
if errs . Unwrap ( err ) == redis . Nil {
return nil
}
return err
}
if err := c . checkMaxSeqWithMongo ( ctx , conversationID , maxSeq ) ; err != nil {
return err
}
return nil
}
func ( c * MsgTool ) FixAllSeq ( ctx context . Context ) error {
conversationIDs , err := c . conversationDatabase . GetAllConversationIDs ( ctx )
if err != nil {
return err
}
for _ , conversationID := range conversationIDs {
conversationIDs = append ( conversationIDs , utils . GetNotificationConversationIDByConversationID ( conversationID ) )
}
for _ , conversationID := range conversationIDs {
if err := c . checkMaxSeq ( ctx , conversationID ) ; err != nil {
log . ZWarn ( ctx , "fixSeq failed" , err , "conversationID" , conversationID )
}
}
fmt . Println ( "fix all seq finished" )
return nil
}