feat: cron task

pull/2237/head
withchao 7 months ago
parent 2e439508bc
commit 7a29a853e9

@ -19,8 +19,11 @@ import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"os"
"os/signal"
"syscall"
@ -33,11 +36,8 @@ import (
type CronTaskConfig struct {
CronTask config.CronTask
RedisConfig config.Redis
MongodbConfig config.Mongo
ZookeeperConfig config.ZooKeeper
Share config.Share
KafkaConfig config.Kafka
}
func Start(ctx context.Context, config *CronTaskConfig) error {
@ -49,9 +49,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
cli := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx, exitBy := context.WithCancelCause(context.Background())
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
if err != nil {
return err
}
cli := msg.NewMsgClient(conn)
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
@ -67,7 +72,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if err := cli.ClearMsg(ctx, deltime.UnixMilli()); err != nil {
if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}

@ -34,11 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd {
ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig}
ret.configMap = map[string]any{
OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask,
RedisConfigFileName: &cronTaskConfig.RedisConfig,
MongodbConfigFileName: &cronTaskConfig.MongodbConfig,
ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig,
ShareFileName: &cronTaskConfig.Share,
KafkaConfigFileName: &cronTaskConfig.KafkaConfig,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)

@ -1,46 +1,34 @@
package mgo
import (
"strings"
"context"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"testing"
"time"
)
func TestName(t *testing.T) {
//conf := config.Mongo{
// Address: []string{"localhost:37017"},
// Username: "openIM",
// Password: "openIM123",
// Database: "demo",
//}
//conf.URI = `mongodb://openIM:openIM123@localhost:37017/demo?maxPoolSize=100&authSource=admin`
//cli, err := mongoutil.NewMongoDB(context.Background(), conf.Build())
//if err != nil {
// panic(err)
//}
//msg, _ := NewMsgMongo(cli.GetDB())
//count, err := msg.ClearMsg(context.Background(), time.Now().Add(-time.Hour*24*61))
//if err != nil {
// t.Log("error", err)
// return
//}
//t.Log("count", count)
s := `si_5300327160_9129042887:0123`
t.Log(s[:strings.LastIndex(s, ":")])
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
ctx = mcontext.SetOpUserID(ctx, "imAdmin")
ctx = mcontext.SetOperationID(ctx, "test123456")
conn, err := grpc.DialContext(ctx, "172.16.8.48:10130", grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient())
if err != nil {
panic(err)
}
defer conn.Close()
cli := msg.NewMsgClient(conn)
var ts int64
ts = time.Now().UnixMilli()
func TestName2(t *testing.T) {
m := map[string]string{
"1": "1",
"2": "2",
if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}); err != nil {
panic(err)
}
t.Log(m)
clear(m)
t.Log(m)
a := []string{"1", "2"}
t.Log(a)
clear(a)
t.Log(a)
t.Log("success!")
}

Loading…
Cancel
Save