refactor: crontab update

pull/2148/head
Gordon 1 year ago
parent 9b3aedc795
commit 93b68f070c

@ -1,3 +1,4 @@
chatRecordsClearTime: "0 2 * * 3"
msgDestructTime: "0 2 * * *"
retainChatRecords: 365
enableCronLocker: false

@ -16,22 +16,23 @@ package tools
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/db/redisutil"
"os"
"os/signal"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
func StartTask(ctx context.Context, config *config.GlobalConfig) error {
func Start(ctx context.Context, config *cmd.CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.ChatRecordsClearTime, "msgDestructTime", config.MsgDestructTime)
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime",
config.CronTask.ChatRecordsClearTime, "msgDestructTime", config.CronTask.MsgDestructTime)
msgTool, err := InitMsgTool(ctx, config)
if err != nil {
@ -40,7 +41,7 @@ func StartTask(ctx context.Context, config *config.GlobalConfig) error {
msgTool.convertTools()
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
}
@ -48,12 +49,14 @@ func StartTask(ctx context.Context, config *config.GlobalConfig) error {
// register cron tasks
var crontab = cron.New()
_, err = crontab.AddFunc(config.ChatRecordsClearTime, cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
_, err = crontab.AddFunc(config.CronTask.ChatRecordsClearTime,
cronWrapFunc(config, rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil {
return errs.Wrap(err)
}
_, err = crontab.AddFunc(config.MsgDestructTime, cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
_, err = crontab.AddFunc(config.CronTask.MsgDestructTime,
cronWrapFunc(config, rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
return errs.WrapMsg(err, "cron_conversations_destruct_msgs")
}
@ -91,8 +94,8 @@ func netlock(rdb redis.UniversalClient, key string, ttl time.Duration) bool {
return ok
}
func cronWrapFunc(config *config.GlobalConfig, rdb redis.UniversalClient, key string, fn func()) func() {
enableCronLocker := config.EnableCronLocker
func cronWrapFunc(config *cmd.CronTaskConfig, rdb redis.UniversalClient, key string, fn func()) func() {
enableCronLocker := config.CronTask.EnableCronLocker
return func() {
// if don't enable cron-locker, call fn directly.
if !enableCronLocker {

@ -64,6 +64,7 @@ const (
kafkaEnvPrefix = "openim-kafka"
zoopkeeperEnvPrefix = "openim-zookeeper"
apiEnvPrefix = "openim-api"
cornTaskEnvPrefix = "openim-crontask"
authEnvPrefix = "openim-auth"
conversationEnvPrefix = "openim-conversation"
friendEnvPrefix = "openim-friend"

@ -16,7 +16,6 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/system/program"
@ -25,29 +24,34 @@ import (
type CronTaskCmd struct {
*RootCmd
initFunc func(ctx context.Context, config *config.GlobalConfig) error
ctx context.Context
ctx context.Context
configMap map[string]StructEnvPrefix
cronTaskConfig CronTaskConfig
}
func NewCronTaskCmd(name string) *CronTaskCmd {
ret := &CronTaskCmd{RootCmd: NewRootCmd(program.GetProcessName(), name, WithCronTaskLogName()),
initFunc: tools.StartTask}
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.addRunE()
ret.SetRootCmdPt(ret)
return ret
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
}
func (c *CronTaskCmd) addRunE() {
c.Command.RunE = func(cmd *cobra.Command, args []string) error {
return c.initFunc(c.ctx, c.config)
func NewCronTaskCmd() *CronTaskCmd {
var cronTaskConfig CronTaskConfig
ret := &CronTaskCmd{cronTaskConfig: cronTaskConfig}
ret.configMap = map[string]StructEnvPrefix{
OpenIMCronTaskCfgFileName: {EnvPrefix: cornTaskEnvPrefix, ConfigStruct: &cronTaskConfig.CronTask},
RedisConfigFileName: {EnvPrefix: redisEnvPrefix, ConfigStruct: &cronTaskConfig.RedisConfig},
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.Command.PreRunE = func(cmd *cobra.Command, args []string) error {
return ret.preRunE()
}
return ret
}
func (c *CronTaskCmd) Exec() error {
return c.Execute()
func (a *CronTaskCmd) Exec() error {
return a.Execute()
}
func (c *CronTaskCmd) GetPortFromConfig(portType string) int {
return 0
func (a *CronTaskCmd) preRunE() error {
return tools.Start(a.ctx, &a.cronTaskConfig)
}

@ -95,6 +95,7 @@ type CronTask struct {
ChatRecordsClearTime string `mapstructure:"chatRecordsClearTime"`
MsgDestructTime string `mapstructure:"msgDestructTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"`
EnableCronLocker bool `yaml:"enableCronLocker"`
}
type OfflinePushConfig struct {

Loading…
Cancel
Save