feat: cron task

pull/2237/head
withchao 5 months ago
parent 20dfafd218
commit dc57d38856

@ -1,4 +1,2 @@
chatRecordsClearTime: "0 2 * * 3" chatRecordsClearTime: "0 2 * * *"
msgDestructTime: "0 2 * * *"
retainChatRecords: 365 retainChatRecords: 365
enableCronLocker: false

@ -1,6 +1,8 @@
module github.com/openimsdk/open-im-server/v3 module github.com/openimsdk/open-im-server/v3
go 1.20 go 1.21.2
toolchain go1.21.9
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
@ -172,3 +174,8 @@ require (
golang.org/x/crypto v0.21.0 // indirect golang.org/x/crypto v0.21.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
replace (
github.com/openimsdk/protocol => /Users/chao/Desktop/withchao/protocol
)

@ -536,3 +536,46 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context
} }
return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil
} }
func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconversation.UpdateConversationReq) (*pbconversation.UpdateConversationResp, error) {
m := make(map[string]any)
if req.RecvMsgOpt != nil {
m["recv_msg_opt"] = req.RecvMsgOpt.Value
}
if req.AttachedInfo != nil {
m["attached_info"] = req.AttachedInfo.Value
}
if req.Ex != nil {
m["ex"] = req.Ex.Value
}
if req.IsPinned != nil {
m["is_pinned"] = req.IsPinned.Value
}
if req.GroupAtType != nil {
m["group_at_type"] = req.GroupAtType.Value
}
if req.MsgDestructTime != nil {
m["msg_destruct_time"] = req.MsgDestructTime.Value
}
if req.IsMsgDestruct != nil {
m["is_msg_destruct"] = req.IsMsgDestruct.Value
}
if req.BurnDuration != nil {
m["burn_duration"] = req.BurnDuration.Value
}
if req.IsPrivateChat != nil {
m["is_private_chat"] = req.IsPrivateChat.Value
}
if req.MinSeq != nil {
m["min_seq"] = req.MinSeq.Value
}
if req.MaxSeq != nil {
m["max_seq"] = req.MaxSeq.Value
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
return nil, err
}
}
return &pbconversation.UpdateConversationResp{}, nil
}

@ -0,0 +1,77 @@
package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"strings"
"time"
)
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
var (
docNum int
msgNum int
start = time.Now()
)
clearMsg := func(ctx context.Context) (bool, error) {
conversationSeqs := make(map[string]struct{})
defer func() {
req := &conversation.UpdateConversationReq{
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}
for conversationID := range conversationSeqs {
req.ConversationID = conversationID
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
}
}
}()
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
if err != nil {
return false, err
}
if len(msgs) == 0 {
return false, nil
}
for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
}
docNum++
msgNum += len(index)
conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")]
if _, ok := conversationSeqs[conversationID]; !ok {
conversationSeqs[conversationID] = struct{}{}
}
}
return true, nil
}
for {
keep, err := clearMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
}
if !keep {
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
break
}
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
}
return &msg.ClearMsgResp{}, nil
}

@ -1,149 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"math/rand"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
)
// func (c *MsgTool) ConversationsDestructMsgs() {
// log.ZInfo(context.Background(), "start msg destruct cron task")
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
// if err != nil {
// log.ZError(ctx, "get conversation id need destruct failed", err)
// return
// }
// log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
// for _, conversation := range conversations {
// ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
// log.ZDebug(
// ctx,
// "UserMsgsDestruct",
// "conversationID",
// conversation.ConversationID,
// "ownerUserID",
// conversation.OwnerUserID,
// "msgDestructTime",
// conversation.MsgDestructTime,
// "lastMsgDestructTime",
// conversation.LatestMsgDestructTime,
// )
// now := time.Now()
// seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
// if err != nil {
// log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if len(seqs) > 0 {
// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
// != nil {
// log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
// log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// }
// }
// }
//}
func (c *MsgTool) ConversationsDestructMsgs() {
log.ZInfo(context.Background(), "start msg destruct cron task")
ctx := mcontext.NewCtx(stringutil.GetSelfFuncName())
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
temp := make([]*relation.ConversationModel, 0, len(conversations))
for i, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && (time.Now().Unix() > (conversation.MsgDestructTime+conversation.LatestMsgDestructTime.Unix()+8*60*60)) ||
conversation.LatestMsgDestructTime.IsZero() {
temp = append(temp, conversations[i])
}
}
for _, conversation := range temp {
ctx = mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(
ctx,
"UserMsgsDestruct",
"conversationID",
conversation.ConversationID,
"ownerUserID",
conversation.OwnerUserID,
"msgDestructTime",
conversation.MsgDestructTime,
"lastMsgDestructTime",
conversation.LatestMsgDestructTime,
)
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
}
}

@ -16,8 +16,11 @@ package tools
import ( import (
"context" "context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/mcontext"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
@ -25,7 +28,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
) )
@ -39,82 +41,43 @@ type CronTaskConfig struct {
} }
func Start(ctx context.Context, config *CronTaskConfig) error { func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords)
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", if config.CronTask.RetainChatRecords < 1 {
config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.MsgDestructTime) return errs.New("msg destruct time must be greater than 1").Wrap()
msgTool, err := InitMsgTool(ctx, config)
if err != nil {
return err
}
msgTool.convertTools()
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
// register cron tasks
var crontab = cron.New()
_, err = crontab.AddFunc(config.CronTask.ChatRecordsClearTime,
cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil {
return errs.Wrap(err)
}
_, err = crontab.AddFunc(config.CronTask.MsgDestructTime,
cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
return errs.WrapMsg(err, "cron_conversations_destruct_msgs")
}
// start crontab
crontab.Start()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
<-sigs
// stop crontab, Wait for the running task to exit.
cronCtx := crontab.Stop()
select {
case <-cronCtx.Done():
// graceful exit
case <-time.After(15 * time.Second):
// forced exit on timeout
} }
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
return nil
}
// netlock redis lock.
func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
value := "used"
ok, err := rdb.SetNX(context.Background(), key, value, ttl).Result() // nolint
if err != nil { if err != nil {
// when err is about redis server, return true. return errs.WrapMsg(err, "failed to register discovery service")
return false
} }
cli := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
return ok ctx, exitBy := context.WithCancelCause(context.Background())
} ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
go func() {
func cronWrapFunc(config *CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() { sigs := make(chan os.Signal, 1)
enableCronLocker := config.CronTask.EnableCronLocker signal.Notify(sigs, syscall.SIGTERM)
return func() { select {
// if don't enable cron-locker, call fn directly. case <-ctx.Done():
if !enableCronLocker { case s := <-sigs:
fn() exitBy(fmt.Errorf("exit signal %s", s))
return
} }
}()
// when acquire redis lock, call fn(). crontab := cron.New()
if netlock(rdb, key, 5*time.Second) { clearFunc := func() {
fn() now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if err := cli.ClearMsg(ctx, deltime.UnixMilli()); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
} }
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
return errs.Wrap(err)
} }
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
crontab.Start()
<-ctx.Done()
return context.Cause(ctx)
} }

@ -1,113 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)
func TestDisLock(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
assert.Equal(t, true, netlock(rdb, "cron-1", 1*time.Second))
// if exists, get false
assert.Equal(t, false, netlock(rdb, "cron-1", 1*time.Second))
time.Sleep(2 * time.Second)
// wait for key on timeout, get true
assert.Equal(t, true, netlock(rdb, "cron-1", 2*time.Second))
// set different key
assert.Equal(t, true, netlock(rdb, "cron-2", 2*time.Second))
}
//func TestCronWrapFunc(t *testing.T) {
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// once := sync.Once{}
// done := make(chan struct{}, 1)
// cb := func() {
// once.Do(func() {
// close(done)
// })
// }
//
// start := time.Now()
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab := cron.New(cron.WithSeconds())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, cb))
// crontab.Start()
// <-done
//
// dur := time.Since(start)
// assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second))
// crontab.Stop()
//}
//
//func TestCronWrapFuncWithNetlock(t *testing.T) {
// conf, err := initCfg()
// if err != nil {
// panic(err)
// }
// conf.EnableCronLocker = true
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// done := make(chan string, 10)
//
// crontab := cron.New(cron.WithSeconds())
//
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host1"
// }))
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host2"
// }))
// crontab.Start()
//
// time.Sleep(12 * time.Second)
// // the ttl of netlock is 5s, so expected value is 2.
// assert.Equal(t, len(done), 2)
//
// crontab.Stop()
//}
//
//func initCfg() (*config.GlobalConfig, error) {
// const (
// defaultCfgPath = "../../../../../config/config.yaml"
// )
//
// cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
// data, err := os.ReadFile(*cfgPath)
// if err != nil {
// return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
// }
//
// conf := config.NewGlobalConfig()
// err = yaml.Unmarshal(data, &conf)
// if err != nil {
// return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
// }
// return conf, nil
//}

@ -1,226 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"math"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
type MsgTool struct {
msgDatabase controller.CommonMsgDatabase
conversationDatabase controller.ConversationDatabase
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
msgNotificationSender *msg.MsgNotificationSender
config *CronTaskConfig
}
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase,
msgNotificationSender *msg.MsgNotificationSender, config *CronTaskConfig,
) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
userDatabase: userDatabase,
groupDatabase: groupDatabase,
conversationDatabase: conversationDatabase,
msgNotificationSender: msgNotificationSender,
config: config,
}
}
func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) {
ch := make(chan int)
<-ch
//mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
//if err != nil {
// return nil, err
//}
//rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
//if err != nil {
// return nil, err
//}
//discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
//if err != nil {
// return nil, err
//}
//discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//userDB, err := mgo.NewUserMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
////msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config)
//if err != nil {
// return nil, err
//}
//userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
//userDatabase := controller.NewUserDatabase(
// userDB,
// cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
// mgocli.GetTx(),
// userMongoDB,
//)
//groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil)
//conversationDatabase := controller.NewConversationDatabase(
// conversationDB,
// cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
// mgocli.GetTx(),
//)
//msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg)
//msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
//msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
//return msgTool, nil
return nil, nil
}
// func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// log.ZInfo(ctx, "============================ start del cron task ============================")
// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
// if err != nil {
// log.ZError(ctx, "GetAllConversationIDs failed", err)
// return
// }
// for _, conversationID := range conversationIDs {
// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
// }
// c.ClearConversationsMsg(ctx, conversationIDs)
// log.ZInfo(ctx, "============================ start del cron finished ============================")
//}
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
ctx := mcontext.NewCtx(stringutil.GetSelfFuncName())
log.ZInfo(ctx, "============================ start del cron task ============================")
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
c.ClearConversationsMsg(ctx, conversationIDs)
}
log.ZInfo(ctx, "============================ start del cron finished ============================")
}
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.CronTask.RetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID",
conversationID, "DBRetainChatRecords", c.config.CronTask.RetainChatRecords)
}
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
}
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
if err != nil {
return err
}
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
err = fmt.Errorf("cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID)
return errs.Wrap(err)
}
return nil
}
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
if err != nil {
if errs.Unwrap(err) == redis.Nil {
return nil
}
return err
}
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
return err
}
return nil
}
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
return err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, conversationutil.GetNotificationConversationIDByConversationID(conversationID))
}
for _, conversationID := range conversationIDs {
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZWarn(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
fmt.Println("fix all seq finished")
return nil
}

@ -1,46 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
func (c *MsgTool) convertTools() {
ctx := mcontext.NewCtx("convert")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "get all conversation ids failed", err)
return
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationIDByConversationID(conversationID))
}
_, userIDs, err := c.userDatabase.GetAllUserID(ctx, nil)
if err != nil {
log.ZError(ctx, "get all user ids failed", err)
return
}
log.ZDebug(ctx, "all userIDs", "len userIDs", len(userIDs))
for _, userID := range userIDs {
conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID))
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID))
}
log.ZDebug(ctx, "all conversationIDs", "len userIDs", len(conversationIDs))
c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}

@ -46,7 +46,6 @@ func IsAppManagerUid(ctx context.Context, imAdminUserID []string) bool {
} }
func CheckAdmin(ctx context.Context, imAdminUserID []string) error { func CheckAdmin(ctx context.Context, imAdminUserID []string) error {
if datautil.Contain(mcontext.GetOpUserID(ctx), imAdminUserID...) { if datautil.Contain(mcontext.GetOpUserID(ctx), imAdminUserID...) {
return nil return nil
} }

@ -107,9 +107,7 @@ type API struct {
type CronTask struct { type CronTask struct {
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"` ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
MsgDestructTime string `mapstructure:"msgDestructTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"` RetainChatRecords int `mapstructure:"retainChatRecords"`
EnableCronLocker bool `yaml:"enableCronLocker"`
} }
type OfflinePushConfig struct { type OfflinePushConfig struct {

@ -17,6 +17,8 @@ package controller
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"strings"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -102,6 +104,10 @@ type CommonMsgDatabase interface {
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// clear msg
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *relation.MsgDocModel) ([]int, error)
} }
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
@ -1047,3 +1053,72 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
} }
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error) {
return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit)
}
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *relation.MsgDocModel) ([]int, error) {
var notNull int
index := make([]int, 0, len(doc.Msg))
for i, message := range doc.Msg {
if message.Msg != nil {
notNull++
if message.Msg.SendTime < ts {
index = append(index, i)
}
}
}
if len(index) == 0 {
return index, nil
}
maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil {
return index, err
}
if len(index) == notNull {
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
} else {
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
}
}
//func (db *commonMsgDatabase) ClearMsg(ctx context.Context, ts int64) (err error) {
// var (
// docNum int
// msgNum int
// start = time.Now()
// )
// for {
// msgs, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, 100)
// if err != nil {
// return err
// }
// if len(msgs) == 0 {
// return nil
// }
// for _, msg := range msgs {
// num, err := db.deleteOneMsg(ctx, ts, msg)
// if err != nil {
// return err
// }
// docNum++
// msgNum += num
// }
// }
//}
func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seq.GetMinSeq(ctx, conversationID)
if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) {
return nil
}
return err
}
if dbSeq >= seq {
return nil
}
return db.seq.SetMinSeq(ctx, conversationID, seq)
}

@ -56,7 +56,16 @@ func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err er
} }
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) { func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) {
res, err := mongoutil.UpdateMany(ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": conversationID}, bson.M{"$set": args}) if len(args) == 0 {
return 0, nil
}
filter := bson.M{
"conversation_id": conversationID,
}
if len(userIDs) > 0 {
filter["owner_user_id"] = bson.M{"$in": userIDs}
}
res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil { if err != nil {
return 0, err return 0, err
} }

@ -896,3 +896,89 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
} }
} }
} }
func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error) {
return mongoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, []bson.M{
{
"$match": bson.M{
"msgs.msg.send_time": bson.M{
"$lt": ts,
},
},
},
{
"$project": bson.M{
"_id": 0,
"doc_id": 1,
"msgs.msg.send_time": 1,
"msgs.msg.seq": 1,
},
},
{
"$limit": limit,
},
})
}
func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int) error {
if len(index) == 0 {
return nil
}
model := &relation.MsgInfoModel{DelList: []string{}}
set := make(map[string]any)
for i := range index {
set[fmt.Sprintf("msgs.%d", i)] = model
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
}
//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) {
// ts := t.UnixMilli()
// var count int64
// for {
// msgs, err := m.GetBeforeMsg(ctx, ts, 100)
// if err != nil {
// return count, err
// }
// if len(msgs) == 0 {
// return count, nil
// }
// for _, msg := range msgs {
// num, err := m.deleteOneMsg(ctx, ts, msg)
// count += num
// if err != nil {
// return count, err
// }
// }
// }
//}
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
}
//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) {
// var notNull int
// index := make([]int, 0, len(doc.Msg))
// for i, message := range doc.Msg {
// if message.Msg != nil {
// notNull++
// if message.Msg.SendTime < ts {
// index = append(index, i)
// }
// }
// }
// if len(index) == 0 {
// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID)
// }
// if len(index) == notNull {
// if err := m.DeleteDoc(ctx, doc.DocID); err != nil {
// return 0, err
// }
// } else {
// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil {
// return 0, err
// }
// }
// return int64(len(index)), nil
//}

@ -0,0 +1,71 @@
package mgo
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"go.mongodb.org/mongo-driver/mongo"
)
func NewMsgDelMongo(db *mongo.Database) *MsgDelMgo {
return &MsgDelMgo{coll: db.Collection(new(relation.MsgDocModel).TableName())}
}
type MsgDelMgo struct {
coll *mongo.Collection
model relation.MsgDocModel
}
//func (m *MsgDelMgo) getEmptyMsg(ctx context.Context, limit int) ([]string, error) {
// return mongoutil.Aggregate[string](ctx, m.coll, []bson.M{
// {
// "$match": bson.M{
// "msgs": bson.M{
// "$exists": true,
// },
// },
// },
// {
// "$project": bson.M{
// "_id": 0,
// "doc_id": 1,
// "all_null_msgs": bson.M{
// "$not": []bson.M{
// {
// "$anyElementTrue": bson.M{
// "$map": bson.M{
// "input": "$msgs",
// "as": "item",
// "in": "$$item.msg",
// },
// },
// },
// },
// },
// },
// },
// {
// "$project": bson.M{
// "doc_id": 1,
// },
// },
// {
// "$limit": limit,
// },
// })
//}
//
//func (m *MsgDelMgo) deleteEmptyMsgs(ctx context.Context) error {
// for {
// docIDs, err := m.getEmptyMsg(ctx, 100)
// if err != nil {
// return err
// }
// if len(docIDs) == 0 {
// return nil
// }
// for _, docID := range docIDs {
// if err := m.deleteEmptyMsg(ctx, docID); err != nil {
// return err
// }
// }
// }
//}

@ -0,0 +1,46 @@
package mgo
import (
"strings"
"testing"
)
func TestName(t *testing.T) {
//conf := config.Mongo{
// Address: []string{"localhost:37017"},
// Username: "openIM",
// Password: "openIM123",
// Database: "demo",
//}
//conf.URI = `mongodb://openIM:openIM123@localhost:37017/demo?maxPoolSize=100&authSource=admin`
//cli, err := mongoutil.NewMongoDB(context.Background(), conf.Build())
//if err != nil {
// panic(err)
//}
//msg, _ := NewMsgMongo(cli.GetDB())
//count, err := msg.ClearMsg(context.Background(), time.Now().Add(-time.Hour*24*61))
//if err != nil {
// t.Log("error", err)
// return
//}
//t.Log("count", count)
s := `si_5300327160_9129042887:0123`
t.Log(s[:strings.LastIndex(s, ":")])
}
func TestName2(t *testing.T) {
m := map[string]string{
"1": "1",
"2": "2",
}
t.Log(m)
clear(m)
t.Log(m)
a := []string{"1", "2"}
t.Log(a)
clear(a)
t.Log(a)
}

@ -116,6 +116,12 @@ type MsgDocModelInterface interface {
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
DeleteDoc(ctx context.Context, docID string) error
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*MsgDocModel, error)
//ClearMsg(ctx context.Context, t time.Time) (int64, error)
} }
func (MsgDocModel) TableName() string { func (MsgDocModel) TableName() string {

@ -82,6 +82,11 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []
return err return err
} }
func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
_, err := c.Client.UpdateConversation(ctx, conversation)
return err
}
func (c *ConversationRpcClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { func (c *ConversationRpcClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
resp, err := c.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID}) resp, err := c.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID})
if err != nil { if err != nil {

@ -212,6 +212,11 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati
return resp.MaxSeq, nil return resp.MaxSeq, nil
} }
func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error {
_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts})
return err
}
type NotificationSender struct { type NotificationSender struct {
contentTypeConf map[int32]config.NotificationConfig contentTypeConf map[int32]config.NotificationConfig
sessionTypeConf map[int32]int32 sessionTypeConf map[int32]int32

Loading…
Cancel
Save