diff --git a/tools/component/checks/checker.go b/tools/component/checks/checker.go deleted file mode 100644 index 4e2af5eef..000000000 --- a/tools/component/checks/checker.go +++ /dev/null @@ -1,20 +0,0 @@ -package checks - -import ( - "context" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" -) - -type ServiceChecker interface { - Check(ctx context.Context, config *config.GlobalConfig) error -} - -func CheckServices(ctx context.Context, cfg *config.GlobalConfig, checkers []ServiceChecker) error { - for _, checker := range checkers { - if err := checker.Check(ctx, cfg); err != nil { - return err - } - } - return nil -} diff --git a/tools/component/checks/kafka.go b/tools/component/checks/kafka.go index 34874719c..17fa84c5a 100644 --- a/tools/component/checks/kafka.go +++ b/tools/component/checks/kafka.go @@ -12,6 +12,7 @@ type KafkaCheck struct { Kafka *config.Kafka } + func CheckKafka(ctx context.Context, config *KafkaCheck) error { kafkaConfig := &kafka.Config{ Addr: config.Kafka.Addr, diff --git a/tools/component/checks/minio.go b/tools/component/checks/minio.go index 7271ee6d9..f24b3303e 100644 --- a/tools/component/checks/minio.go +++ b/tools/component/checks/minio.go @@ -1,107 +1,79 @@ package checks import ( - "errors" + "context" "net" "net/url" - "strings" "time" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/jsonutil" - "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" s3minio "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" ) const ( - minioHealthCheckDuration = 1 - mongoConnTimeout = 5 * time.Second - MaxRetry = 300 + minioHealthCheckDuration = 1 * time.Second ) -type MinioConfig struct { +type MinioCheck struct { s3minio.Config - UseSSL string - ApiURL string + UseSSL bool `yaml:"useSSL"` + ApiURL string `yaml:"apiURL"` } -// CheckMinio checks the MinIO connection. -func CheckMinio(minioStu MinioConfig) error { - if minioStu.Endpoint == "" || minioStu.AccessKeyID == "" || minioStu.SecretAccessKey == "" { - log.CInfo(nil, "Missing configuration for MinIO", "endpoint", minioStu.Endpoint, "accessKeyID", minioStu.AccessKeyID, "secretAccessKey", minioStu.SecretAccessKey) - return errs.New("missing configuration for endpoint, accessKeyID, or secretAccessKey").Wrap() - } +func CheckMinio(ctx context.Context, config MinioCheck) error { - minioInfo, err := jsonutil.JsonMarshal(minioStu) - if err != nil { - log.CInfo(nil, "MinioStu Marshal failed", "error", err) - return errs.WrapMsg(err, "minioStu Marshal failed") + if config.Endpoint == "" || config.AccessKeyID == "" || config.SecretAccessKey == "" { + logMsg := "Missing configuration for MinIO: endpoint, accessKeyID, or secretAccessKey" + log.CInfo(ctx, logMsg, "Config", config) + return errs.New(logMsg) } - logJsonInfo := string(minioInfo) - u, err := url.Parse(minioStu.Endpoint) + endpointURL, err := url.Parse(config.Endpoint) if err != nil { - log.CInfo(nil, "URL parse failed", "error", err, "minioInfo", logJsonInfo) - return errs.WrapMsg(err, "url parse failed") + return errs.WrapMsg(err, "Failed to parse MinIO endpoint URL") } + secure := endpointURL.Scheme == "https" || config.UseSSL - secure := u.Scheme == "https" || minioStu.UseSSL == "true" - - minioClient, err := minio.New(u.Host, &minio.Options{ - Creds: credentials.NewStaticV4(minioStu.AccessKeyID, minioStu.SecretAccessKey, ""), + minioClient, err := minio.New(endpointURL.Host, &minio.Options{ + Creds: credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKey, ""), Secure: secure, }) if err != nil { - log.CInfo(nil, "Initialize MinIO client failed", "error", err, "minioInfo", logJsonInfo) - return errs.WrapMsg(err, "initialize minio client failed") + return errs.WrapMsg(err, "Failed to initialize MinIO client", "Endpoint", config.Endpoint) } - cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) + cancel, err := minioClient.HealthCheck(minioHealthCheckDuration) if err != nil { - log.CInfo(nil, "MinIO client health check failed", "error", err, "minioInfo", logJsonInfo) - return errs.WrapMsg(err, "minio client health check failed") + return errs.WrapMsg(err, "MinIO client health check failed") } defer cancel() if minioClient.IsOffline() { - log.CInfo(nil, "MinIO client is offline", "minioInfo", logJsonInfo) - return errors.New("minio client is offline") + return errs.New("minio client is offline").Wrap() } - apiURL, err := exactIP(minioStu.ApiURL) - if err != nil { - return err - } - signEndPoint, err := exactIP(minioStu.SignEndpoint) - if err != nil { - return err + apiURLHost, _ := exactIP(config.ApiURL) + signEndpointHost, _ := exactIP(config.SignEndpoint) + if apiURLHost == "127.0.0.1" || signEndpointHost == "127.0.0.1" { + logMsg := "Warning: MinIO ApiURL or SignEndpoint contains localhost" + log.CInfo(ctx, logMsg, "ApiURL", config.ApiURL, "SignEndpoint", config.SignEndpoint) } - if apiURL == "127.0.0.1" { - log.CInfo(nil, "Warning, MinIOStu.apiURL contains localhost", "apiURL", minioStu.ApiURL) - } - if signEndPoint == "127.0.0.1" { - log.CInfo(nil, "Warning, MinIOStu.signEndPoint contains localhost", "signEndPoint", minioStu.SignEndpoint) - } return nil } func exactIP(urlStr string) (string, error) { u, err := url.Parse(urlStr) if err != nil { - log.CInfo(nil, "URL parse error", "error", err, "url", urlStr) - return "", errs.WrapMsg(err, "url parse error") + return "", errs.WrapMsg(err, "URL parse error") } host, _, err := net.SplitHostPort(u.Host) if err != nil { - host = u.Host // Assume the entire host part is the host name if split fails - } - if strings.HasSuffix(host, ":") { - host = host[:len(host)-1] + host = u.Host } return host, nil } diff --git a/tools/component/checks/zookeeper.go b/tools/component/checks/zookeeper.go index 7421c9d9a..05aa38504 100644 --- a/tools/component/checks/zookeeper.go +++ b/tools/component/checks/zookeeper.go @@ -12,7 +12,7 @@ type ZookeeperCheck struct { Zookeeper *config.Zookeeper } -func checkZookeeper(ctx context.Context, config *ZookeeperCheck) error { +func CheckZookeeper(ctx context.Context, config *ZookeeperCheck) error { zkServers := config.Zookeeper.ZkAddr schema := config.Zookeeper.Schema diff --git a/tools/component/component.go b/tools/component/component.go index b17960399..c5810a9cf 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -16,21 +16,22 @@ package main import ( "context" - "errors" "flag" "fmt" "os" - "strings" + "strconv" "time" - "github.com/OpenIMSDK/tools/component" - kfk "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/open-im-server/v3/tools/component/checks" + "github.com/openimsdk/open-im-server/v3/tools/component/util" + "github.com/openimsdk/tools/log" "gopkg.in/yaml.v2" "github.com/openimsdk/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" ) const ( @@ -57,191 +58,88 @@ func initCfg() (*config.GlobalConfig, error) { return conf, nil } -type checkFunc struct { - name string - function func(*config.GlobalConfig) error - flag bool - config *config.GlobalConfig -} - -// colorErrPrint prints formatted string in red to stderr -func colorErrPrint(msg string) { - // ANSI escape code for red text - const redColor = "\033[31m" - // ANSI escape code to reset color - const resetColor = "\033[0m" - msg = redColor + msg + resetColor - // Print to stderr in red - fmt.Fprintf(os.Stderr, "%s\n", msg) -} - -func colorSuccessPrint(format string, a ...interface{}) { - // ANSI escape code for green text is \033[32m - // \033[0m resets the color - fmt.Printf("\033[32m"+format+"\033[0m", a...) -} - func main() { flag.Parse() + ctx := context.Background() conf, err := initCfg() if err != nil { - fmt.Printf("Read config failed: %v\n", err) - return + fmt.Fprintf(os.Stderr, "Initialization failed: %v\n", err) + os.Exit(1) } - err = configGetEnv(conf) - if err != nil { - fmt.Printf("configGetEnv failed, err:%v", err) - return + if err := util.ConfigGetEnv(conf); err != nil { + fmt.Fprintf(os.Stderr, "Environment variable override failed: %v\n", err) + os.Exit(1) } - checks := []checkFunc{ - {name: "Mongo", function: checkMongo, config: conf}, - {name: "Redis", function: checkRedis, config: conf}, - {name: "Zookeeper", function: checkZookeeper, config: conf}, - {name: "Kafka", function: checkKafka, config: conf}, + // Define a slice of functions to perform each service check + serviceChecks := []func(context.Context, *config.GlobalConfig) error{ + func(ctx context.Context, cfg *config.GlobalConfig) error { + return checks.CheckMongo(ctx, &checks.MongoCheck{Mongo: &cfg.Mongo}) + }, + func(ctx context.Context, cfg *config.GlobalConfig) error { + return checks.CheckRedis(ctx, &checks.RedisCheck{Redis: &cfg.Redis}) + }, + func(ctx context.Context, cfg *config.GlobalConfig) error { + return checks.CheckZookeeper(ctx, &checks.ZookeeperCheck{Zookeeper: &cfg.Zookeeper}) + }, + func(ctx context.Context, cfg *config.GlobalConfig) error { + return checks.CheckKafka(ctx, &checks.KafkaCheck{Kafka: &cfg.Kafka}) + }, } + if conf.Object.Enable == "minio" { - checks = append(checks, checkFunc{name: "Minio", function: checkMinio, config: conf}) + minioConfig := checks.MinioCheck{ + Config: minio.Config(conf.Object.Minio), + // UseSSL: conf.Minio.UseSSL, + ApiURL: conf.Object.ApiURL, + } + + adjustUseSSL(&minioConfig) + + minioCheck := func(ctx context.Context, cfg *config.GlobalConfig) error { + return checks.CheckMinio(ctx, minioConfig) + } + serviceChecks = append(serviceChecks, minioCheck) } + // Execute checks with retry logic for i := 0; i < maxRetry; i++ { - if i != 0 { - time.Sleep(1 * time.Second) + if i > 0 { + time.Sleep(time.Second) } - fmt.Printf("Checking components round %v...\n", i+1) + fmt.Printf("Checking components, attempt %d/%d\n", i+1, maxRetry) - var err error allSuccess := true - for index, check := range checks { - if !check.flag { - err = check.function(check.config) - if err != nil { - allSuccess = false - colorErrPrint(fmt.Sprintf("Check component: %s, failed: %v", check.name, err.Error())) - - if check.name == "Minio" { - if errors.Is(err, errMinioNotEnabled) || - errors.Is(err, errSignEndPoint) || - errors.Is(err, errApiURL) { - checks[index].flag = true - continue - } - break - } - } else { - checks[index].flag = true - util.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) - } + for _, check := range serviceChecks { + if err := check(ctx, conf); err != nil { + util.ColorErrPrint(fmt.Sprintf("Check failed: %v", err)) + allSuccess = false + break } } + if allSuccess { - component.SuccessPrint("All components started successfully!") + util.SuccessPrint("All components started successfully!") return } } - component.ErrorPrint("Some components checked failed!") - os.Exit(-1) -} - -var errMinioNotEnabled = errors.New("minio.Enable is not configured to use MinIO") - -var errSignEndPoint = errors.New("minio.signEndPoint contains 127.0.0.1, causing issues with image sending") -var errApiURL = errors.New("object.apiURL contains 127.0.0.1, causing issues with image sending") - -// checkMongo checks the MongoDB connection without retries -func checkMongo(config *config.GlobalConfig) error { - mongoStu := &component.Mongo{ - URL: config.Mongo.Uri, - Address: config.Mongo.Address, - Database: config.Mongo.Database, - Username: config.Mongo.Username, - Password: config.Mongo.Password, - MaxPoolSize: config.Mongo.MaxPoolSize, - } - err := component.CheckMongo(mongoStu) - return err -} - -// checkRedis checks the Redis connection -func checkRedis(config *config.GlobalConfig) error { - redisStu := &component.Redis{ - Address: config.Redis.Address, - Username: config.Redis.Username, - Password: config.Redis.Password, - } - err := component.CheckRedis(redisStu) - return err -} - -// checkMinio checks the MinIO connection -func checkMinio(config *config.GlobalConfig) error { - if strings.Contains(config.Object.ApiURL, "127.0.0.1") { - return errs.Wrap(errApiURL) - } - if config.Object.Enable != "minio" { - return errs.Wrap(errMinioNotEnabled) - } - if strings.Contains(config.Object.Minio.Endpoint, "127.0.0.1") { - return errs.Wrap(errSignEndPoint) - } - - minio := &component.Minio{ - ApiURL: config.Object.ApiURL, - Endpoint: config.Object.Minio.Endpoint, - AccessKeyID: config.Object.Minio.AccessKeyID, - SecretAccessKey: config.Object.Minio.SecretAccessKey, - SignEndpoint: config.Object.Minio.SignEndpoint, - UseSSL: getEnv("MINIO_USE_SSL", "false"), - } - err := component.CheckMinio(minio) - return err -} - -// checkZookeeper checks the Zookeeper connection -func checkZookeeper(config *config.GlobalConfig) error { - zkStu := &component.Zookeeper{ - Schema: config.Zookeeper.Schema, - ZkAddr: config.Zookeeper.ZkAddr, - Username: config.Zookeeper.Username, - Password: config.Zookeeper.Password, - } - err := component.CheckZookeeper(zkStu) - return err -} - -// checkKafka checks the Kafka connection -func checkKafka(config *config.GlobalConfig) error { - topics := []string{ - config.Kafka.MsgToMongo.Topic, - config.Kafka.MsgToPush.Topic, - config.Kafka.LatestMsgToRedis.Topic, - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - defer cancel() - return kfk.CheckKafka(ctx, &config.Kafka.Config, topics) + util.ErrorPrint("Some components failed to start correctly.") + os.Exit(-1) } -// isTopicPresent checks if a topic is present in the list of topics -func isTopicPresent(topic string, topics []string) bool { - for _, t := range topics { - if t == topic { - return true +// adjustUseSSL updates the UseSSL setting based on the MINIO_USE_SSL environment variable. +func adjustUseSSL(config *checks.MinioCheck) { + useSSL := config.UseSSL + if envSSL, exists := os.LookupEnv("MINIO_USE_SSL"); exists { + parsedSSL, err := strconv.ParseBool(envSSL) + if err == nil { + useSSL = parsedSSL + } else { + log.CInfo(context.Background(), "Invalid MINIO_USE_SSL value; using config file setting.", "MINIO_USE_SSL", envSSL) } } - return false -} - -func getMinioAddr(key1, key2, key3, fallback string) string { - // Prioritize environment variables - endpoint := getEnv(key1, fallback) - address, addressExist := os.LookupEnv(key2) - port, portExist := os.LookupEnv(key3) - if portExist && addressExist { - endpoint = "http://" + address + ":" + port - return endpoint - } - return endpoint + config.UseSSL = useSSL } diff --git a/tools/component/util/env.go b/tools/component/util/env.go index 62bad9fa3..92f072f76 100644 --- a/tools/component/util/env.go +++ b/tools/component/util/env.go @@ -1,4 +1,4 @@ -package env +package util import ( "os" @@ -9,7 +9,7 @@ import ( "github.com/openimsdk/tools/errs" ) -func configGetEnv(config *config.GlobalConfig) error { +func ConfigGetEnv(config *config.GlobalConfig) error { config.Mongo.Uri = getEnv("MONGO_URI", config.Mongo.Uri) config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Mongo.Username) config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Mongo.Password) @@ -93,3 +93,15 @@ func getArrEnv(key1, key2 string, fallback []string) []string { } return fallback } + +func getMinioAddr(key1, key2, key3, fallback string) string { + // Prioritize environment variables + endpoint := getEnv(key1, fallback) + address, addressExist := os.LookupEnv(key2) + port, portExist := os.LookupEnv(key3) + if portExist && addressExist { + endpoint = "http://" + address + ":" + port + return endpoint + } + return endpoint +} diff --git a/tools/component/util/print.go b/tools/component/util/print.go index e69de29bb..4389d3f6a 100644 --- a/tools/component/util/print.go +++ b/tools/component/util/print.go @@ -0,0 +1,54 @@ +package util + +import ( + "fmt" + "log" + "os" +) + +const ( + colorRed = 31 + colorGreen = 32 + colorYellow = 33 +) + +// colorErrPrint prints formatted string in red to stderr +func ColorErrPrint(msg string) { + // ANSI escape code for red text + const redColor = "\033[31m" + // ANSI escape code to reset color + const resetColor = "\033[0m" + msg = redColor + msg + resetColor + // Print to stderr in red + fmt.Fprintf(os.Stderr, "%s\n", msg) +} + +func ColorSuccessPrint(format string, a ...interface{}) { + // ANSI escape code for green text is \033[32m + // \033[0m resets the color + fmt.Printf("\033[32m"+format+"\033[0m", a...) +} + +func colorPrint(colorCode int, format string, a ...any) { + fmt.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) +} + +func colorErrPrint(colorCode int, format string, a ...any) { + log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) +} + +func ErrorPrint(s string) { + colorErrPrint(colorRed, "%v", s) +} + +func SuccessPrint(s string) { + colorPrint(colorGreen, "%v", s) +} + +func WarningPrint(s string) { + colorPrint(colorYellow, "Warning: But %v", s) +} + +func ErrStr(err error, str string) error { + return fmt.Errorf("%v;%s", err, str) +}