feat: optimize corn tasks (#2237)

* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
pull/2183/merge
chao 2 months ago committed by GitHub
parent 25fe09952c
commit 74f4fdb156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,4 +1,2 @@
chatRecordsClearTime: "0 2 * * 3"
msgDestructTime: "0 2 * * *"
chatRecordsClearTime: "0 2 * * *"
retainChatRecords: 365
enableCronLocker: false

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.64
github.com/openimsdk/protocol v0.0.65
github.com/openimsdk/tools v0.0.49-alpha.2
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -281,8 +281,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/openimsdk/gomake v0.0.9 h1:ouf25ygN2PMQ68Gfgns/EQRPiLPnp+77SIr68GfE+n4=
github.com/openimsdk/gomake v0.0.9/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.64 h1:OrjSs4CgKN9VLvJvrAsc37O7Ru0E0VllXZQSmG/ab7U=
github.com/openimsdk/protocol v0.0.64/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws=
github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=

@ -536,3 +536,46 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context
}
return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil
}
func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconversation.UpdateConversationReq) (*pbconversation.UpdateConversationResp, error) {
m := make(map[string]any)
if req.RecvMsgOpt != nil {
m["recv_msg_opt"] = req.RecvMsgOpt.Value
}
if req.AttachedInfo != nil {
m["attached_info"] = req.AttachedInfo.Value
}
if req.Ex != nil {
m["ex"] = req.Ex.Value
}
if req.IsPinned != nil {
m["is_pinned"] = req.IsPinned.Value
}
if req.GroupAtType != nil {
m["group_at_type"] = req.GroupAtType.Value
}
if req.MsgDestructTime != nil {
m["msg_destruct_time"] = req.MsgDestructTime.Value
}
if req.IsMsgDestruct != nil {
m["is_msg_destruct"] = req.IsMsgDestruct.Value
}
if req.BurnDuration != nil {
m["burn_duration"] = req.BurnDuration.Value
}
if req.IsPrivateChat != nil {
m["is_private_chat"] = req.IsPrivateChat.Value
}
if req.MinSeq != nil {
m["min_seq"] = req.MinSeq.Value
}
if req.MaxSeq != nil {
m["max_seq"] = req.MaxSeq.Value
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
return nil, err
}
}
return &pbconversation.UpdateConversationResp{}, nil
}

@ -0,0 +1,77 @@
package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"strings"
"time"
)
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
var (
docNum int
msgNum int
start = time.Now()
)
clearMsg := func(ctx context.Context) (bool, error) {
conversationSeqs := make(map[string]struct{})
defer func() {
req := &conversation.UpdateConversationReq{
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}
for conversationID := range conversationSeqs {
req.ConversationID = conversationID
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
}
}
}()
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
if err != nil {
return false, err
}
if len(msgs) == 0 {
return false, nil
}
for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
}
docNum++
msgNum += len(index)
conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")]
if _, ok := conversationSeqs[conversationID]; !ok {
conversationSeqs[conversationID] = struct{}{}
}
}
return true, nil
}
for {
keep, err := clearMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
}
if !keep {
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
break
}
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
}
return &msg.ClearMsgResp{}, nil
}

@ -1,149 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"math/rand"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
)
// func (c *MsgTool) ConversationsDestructMsgs() {
// log.ZInfo(context.Background(), "start msg destruct cron task")
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx)
// if err != nil {
// log.ZError(ctx, "get conversation id need destruct failed", err)
// return
// }
// log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations))
// for _, conversation := range conversations {
// ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
// log.ZDebug(
// ctx,
// "UserMsgsDestruct",
// "conversationID",
// conversation.ConversationID,
// "ownerUserID",
// conversation.OwnerUserID,
// "msgDestructTime",
// conversation.MsgDestructTime,
// "lastMsgDestructTime",
// conversation.LatestMsgDestructTime,
// )
// now := time.Now()
// seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
// if err != nil {
// log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if len(seqs) > 0 {
// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err
// != nil {
// log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// continue
// }
// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
// log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
// }
// }
// }
//}
func (c *MsgTool) ConversationsDestructMsgs() {
log.ZInfo(context.Background(), "start msg destruct cron task")
ctx := mcontext.NewCtx(stringutil.GetSelfFuncName())
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
temp := make([]*relation.ConversationModel, 0, len(conversations))
for i, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && (time.Now().Unix() > (conversation.MsgDestructTime+conversation.LatestMsgDestructTime.Unix()+8*60*60)) ||
conversation.LatestMsgDestructTime.IsZero() {
temp = append(temp, conversations[i])
}
}
for _, conversation := range temp {
ctx = mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(
ctx,
"UserMsgsDestruct",
"conversationID",
conversation.ConversationID,
"ownerUserID",
conversation.OwnerUserID,
"msgDestructTime",
conversation.MsgDestructTime,
"lastMsgDestructTime",
conversation.LatestMsgDestructTime,
)
now := time.Now()
seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]any{"latest_msg_destruct_time": now}); err != nil {
log.ZError(ctx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
}
}

@ -16,8 +16,14 @@ package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"os"
"os/signal"
"syscall"
@ -25,96 +31,58 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
Share config.Share
KafkaConfig config.Kafka
}
func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime",
config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.MsgDestructTime)
msgTool, err := InitMsgTool(ctx, config)
if err != nil {
return err
}
msgTool.convertTools()
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
// register cron tasks
var crontab = cron.New()
_, err = crontab.AddFunc(config.CronTask.ChatRecordsClearTime,
cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
if err != nil {
return errs.Wrap(err)
return errs.WrapMsg(err, "failed to register discovery service")
}
_, err = crontab.AddFunc(config.CronTask.MsgDestructTime,
cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx, exitBy := context.WithCancelCause(context.Background())
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
if err != nil {
return errs.WrapMsg(err, "cron_conversations_destruct_msgs")
}
// start crontab
crontab.Start()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
<-sigs
// stop crontab, Wait for the running task to exit.
cronCtx := crontab.Stop()
select {
case <-cronCtx.Done():
// graceful exit
case <-time.After(15 * time.Second):
// forced exit on timeout
}
return nil
}
// netlock redis lock.
func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
value := "used"
ok, err := rdb.SetNX(context.Background(), key, value, ttl).Result() // nolint
if err != nil {
// when err is about redis server, return true.
return false
return err
}
return ok
}
func cronWrapFunc(config *CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() {
enableCronLocker := config.CronTask.EnableCronLocker
return func() {
// if don't enable cron-locker, call fn directly.
if !enableCronLocker {
fn()
return
cli := msg.NewMsgClient(conn)
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-ctx.Done():
case s := <-sigs:
exitBy(fmt.Errorf("exit signal %s", s))
}
// when acquire redis lock, call fn().
if netlock(rdb, key, 5*time.Second) {
fn()
}()
crontab := cron.New()
clearFunc := func() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.ChatRecordsClearTime, clearFunc); err != nil {
return errs.Wrap(err)
}
log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
crontab.Start()
<-ctx.Done()
return context.Cause(ctx)
}

@ -1,113 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)
func TestDisLock(t *testing.T) {
rdb := redis.NewClient(&redis.Options{})
defer rdb.Close()
assert.Equal(t, true, netlock(rdb, "cron-1", 1*time.Second))
// if exists, get false
assert.Equal(t, false, netlock(rdb, "cron-1", 1*time.Second))
time.Sleep(2 * time.Second)
// wait for key on timeout, get true
assert.Equal(t, true, netlock(rdb, "cron-1", 2*time.Second))
// set different key
assert.Equal(t, true, netlock(rdb, "cron-2", 2*time.Second))
}
//func TestCronWrapFunc(t *testing.T) {
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// once := sync.Once{}
// done := make(chan struct{}, 1)
// cb := func() {
// once.Do(func() {
// close(done)
// })
// }
//
// start := time.Now()
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab := cron.New(cron.WithSeconds())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(config.NewGlobalConfig(), rdb, key, cb))
// crontab.Start()
// <-done
//
// dur := time.Since(start)
// assert.LessOrEqual(t, dur.Seconds(), float64(2*time.Second))
// crontab.Stop()
//}
//
//func TestCronWrapFuncWithNetlock(t *testing.T) {
// conf, err := initCfg()
// if err != nil {
// panic(err)
// }
// conf.EnableCronLocker = true
// rdb := redis.NewClient(&redis.Options{})
// defer rdb.Close()
//
// done := make(chan string, 10)
//
// crontab := cron.New(cron.WithSeconds())
//
// key := fmt.Sprintf("cron-%v", rand.Int31())
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host1"
// }))
// crontab.AddFunc("*/1 * * * * *", cronWrapFunc(conf, rdb, key, func() {
// done <- "host2"
// }))
// crontab.Start()
//
// time.Sleep(12 * time.Second)
// // the ttl of netlock is 5s, so expected value is 2.
// assert.Equal(t, len(done), 2)
//
// crontab.Stop()
//}
//
//func initCfg() (*config.GlobalConfig, error) {
// const (
// defaultCfgPath = "../../../../../config/config.yaml"
// )
//
// cfgPath := flag.String("c", defaultCfgPath, "Path to the configuration file")
// data, err := os.ReadFile(*cfgPath)
// if err != nil {
// return nil, errs.WrapMsg(err, "ReadFile unmarshal failed")
// }
//
// conf := config.NewGlobalConfig()
// err = yaml.Unmarshal(data, &conf)
// if err != nil {
// return nil, errs.WrapMsg(err, "InitConfig unmarshal failed")
// }
// return conf, nil
//}

@ -1,226 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"math"
"math/rand"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
)
type MsgTool struct {
msgDatabase controller.CommonMsgDatabase
conversationDatabase controller.ConversationDatabase
userDatabase controller.UserDatabase
groupDatabase controller.GroupDatabase
msgNotificationSender *msg.MsgNotificationSender
config *CronTaskConfig
}
func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase,
groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase,
msgNotificationSender *msg.MsgNotificationSender, config *CronTaskConfig,
) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
userDatabase: userDatabase,
groupDatabase: groupDatabase,
conversationDatabase: conversationDatabase,
msgNotificationSender: msgNotificationSender,
config: config,
}
}
func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) {
ch := make(chan int)
<-ch
//mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
//if err != nil {
// return nil, err
//}
//rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
//if err != nil {
// return nil, err
//}
//discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share)
//if err != nil {
// return nil, err
//}
//discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//userDB, err := mgo.NewUserMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
////msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config)
//if err != nil {
// return nil, err
//}
//userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
//userDatabase := controller.NewUserDatabase(
// userDB,
// cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
// mgocli.GetTx(),
// userMongoDB,
//)
//groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB())
//if err != nil {
// return nil, err
//}
//groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil)
//conversationDatabase := controller.NewConversationDatabase(
// conversationDB,
// cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB),
// mgocli.GetTx(),
//)
//msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg)
//msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient))
//msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config)
//return msgTool, nil
return nil, nil
}
// func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
// ctx := mcontext.NewCtx(utils.GetSelfFuncName())
// log.ZInfo(ctx, "============================ start del cron task ============================")
// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
// if err != nil {
// log.ZError(ctx, "GetAllConversationIDs failed", err)
// return
// }
// for _, conversationID := range conversationIDs {
// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
// }
// c.ClearConversationsMsg(ctx, conversationIDs)
// log.ZInfo(ctx, "============================ start del cron finished ============================")
//}
func (c *MsgTool) AllConversationClearMsgAndFixSeq() {
ctx := mcontext.NewCtx(stringutil.GetSelfFuncName())
log.ZInfo(ctx, "============================ start del cron task ============================")
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return
}
const batchNum = 50
log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num)
if num == 0 {
return
}
count := int(num/batchNum + num/batchNum/2)
if count < 1 {
count = 1
}
maxPage := 1 + num/batchNum
if num%batchNum != 0 {
maxPage++
}
for i := 0; i < count; i++ {
pageNumber := rand.Int63() % maxPage
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
c.ClearConversationsMsg(ctx, conversationIDs)
}
log.ZInfo(ctx, "============================ start del cron finished ============================")
}
func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.CronTask.RetainChatRecords*24*60*60)); err != nil {
log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID",
conversationID, "DBRetainChatRecords", c.config.CronTask.RetainChatRecords)
}
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
}
func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error {
minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID)
if err != nil {
return err
}
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
err = fmt.Errorf("cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID)
return errs.Wrap(err)
}
return nil
}
func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error {
maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID)
if err != nil {
if errs.Unwrap(err) == redis.Nil {
return nil
}
return err
}
if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil {
return err
}
return nil
}
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
return err
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, conversationutil.GetNotificationConversationIDByConversationID(conversationID))
}
for _, conversationID := range conversationIDs {
if err := c.checkMaxSeq(ctx, conversationID); err != nil {
log.ZWarn(ctx, "fixSeq failed", err, "conversationID", conversationID)
}
}
fmt.Println("fix all seq finished")
return nil
}

@ -1,46 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tools
import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
)
func (c *MsgTool) convertTools() {
ctx := mcontext.NewCtx("convert")
conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx)
if err != nil {
log.ZError(ctx, "get all conversation ids failed", err)
return
}
for _, conversationID := range conversationIDs {
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationIDByConversationID(conversationID))
}
_, userIDs, err := c.userDatabase.GetAllUserID(ctx, nil)
if err != nil {
log.ZError(ctx, "get all user ids failed", err)
return
}
log.ZDebug(ctx, "all userIDs", "len userIDs", len(userIDs))
for _, userID := range userIDs {
conversationIDs = append(conversationIDs, msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, userID, userID))
conversationIDs = append(conversationIDs, msgprocessor.GetNotificationConversationID(constant.SingleChatType, userID, userID))
}
log.ZDebug(ctx, "all conversationIDs", "len userIDs", len(conversationIDs))
c.msgDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}

@ -46,7 +46,6 @@ func IsAppManagerUid(ctx context.Context, imAdminUserID []string) bool {
}
func CheckAdmin(ctx context.Context, imAdminUserID []string) error {
if datautil.Contain(mcontext.GetOpUserID(ctx), imAdminUserID...) {
return nil
}

@ -34,11 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd {
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
RedisConfigFileName: &cronTaskConfig.RedisConfig,
MongodbConfigFileName: &cronTaskConfig.MongodbConfig,
ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig,
ShareFileName: &cronTaskConfig.Share,
KafkaConfigFileName: &cronTaskConfig.KafkaConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

@ -15,16 +15,11 @@
package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type MsgUtilsCmd struct {
cobra.Command
MsgTool *tools.MsgTool
}
func (m *MsgUtilsCmd) AddUserIDFlag() {
@ -146,27 +141,7 @@ func NewSeqCmd() *SeqCmd {
func (s *SeqCmd) GetSeqCmd() *cobra.Command {
s.Command.Run = func(cmdLines *cobra.Command, args []string) {
_, err := tools.InitMsgTool(context.Background(), nil)
if err != nil {
program.ExitWithError(err)
}
userID := s.getUserIDFlag(cmdLines)
superGroupID := s.getSuperGroupIDFlag(cmdLines)
// beginSeq := s.getBeginSeqFlag(cmdLines)
// limit := s.getLimitFlag(cmdLines)
if userID != "" {
// seq, err := msgTool.s(context.Background(), userID)
if err != nil {
panic(err)
}
// println(seq)
} else if superGroupID != "" {
// seq, err := msgTool.GetSuperGroupSeq(context.Background(), superGroupID)
if err != nil {
panic(err)
}
// println(seq)
}
}
return &s.Command
}

@ -107,9 +107,7 @@ type API struct {
type CronTask struct {
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
MsgDestructTime string `mapstructure:"msgDestructTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
EnableCronLocker bool `yaml:"enableCronLocker"`
}
type OfflinePushConfig struct {

@ -17,6 +17,8 @@ package controller
import (
"context"
"encoding/json"
"errors"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -102,6 +104,10 @@ type CommonMsgDatabase interface {
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*relation.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// clear msg
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *relation.MsgDocModel) ([]int, error)
}
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, msg cache.MsgCache, seq cache.SeqCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
@ -1047,3 +1053,72 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}
func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error) {
return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit)
}
func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *relation.MsgDocModel) ([]int, error) {
var notNull int
index := make([]int, 0, len(doc.Msg))
for i, message := range doc.Msg {
if message.Msg != nil {
notNull++
if message.Msg.SendTime < ts {
index = append(index, i)
}
}
}
if len(index) == 0 {
return index, nil
}
maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil {
return index, err
}
if len(index) == notNull {
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
} else {
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
}
}
//func (db *commonMsgDatabase) ClearMsg(ctx context.Context, ts int64) (err error) {
// var (
// docNum int
// msgNum int
// start = time.Now()
// )
// for {
// msgs, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, 100)
// if err != nil {
// return err
// }
// if len(msgs) == 0 {
// return nil
// }
// for _, msg := range msgs {
// num, err := db.deleteOneMsg(ctx, ts, msg)
// if err != nil {
// return err
// }
// docNum++
// msgNum += num
// }
// }
//}
func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seq.GetMinSeq(ctx, conversationID)
if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) {
return nil
}
return err
}
if dbSeq >= seq {
return nil
}
return db.seq.SetMinSeq(ctx, conversationID, seq)
}

@ -56,7 +56,16 @@ func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err er
}
func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) {
res, err := mongoutil.UpdateMany(ctx, c.coll, bson.M{"owner_user_id": bson.M{"$in": userIDs}, "conversation_id": conversationID}, bson.M{"$set": args})
if len(args) == 0 {
return 0, nil
}
filter := bson.M{
"conversation_id": conversationID,
}
if len(userIDs) > 0 {
filter["owner_user_id"] = bson.M{"$in": userIDs}
}
res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil {
return 0, err
}

@ -896,3 +896,89 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
}
}
}
func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*relation.MsgDocModel, error) {
return mongoutil.Aggregate[*relation.MsgDocModel](ctx, m.coll, []bson.M{
{
"$match": bson.M{
"msgs.msg.send_time": bson.M{
"$lt": ts,
},
},
},
{
"$project": bson.M{
"_id": 0,
"doc_id": 1,
"msgs.msg.send_time": 1,
"msgs.msg.seq": 1,
},
},
{
"$limit": limit,
},
})
}
func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int) error {
if len(index) == 0 {
return nil
}
model := &relation.MsgInfoModel{DelList: []string{}}
set := make(map[string]any)
for i := range index {
set[fmt.Sprintf("msgs.%d", i)] = model
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true)
}
//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) {
// ts := t.UnixMilli()
// var count int64
// for {
// msgs, err := m.GetBeforeMsg(ctx, ts, 100)
// if err != nil {
// return count, err
// }
// if len(msgs) == 0 {
// return count, nil
// }
// for _, msg := range msgs {
// num, err := m.deleteOneMsg(ctx, ts, msg)
// count += num
// if err != nil {
// return count, err
// }
// }
// }
//}
func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error {
return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID})
}
//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) {
// var notNull int
// index := make([]int, 0, len(doc.Msg))
// for i, message := range doc.Msg {
// if message.Msg != nil {
// notNull++
// if message.Msg.SendTime < ts {
// index = append(index, i)
// }
// }
// }
// if len(index) == 0 {
// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID)
// }
// if len(index) == notNull {
// if err := m.DeleteDoc(ctx, doc.DocID); err != nil {
// return 0, err
// }
// } else {
// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil {
// return 0, err
// }
// }
// return int64(len(index)), nil
//}

@ -116,6 +116,12 @@ type MsgDocModelInterface interface {
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
DeleteDoc(ctx context.Context, docID string) error
DeleteMsgByIndex(ctx context.Context, docID string, index []int) error
GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*MsgDocModel, error)
//ClearMsg(ctx context.Context, t time.Time) (int64, error)
}
func (MsgDocModel) TableName() string {

@ -82,6 +82,11 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []
return err
}
func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
_, err := c.Client.UpdateConversation(ctx, conversation)
return err
}
func (c *ConversationRpcClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
resp, err := c.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID})
if err != nil {

@ -212,6 +212,11 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati
return resp.MaxSeq, nil
}
func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error {
_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts})
return err
}
type NotificationSender struct {
contentTypeConf map[int32]config.NotificationConfig
sessionTypeConf map[int32]int32

Loading…
Cancel
Save