From 678d361ccd7555b3325ad384cf62e5229131222a Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 May 2024 16:44:15 +0800 Subject: [PATCH] etcd --- pkg/common/discoveryregister/etcd/etcd.go | 46 +++++++++++++++++++++++ tools/check-component/main.go | 31 ++++++++++----- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go index 904f83dd0..51fafd0b6 100644 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -3,6 +3,7 @@ package etcd import ( "context" "fmt" + "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/client/v3/naming/resolver" @@ -173,3 +174,48 @@ func (r *SvcDiscoveryRegistryImpl) Close() { _ = r.client.Close() } } + +// Check verifies if etcd is running by checking the existence of the root node and optionally creates it +func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error { + // Configure the etcd client with default settings + cfg := clientv3.Config{ + Endpoints: etcdServers, + } + + // Apply provided options to the config + for _, opt := range options { + opt(&cfg) + } + + client, err := clientv3.New(cfg) + if err != nil { + return errors.Wrap(err, "failed to connect to etcd") + } + defer client.Close() + + // Create a child context with a default timeout or use the provided context + opCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Check if the root node exists + resp, err := client.Get(opCtx, etcdRoot) + if err != nil { + return errors.Wrap(err, "failed to get the root node from etcd") + } + + // If root node does not exist and createIfNotExist is true, create the root node + if len(resp.Kvs) == 0 { + if createIfNotExist { + _, err := client.Put(opCtx, etcdRoot, "") + if err != nil { + return errors.Wrap(err, "failed to create the root node in etcd") + } + fmt.Printf("Root node %s did not exist, but has been created.\n", etcdRoot) + } else { + return fmt.Errorf("root node %s does not exist in etcd", etcdRoot) + } + } else { + fmt.Printf("Etcd is running and the root node %s exists.\n", etcdRoot) + } + return nil +} diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 5fa84ac36..5a936f775 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -20,9 +20,9 @@ import ( "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" - "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" @@ -45,11 +45,22 @@ func CheckZookeeper(ctx context.Context, config *config.ZooKeeper) error { } func CheckEtcd(ctx context.Context, config *config.Etcd) error { - return etcd.Check(ctx, config.Address, "/check_openim_component", + etcd.Check(ctx, config.Address, "/check_openim_component", true, etcd.WithDialTimeout(10*time.Second), etcd.WithMaxCallSendMsgSize(20*1024*1024), etcd.WithUsernameAndPassword(config.Username, config.Password)) + return nil +} + +func CheckDiscovery(ctx context.Context, config *config.Discovery) error { + switch config.Enable { + case "etcd": + return CheckEtcd(ctx, &config.Etcd) + + } + + return nil } func CheckMongo(ctx context.Context, config *config.Mongo) error { @@ -139,28 +150,28 @@ func main() { func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, discovery *config.Discovery, maxRetry int) error { checksDone := make(map[string]bool) - checks := map[string]func(ctx context.Context) error{ - "Mongo": func(ctx context.Context) error { + checks := map[string]func() error{ + "Mongo": func() error { return CheckMongo(ctx, mongoConfig) }, - "Redis": func(ctx context.Context) error { + "Redis": func() error { return CheckRedis(ctx, redisConfig) }, - "Kafka": func(ctx context.Context) error { + "Kafka": func() error { return CheckKafka(ctx, kafkaConfig) }, } if minioConfig != nil { - checks["MinIO"] = func(ctx context.Context) error { + checks["MinIO"] = func() error { return CheckMinIO(ctx, minioConfig) } } if discovery.Enable == "etcd" { - checks["Etcd"] = func(ctx context.Context) error { + checks["Etcd"] = func() error { return CheckEtcd(ctx, &discovery.Etcd) } } else if discovery.Enable == "zookeeper" { - checks["Zookeeper"] = func(ctx context.Context) error { + checks["Zookeeper"] = func() error { return CheckZookeeper(ctx, &discovery.ZooKeeper) } } @@ -169,7 +180,7 @@ func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig * allSuccess := true for name, check := range checks { if !checksDone[name] { - if err := check(ctx); err != nil { + if err := check(); err != nil { fmt.Printf("%s check failed: %v\n", name, err) allSuccess = false } else {