From eb33487367c58ba6ef1e3dffca0110ee2d74fe54 Mon Sep 17 00:00:00 2001 From: hanzhixiao <709674996@qq.com> Date: Tue, 8 Aug 2023 12:23:10 +0800 Subject: [PATCH] 815 Signed-off-by: hanzhixiao <709674996@qq.com> --- tools/component/check_component.go | 160 ----------------- tools/component/main.go | 269 +++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+), 160 deletions(-) delete mode 100644 tools/component/check_component.go create mode 100644 tools/component/main.go diff --git a/tools/component/check_component.go b/tools/component/check_component.go deleted file mode 100644 index 93461f640..000000000 --- a/tools/component/check_component.go +++ /dev/null @@ -1,160 +0,0 @@ -package main - -import ( - "context" - "database/sql" - "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/tools/utils" - "github.com/Shopify/sarama" - "github.com/go-zookeeper/zk" - "github.com/minio/minio-go/v7" - "github.com/redis/go-redis/v9" - "go.mongodb.org/mongo-driver/mongo/readpref" - "net" - "net/url" - "strings" - "time" - - "github.com/minio/minio-go/v7/pkg/credentials" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -const ( - sqlDriver = "mysql" - minioHealthCheckDuration = 1 - maxRetry = 3 -) - -func main() { - - for i := 0; i < maxRetry; i++ { - success := 1 - // Check MySQL - db, err := sql.Open(sqlDriver, fmt.Sprintf("%s:%s@tcp(%s)/", - config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address)) - if err != nil { - fmt.Printf("Cannot connect to MySQL: %v", err) - success = 0 - } - err = db.Ping() - if err != nil { - fmt.Printf("ping mysql failed: %v. please make sure your mysql service has started", err) - success = 0 - } - db.Close() - - // Check MongoDB - client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI( - fmt.Sprintf("mongodb://%s:%s@%s", - config.Config.Mongo.Username, config.Config.Mongo.Password, config.Config.Mongo.Address))) - if err != nil { - fmt.Printf("Cannot connect to MongoDB: %v", err) - success = 0 - } - err = client.Ping(context.TODO(), &readpref.ReadPref{}) - if err != nil { - fmt.Printf("ping mysql failed: %v. please make sure your mysql service has started", err) - success = 0 - } - client.Disconnect(context.TODO()) - - // Check Minio - if config.Config.Object.Enable == "minio" { - if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" && config.Config.Object.ApiURL == config.Config.Object.Minio.Endpoint { - fmt.Printf("ApiURL contain the same address with Endpoint: %v. please modify the config file", config.Config.Object.ApiURL) - } - minioClient, err := minio.New(config.Config.Object.Minio.Endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(config.Config.Object.Minio.AccessKeyID, config.Config.Object.Minio.SecretAccessKey, ""), - Secure: false, - }) - if err != nil { - fmt.Printf("Cannot connect to Minio: %v", err) - success = 0 - } - cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration)) - if err != nil { - fmt.Printf("starting minio health check failed:%v", err) - success = 0 - } - if minioClient.IsOffline() { - fmt.Printf("Error: minio server is offline.") - success = 0 - } - cancel() - } - - // Check Redis - var redisClient redis.UniversalClient - if len(config.Config.Redis.Address) > 1 { - redisClient = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: config.Config.Redis.Address, - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, - }) - } else { - redisClient = redis.NewClient(&redis.Options{ - Addr: config.Config.Redis.Address[0], - Username: config.Config.Redis.Username, - Password: config.Config.Redis.Password, - }) - } - _, err = redisClient.Ping(context.Background()).Result() - if err != nil { - fmt.Printf("Cannot connect to Redis: %v", err) - success = 0 - } - - // Check Zookeeper - c, _, err := zk.Connect(config.Config.Zookeeper.ZkAddr, time.Second) - if err != nil { - fmt.Printf("Cannot connect to Zookeeper: %v", err) - success = 0 - } - c.Close() - - // Check Kafka - kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, &sarama.Config{}) - if err != nil { - fmt.Printf("Cannot connect to Kafka: %v", err) - success = 0 - } else { - topics, err := kafkaClient.Topics() - if err != nil { - fmt.Println("get kafka topic error") - success = 0 - } - if !utils.IsContain(config.Config.Kafka.MsgToMongo.Topic, topics) { - fmt.Printf("kafka doesn't contain topic:%v", config.Config.Kafka.MsgToMongo.Topic) - success = 0 - } - if !utils.IsContain(config.Config.Kafka.MsgToPush.Topic, topics) { - fmt.Printf("kafka doesn't contain topic:%v", config.Config.Kafka.MsgToPush.Topic) - success = 0 - } - if !utils.IsContain(config.Config.Kafka.LatestMsgToRedis.Topic, topics) { - fmt.Printf("kafka doesn't contain topic:%v", config.Config.Kafka.LatestMsgToRedis.Topic) - success = 0 - } - } - kafkaClient.Close() - if success == 1 { - fmt.Println("all compose check pass") - return - } - time.Sleep(3 * time.Second) - } -} - -func exactIP(urll string) string { - u, _ := url.Parse(urll) - host, _, err := net.SplitHostPort(u.Host) - if err != nil { - host = u.Host - } - if strings.HasSuffix(host, ":") { - host = host[0 : len(host)-1] - } - return host -} diff --git a/tools/component/main.go b/tools/component/main.go new file mode 100644 index 000000000..306105fa3 --- /dev/null +++ b/tools/component/main.go @@ -0,0 +1,269 @@ +package main + +import ( + "context" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/utils" + "github.com/Shopify/sarama" + "github.com/go-zookeeper/zk" + "github.com/minio/minio-go/v7" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "gopkg.in/yaml.v2" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "net" + "net/url" + "os" + "strings" + "time" + + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + cfgPath = "../../../config/config.yaml" + minioHealthCheckDuration = 1 + maxRetry = 3 + componentStartErr = 1705 +) + +var ( + ErrComponentStart = errs.NewCodeError(componentStartErr, "ComponentStartErr") +) + +func initCfg() error { + data, err := os.ReadFile(cfgPath) + if err != nil { + return err + } + if err = yaml.Unmarshal(data, &config.Config); err != nil { + return err + } + return nil +} + +func main() { + err := initCfg() + if err != nil { + fmt.Printf("Read config failed: %v", err.Error()) + } + for i := 0; i < maxRetry; i++ { + if i != 0 { + time.Sleep(3 * time.Second) + } + fmt.Printf("Checking components Round %v......\n", i+1) + // Check MySQL + if err := checkMysql(); err != nil { + errorPrint(fmt.Sprintf("Starting Mysql failed: %v.Please make sure your mysql service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Mysql starts successfully")) + } + + // Check MongoDB + if err := checkMongo(); err != nil { + errorPrint(fmt.Sprintf("Starting Mongo failed: %v.Please make sure your monngo service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Mongo starts successfully")) + } + + // Check Minio + if err := checkMinio(); err != nil { + errorPrint(fmt.Sprintf("Starting Minio failed: %v.Please make sure your Minio service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Minio starts successfully")) + } + // Check Redis + if err := checkRedis(); err != nil { + errorPrint(fmt.Sprintf("Starting Redis failed: %v.Please make sure your Redis service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Redis starts successfully")) + } + + // Check Zookeeper + if err := checkZookeeper(); err != nil { + errorPrint(fmt.Sprintf("Starting Zookeeper failed: %v.Please make sure your Zookeeper service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Zookeeper starts successfully")) + } + + // Check Kafka + if err := checkKafka(); err != nil { + errorPrint(fmt.Sprintf("Starting Kafka failed: %v.Please make sure your Kafka service has started", err.Error())) + continue + } else { + successPrint(fmt.Sprint("Kafka starts successfully")) + } + successPrint(fmt.Sprint("All components starts successfully")) + break + } +} + +func exactIP(urll string) string { + u, _ := url.Parse(urll) + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + host = u.Host + } + if strings.HasSuffix(host, ":") { + host = host[0 : len(host)-1] + } + return host +} + +func checkMysql() error { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", + config.Config.Mysql.Username, config.Config.Mysql.Password, config.Config.Mysql.Address[0], "mysql") + db, err := gorm.Open(mysql.Open(dsn), nil) + if err != nil { + return err + } else { + sqlDB, err := db.DB() + err = sqlDB.Ping() + sqlDB.Close() + if err != nil { + return err + } + } + return nil +} + +func checkMongo() error { + mongodbHosts := "" + for i, v := range config.Config.Mongo.Address { + if i == len(config.Config.Mongo.Address)-1 { + mongodbHosts += v + } else { + mongodbHosts += v + "," + } + } + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI( + fmt.Sprintf("mongodb://%v:%v@%v/?authSource=admin", + config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts))) + if err != nil { + return err + } else { + err = client.Ping(context.TODO(), &readpref.ReadPref{}) + client.Disconnect(context.TODO()) + if err != nil { + return err + } + } + return nil +} + +func checkMinio() error { + if config.Config.Object.Enable == "minio" { + if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.Endpoint) == "127.0.0.1" { + return ErrComponentStart.Wrap("apiURL or endpoint contain 127.0.0.1. Please modify your config file") + } + conf := config.Config.Object.Minio + u, _ := url.Parse(conf.Endpoint) + minioClient, err := minio.New(u.Host, &minio.Options{ + Creds: credentials.NewStaticV4(conf.AccessKeyID, conf.SecretAccessKey, ""), + Secure: u.Scheme == "https", + }) + if err != nil { + return err + } + + cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) + if err != nil { + return err + } else { + if minioClient.IsOffline() { + return ErrComponentStart.Wrap("Minio server is offline") + } + cancel() + } + } + return nil +} + +func checkRedis() error { + var redisClient redis.UniversalClient + if len(config.Config.Redis.Address) > 1 { + redisClient = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: config.Config.Redis.Address, + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, + }) + } else { + redisClient = redis.NewClient(&redis.Options{ + Addr: config.Config.Redis.Address[0], + Username: config.Config.Redis.Username, + Password: config.Config.Redis.Password, + }) + } + _, err := redisClient.Ping(context.Background()).Result() + if err != nil { + return err + } + return nil +} + +func checkZookeeper() error { + c, _, err := zk.Connect(config.Config.Zookeeper.ZkAddr, time.Second) + if err != nil { + return err + } else { + if config.Config.Zookeeper.Username != "" && config.Config.Zookeeper.Password != "" { + if err := c.AddAuth("digest", []byte(config.Config.Zookeeper.Username+":"+config.Config.Zookeeper.Password)); err != nil { + c.Close() + return err + } + } + _, _, err = c.Get("/") + if err != nil { + c.Close() + return err + } + } + return nil +} + +func checkKafka() error { + cfg := sarama.NewConfig() + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { + cfg.Net.SASL.Enable = true + cfg.Net.SASL.User = config.Config.Kafka.Username + cfg.Net.SASL.Password = config.Config.Kafka.Password + } + kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg) + if err != nil { + return err + } else { + topics, err := kafkaClient.Topics() + kafkaClient.Close() + if err != nil { + return err + } + if !utils.IsContain(config.Config.Kafka.MsgToMongo.Topic, topics) { + return ErrComponentStart.Wrap(fmt.Sprintf("kafka doesn't contain topic:%v", config.Config.Kafka.MsgToMongo.Topic)) + } + if !utils.IsContain(config.Config.Kafka.MsgToPush.Topic, topics) { + return ErrComponentStart.Wrap(fmt.Sprintf("kafka doesn't contain topic:%v", config.Config.Kafka.MsgToPush.Topic)) + } + if !utils.IsContain(config.Config.Kafka.LatestMsgToRedis.Topic, topics) { + return ErrComponentStart.Wrap(fmt.Sprintf("kafka doesn't contain topic:%v", config.Config.Kafka.LatestMsgToRedis.Topic)) + } + } + return nil +} + +func errorPrint(s string) { + fmt.Printf("\x1b[%dm%v\x1b[0m\n", 31, s) +} + +func successPrint(s string) { + fmt.Printf("\x1b[%dm%v\x1b[0m\n", 32, s) +}