From 7a29a853e9e50f87925ff96d6bcbdb4317d47146 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 26 Apr 2024 15:32:20 +0800 Subject: [PATCH] feat: cron task --- internal/tools/cron_task.go | 17 +++++++---- pkg/common/cmd/cron_task.go | 3 -- pkg/common/db/mgo/msg_test.go | 56 ++++++++++++++--------------------- 3 files changed, 33 insertions(+), 43 deletions(-) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index d52ed6509..7161f55fc 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -19,8 +19,11 @@ import ( "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mw" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "os" "os/signal" "syscall" @@ -33,11 +36,8 @@ import ( type CronTaskConfig struct { CronTask config.CronTask - RedisConfig config.Redis - MongodbConfig config.Mongo ZookeeperConfig config.ZooKeeper Share config.Share - KafkaConfig config.Kafka } func Start(ctx context.Context, config *CronTaskConfig) error { @@ -49,9 +49,14 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } - cli := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) ctx, exitBy := context.WithCancelCause(context.Background()) ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + if err != nil { + return err + } + cli := msg.NewMsgClient(conn) go func() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) @@ -67,7 +72,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - if err := cli.ClearMsg(ctx, deltime.UnixMilli()); err != nil { + if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index 0e94cf52c..be26f5af3 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -34,11 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd { ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, - RedisConfigFileName: &cronTaskConfig.RedisConfig, - MongodbConfigFileName: &cronTaskConfig.MongodbConfig, ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig, ShareFileName: &cronTaskConfig.Share, - KafkaConfigFileName: &cronTaskConfig.KafkaConfig, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/db/mgo/msg_test.go b/pkg/common/db/mgo/msg_test.go index 15026d69c..0a92b7d4a 100644 --- a/pkg/common/db/mgo/msg_test.go +++ b/pkg/common/db/mgo/msg_test.go @@ -1,46 +1,34 @@ package mgo import ( - "strings" + "context" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mw" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "testing" + "time" ) func TestName(t *testing.T) { - //conf := config.Mongo{ - // Address: []string{"localhost:37017"}, - // Username: "openIM", - // Password: "openIM123", - // Database: "demo", - //} - //conf.URI = `mongodb://openIM:openIM123@localhost:37017/demo?maxPoolSize=100&authSource=admin` - //cli, err := mongoutil.NewMongoDB(context.Background(), conf.Build()) - //if err != nil { - // panic(err) - //} - //msg, _ := NewMsgMongo(cli.GetDB()) - //count, err := msg.ClearMsg(context.Background(), time.Now().Add(-time.Hour*24*61)) - //if err != nil { - // t.Log("error", err) - // return - //} - //t.Log("count", count) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + ctx = mcontext.SetOpUserID(ctx, "imAdmin") + ctx = mcontext.SetOperationID(ctx, "test123456") - s := `si_5300327160_9129042887:0123` - - t.Log(s[:strings.LastIndex(s, ":")]) + conn, err := grpc.DialContext(ctx, "172.16.8.48:10130", grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient()) + if err != nil { + panic(err) + } + defer conn.Close() + cli := msg.NewMsgClient(conn) + var ts int64 -} + ts = time.Now().UnixMilli() -func TestName2(t *testing.T) { - m := map[string]string{ - "1": "1", - "2": "2", + if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}); err != nil { + panic(err) } - t.Log(m) - clear(m) - t.Log(m) - a := []string{"1", "2"} - t.Log(a) - clear(a) - t.Log(a) + t.Log("success!") }