From 7347e67119e5679af663ef3b368b5b3dc2952937 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Thu, 1 Feb 2024 18:25:42 +0800 Subject: [PATCH 01/17] feat: add component check func --- tools/component/component.go | 256 +++++++++++++++-------------------- 1 file changed, 110 insertions(+), 146 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index 4200b46f5..f5d159666 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,9 +15,18 @@ package main import ( - "errors" + "context" "flag" "fmt" + "github.com/IBM/sarama" + "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/oss" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" + "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "os" "strings" "time" @@ -34,13 +43,11 @@ const ( defaultCfgPath = "../../../../../config/config.yaml" maxRetry = 300 componentStartErrCode = 6000 - configErrCode = 6001 ) var ( cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") ErrComponentStart = errs.NewCodeError(componentStartErrCode, "ComponentStartErr") - ErrConfig = errs.NewCodeError(configErrCode, "Config file is incorrect") ) func initCfg() error { @@ -66,11 +73,13 @@ func main() { return } + configGetEnv() + checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, {name: "Mongo", function: checkMongo}, - {name: "Minio", function: checkMinio}, {name: "Redis", function: checkRedis}, + {name: "Minio", function: checkMinio}, {name: "Zookeeper", function: checkZookeeper}, {name: "Kafka", function: checkKafka}, } @@ -81,156 +90,115 @@ func main() { } fmt.Printf("Checking components Round %v...\n", i+1) - allSuccess := true + var ( + allSuccess bool + disruptions bool + err error + errInfo string + ) + disruptions = true for _, check := range checks { - str, err := check.function() + errInfo, err = check.function() if err != nil { - component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err)) + component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) + log.ZError(context.Background(), errInfo, err) allSuccess = false break } else { - component.SuccessPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str)) + component.SuccessPrint(fmt.Sprintf("%s connected successfully, the addr is:%s", check.name, errInfo)) + log.ZError(context.Background(), errInfo, err) + } + if check.name == "kafka" && errs.Unwrap(err) == ErrComponentStart { + disruptions = false } } if allSuccess { component.SuccessPrint("All components started successfully!") - + log.ZInfo(context.Background(), errInfo, err) return } - } - os.Exit(1) -} -// Helper function to get environment variable or default value -func getEnv(key, fallback string) string { - if value, exists := os.LookupEnv(key); exists { - return value + if disruptions { + component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) + return + } } - return fallback } // checkMongo checks the MongoDB connection without retries func checkMongo() (string, error) { - mongo := &component.Mongo{ - Address: config.Config.Mongo.Address, - Database: config.Config.Mongo.Database, - Username: config.Config.Mongo.Username, - Password: config.Config.Mongo.Password, - MaxPoolSize: config.Config.Mongo.MaxPoolSize, - } - uri, uriExist := os.LookupEnv("MONGO_URI") - if uriExist { - mongo.URL = uri - } - - str, err := component.CheckMongo(mongo) + _, err := unrelation.NewMongo() if err != nil { - return "", err + if config.Config.Mongo.Uri != "" { + return config.Config.Mongo.Uri, err + } + uriFormat := "mongodb://%s/%s?maxPoolSize=%s" + if config.Config.Mongo.Username != "" && config.Config.Mongo.Password != "" { + uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s" + return fmt.Sprintf(uriFormat, config.Config.Mongo.Username, config.Config.Mongo.Password, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err + } + return fmt.Sprintf(uriFormat, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err } - return str, nil + return strings.Join(config.Config.Mongo.Address, ","), nil } -// checkMinio checks the MinIO connection -func checkMinio() (string, error) { - // Check if MinIO is enabled - if config.Config.Object.Enable != "minio" { - return "", nil - } - - endpoint, err := getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Config.Object.Minio.Endpoint) - if err != nil { - return "", err - } - - minio := &component.Minio{ - ApiURL: config.Config.Object.ApiURL, - Endpoint: endpoint, - AccessKeyID: getEnv("MINIO_ACCESS_KEY_ID", config.Config.Object.Minio.AccessKeyID), - SecretAccessKey: getEnv("MINIO_SECRET_ACCESS_KEY", config.Config.Object.Minio.SecretAccessKey), - SignEndpoint: config.Config.Object.Minio.SignEndpoint, - UseSSL: getEnv("MINIO_USE_SSL", "false"), - } - - str, err := component.CheckMinio(minio) +// checkRedis checks the Redis connection +func checkRedis() (string, error) { + _, err := cache.NewRedis() if err != nil { - return "", err + uriFormat := "The username is:%s, the password is:%s, the address is:%s, the clusterMode is:%t" + return fmt.Sprintf(uriFormat, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.Address, config.Config.Redis.ClusterMode), err } - return str, nil + return strings.Join(config.Config.Redis.Address, ","), err } -// checkRedis checks the Redis connection -func checkRedis() (string, error) { - // Prioritize environment variables - address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ",")) - username := getEnv("REDIS_USERNAME", config.Config.Redis.Username) - password := getEnv("REDIS_PASSWORD", config.Config.Redis.Password) - - redis := &component.Redis{ - Address: strings.Split(address, ","), - Username: username, - Password: password, - } +// checkMinio checks the MinIO connection +func checkMinio() (string, error) { - addresses, err := getAddress("REDIS_ADDRESS", "REDIS_PORT", config.Config.Redis.Address) - if err != nil { - return "", err + rdb, err := cache.NewRedis() + + enable := config.Config.Object.Enable + switch config.Config.Object.Enable { + case "minio": + _, err = minio.NewMinio(cache.NewMinioCache(rdb)) + case "cos": + _, err = cos.NewCos() + case "oss": + _, err = oss.NewOSS() + default: + err = fmt.Errorf("invalid object enable: %s", enable) } - redis.Address = addresses - - str, err := component.CheckRedis(redis) if err != nil { - return "", err + uriFormat := "The apiURL is:%s, the endpoint is:%s, the signEndpoint is:%s." + return fmt.Sprintf(uriFormat, config.Config.Object.ApiURL, config.Config.Object.Minio.Endpoint, config.Config.Object.Minio.SignEndpoint), err } - return str, nil + return config.Config.Object.Minio.Endpoint, nil } // checkZookeeper checks the Zookeeper connection func checkZookeeper() (string, error) { - // Prioritize environment variables - - address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ",")) - - zk := &component.Zookeeper{ - Schema: getEnv("ZOOKEEPER_SCHEMA", "digest"), - ZkAddr: strings.Split(address, ","), - Username: getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username), - Password: getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password), - } - - addresses, err := getAddress("ZOOKEEPER_ADDRESS", "ZOOKEEPER_PORT", config.Config.Zookeeper.ZkAddr) + _, err := zookeeper.NewZookeeperDiscoveryRegister() if err != nil { - return "", nil - } - zk.ZkAddr = addresses - - str, err := component.CheckZookeeper(zk) - if err != nil { - return "", err + if config.Config.Zookeeper.Username != "" && config.Config.Zookeeper.Password != "" { + return fmt.Sprintf("The addr is:%s,the schema is:%s, the username is:%s, the password is:%s.", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), err + } + return fmt.Sprintf("The addr is:%s,the schema is:%s", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema), err } - return str, nil + return strings.Join(config.Config.Zookeeper.ZkAddr, ","), nil } // checkKafka checks the Kafka connection func checkKafka() (string, error) { - // Prioritize environment variables - username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) - password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) - address := getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ",")) - - kafka := &component.Kafka{ - Username: username, - Password: password, - Addr: strings.Split(address, ","), - } - addresses, err := getAddress("KAFKA_ADDRESS", "KAFKA_PORT", config.Config.Kafka.Addr) - if err != nil { - return "", nil + // Prioritize environment variables + kafkaStu := &component.Kafka{ + Username: config.Config.Kafka.Username, + Password: config.Config.Kafka.Password, + Addr: config.Config.Kafka.Addr, } - kafka.Addr = addresses - str, kafkaClient, err := component.CheckKafka(kafka) + str, kafkaClient, err := component.CheckKafka(kafkaStu) if err != nil { return "", err } @@ -254,6 +222,24 @@ func checkKafka() (string, error) { } } + kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) + + kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.MsgToMongo.Topic}, + config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + + kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + KafkaVersion: sarama.V2_0_0_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, + config.Config.Kafka.ConsumerGroupID.MsgToPush) + return str, nil } @@ -267,42 +253,20 @@ func isTopicPresent(topic string, topics []string) bool { return false } -func getAddress(key1, key2 string, fallback []string) ([]string, error) { - address, addrExist := os.LookupEnv(key1) - port, portExist := os.LookupEnv(key2) +func configGetEnv() { + config.Config.Mongo.Uri = getEnv("MONGO_URI", config.Config.Mongo.Uri) + config.Config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Config.Mongo.Username) + config.Config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Config.Mongo.Password) + config.Config.Kafka.Username = getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) + config.Config.Kafka.Password = getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) + config.Config.Kafka.Addr = strings.Split(getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ",")), ",") - if addrExist && portExist { - addresses := strings.Split(address, ",") - for i, addr := range addresses { - addresses[i] = addr + ":" + port - } - return addresses, nil - } else if !addrExist && portExist { - result := make([]string, len(config.Config.Redis.Address)) - for i, addr := range config.Config.Redis.Address { - add := strings.Split(addr, ":") - result[i] = add[0] + ":" + port - } - return result, nil - } else if addrExist && !portExist { - return nil, errs.Wrap(errors.New("the ZOOKEEPER_PORT of minio is empty")) - } - return fallback, nil } -func getMinioAddr(key1, key2, key3, fallback string) (string, error) { - // Prioritize environment variables - endpoint := getEnv(key1, fallback) - address, addressExist := os.LookupEnv(key2) - port, portExist := os.LookupEnv(key3) - if portExist && addressExist { - endpoint = "http://" + address + ":" + port - } else if !portExist && addressExist { - return "", errs.Wrap(errors.New("the MINIO_PORT of minio is empty")) - } else if portExist && !addressExist { - arr := strings.Split(config.Config.Object.Minio.Endpoint, ":") - arr[2] = port - endpoint = strings.Join(arr, ":") +// Helper function to get environment variable or default value +func getEnv(key, fallback string) string { + if value, exists := os.LookupEnv(key); exists { + return value } - return endpoint, nil -} \ No newline at end of file + return fallback +} From e4964bb6107a3b6dd263ca73ab9f5a9c8779468d Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Thu, 1 Feb 2024 18:31:04 +0800 Subject: [PATCH 02/17] fix: fix the outpu error --- tools/component/component.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index f5d159666..293e6d9fa 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -91,12 +91,11 @@ func main() { fmt.Printf("Checking components Round %v...\n", i+1) var ( - allSuccess bool - disruptions bool - err error - errInfo string + err error + errInfo string ) - disruptions = true + allSuccess := true + disruptions := true for _, check := range checks { errInfo, err = check.function() if err != nil { From b8ca407cda8f61ea50401c20a24109e6bdce6c21 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Thu, 1 Feb 2024 18:48:48 +0800 Subject: [PATCH 03/17] fix: fix the stderr outpu --- tools/component/component.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index 293e6d9fa..71422ce77 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,11 +15,9 @@ package main import ( - "context" "flag" "fmt" "github.com/IBM/sarama" - "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" @@ -100,12 +98,12 @@ func main() { errInfo, err = check.function() if err != nil { component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) - log.ZError(context.Background(), errInfo, err) + fmt.Fprintln(os.Stderr, errInfo, err) allSuccess = false break } else { component.SuccessPrint(fmt.Sprintf("%s connected successfully, the addr is:%s", check.name, errInfo)) - log.ZError(context.Background(), errInfo, err) + fmt.Fprintln(os.Stderr, errInfo, err) } if check.name == "kafka" && errs.Unwrap(err) == ErrComponentStart { disruptions = false @@ -114,12 +112,12 @@ func main() { if allSuccess { component.SuccessPrint("All components started successfully!") - log.ZInfo(context.Background(), errInfo, err) return } if disruptions { component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) + fmt.Fprintln(os.Stderr, errInfo, err) return } } From e729fe62e130bcb3b27f21fe0a559a2753c043ba Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 12:25:00 +0800 Subject: [PATCH 04/17] fix: fix the component check func --- pkg/common/db/cache/init_redis.go | 5 +- pkg/common/db/unrelation/mongo.go | 4 +- .../discoveryregister/zookeeper/zookeeper.go | 14 +- pkg/common/kafka/consumer_group.go | 2 +- tools/component/component.go | 173 ++++++++++-------- 5 files changed, 113 insertions(+), 85 deletions(-) diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index c593089e6..3cec73be5 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -77,9 +77,10 @@ func NewRedis() (redis.UniversalClient, error) { defer cancel() err = rdb.Ping(ctx).Err() if err != nil { - return nil, errs.Wrap(fmt.Errorf("redis ping %w", err)) + uriFormat := "address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t" + errMsg := fmt.Sprintf(uriFormat, config.Config.Redis.Address, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.ClusterMode, config.Config.Redis.EnablePipeline) + return nil, errs.Wrap(err, errMsg) } - redisClient = rdb return rdb, err } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index e07c5b7f4..fe89c6b8a 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -61,9 +61,9 @@ func NewMongo() (*Mongo, error) { time.Sleep(time.Second) // exponential backoff could be implemented here continue } - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, uri) } - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, uri) } func buildMongoURI() string { diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go index db1a5c6c4..0082e9833 100644 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -15,6 +15,8 @@ package zookeeper import ( + "fmt" + "github.com/OpenIMSDK/tools/errs" "os" "strings" "time" @@ -33,7 +35,7 @@ func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, er username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username) password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password) - return openkeeper.NewClient( + zk, err := openkeeper.NewClient( zkAddr, schema, openkeeper.WithFreq(time.Hour), @@ -42,6 +44,16 @@ func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, er openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()), ) + if err != nil { + uriFormat := "address:%s, username :%s, password :%s, schema:%s." + errInfo := fmt.Sprintf(uriFormat, + config.Config.Zookeeper.ZkAddr, + config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password, + config.Config.Zookeeper.Schema) + return nil, errs.Wrap(err, errInfo) + } + return zk, nil } // getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 05006f582..87e6d5686 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -51,7 +51,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str SetupTLSConfig(consumerGroupConfig) consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { - return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID) + return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } return &MConsumerGroup{ consumerGroup, diff --git a/tools/component/component.go b/tools/component/component.go index 71422ce77..3624c03c9 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,16 +15,15 @@ package main import ( + "errors" "flag" "fmt" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/oss" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" + "log" "os" "strings" "time" @@ -38,14 +37,18 @@ import ( const ( // defaultCfgPath is the default path of the configuration file. - defaultCfgPath = "../../../../../config/config.yaml" - maxRetry = 300 - componentStartErrCode = 6000 + defaultCfgPath = "../../../../../config/config.yaml" + maxRetry = 300 + colorRed = 31 ) var ( - cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") - ErrComponentStart = errs.NewCodeError(componentStartErrCode, "ComponentStartErr") + cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") + MongoAuthFailed = "Authentication failed." + RedisAuthFailed = "NOAUTH Authentication required." + MinioAuthFailed = "Minio Authentication failed" + ZkAuthFailed = "zk Authentication failed" + KafkaAuthFailed = "SASL Authentication failed" ) func initCfg() error { @@ -59,7 +62,8 @@ func initCfg() error { type checkFunc struct { name string - function func() (string, error) + function func() error + authInfo string } func main() { @@ -75,11 +79,11 @@ func main() { checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, - {name: "Mongo", function: checkMongo}, - {name: "Redis", function: checkRedis}, - {name: "Minio", function: checkMinio}, - {name: "Zookeeper", function: checkZookeeper}, - {name: "Kafka", function: checkKafka}, + {name: "Mongo", function: checkMongo, authInfo: MongoAuthFailed}, + {name: "Redis", function: checkRedis, authInfo: RedisAuthFailed}, + {name: "Minio", function: checkMinio, authInfo: MinioAuthFailed}, + {name: "Zookeeper", function: checkZookeeper, authInfo: ZkAuthFailed}, + {name: "Kafka", function: checkKafka, authInfo: KafkaAuthFailed}, } for i := 0; i < maxRetry; i++ { @@ -89,25 +93,24 @@ func main() { fmt.Printf("Checking components Round %v...\n", i+1) var ( - err error - errInfo string + err error + errInfo string + disruptions bool ) allSuccess := true - disruptions := true for _, check := range checks { - errInfo, err = check.function() + err = check.function() if err != nil { - component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) - fmt.Fprintln(os.Stderr, errInfo, err) + if errorJudge(err, check.authInfo) { + disruptions = true + } + ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) allSuccess = false break } else { component.SuccessPrint(fmt.Sprintf("%s connected successfully, the addr is:%s", check.name, errInfo)) - fmt.Fprintln(os.Stderr, errInfo, err) - } - if check.name == "kafka" && errs.Unwrap(err) == ErrComponentStart { - disruptions = false } + } if allSuccess { @@ -117,76 +120,50 @@ func main() { if disruptions { component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) - fmt.Fprintln(os.Stderr, errInfo, err) return } } } // checkMongo checks the MongoDB connection without retries -func checkMongo() (string, error) { +func checkMongo() error { _, err := unrelation.NewMongo() - if err != nil { - if config.Config.Mongo.Uri != "" { - return config.Config.Mongo.Uri, err - } - uriFormat := "mongodb://%s/%s?maxPoolSize=%s" - if config.Config.Mongo.Username != "" && config.Config.Mongo.Password != "" { - uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s" - return fmt.Sprintf(uriFormat, config.Config.Mongo.Username, config.Config.Mongo.Password, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err - } - return fmt.Sprintf(uriFormat, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err - } - return strings.Join(config.Config.Mongo.Address, ","), nil + return err } // checkRedis checks the Redis connection -func checkRedis() (string, error) { +func checkRedis() error { _, err := cache.NewRedis() - if err != nil { - uriFormat := "The username is:%s, the password is:%s, the address is:%s, the clusterMode is:%t" - return fmt.Sprintf(uriFormat, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.Address, config.Config.Redis.ClusterMode), err - } - return strings.Join(config.Config.Redis.Address, ","), err + return err } // checkMinio checks the MinIO connection -func checkMinio() (string, error) { - - rdb, err := cache.NewRedis() - - enable := config.Config.Object.Enable - switch config.Config.Object.Enable { - case "minio": - _, err = minio.NewMinio(cache.NewMinioCache(rdb)) - case "cos": - _, err = cos.NewCos() - case "oss": - _, err = oss.NewOSS() - default: - err = fmt.Errorf("invalid object enable: %s", enable) +func checkMinio() error { + + // Check if MinIO is enabled + if config.Config.Object.Enable != "minio" { + return nil } - if err != nil { - uriFormat := "The apiURL is:%s, the endpoint is:%s, the signEndpoint is:%s." - return fmt.Sprintf(uriFormat, config.Config.Object.ApiURL, config.Config.Object.Minio.Endpoint, config.Config.Object.Minio.SignEndpoint), err + minio := &component.Minio{ + ApiURL: config.Config.Object.ApiURL, + Endpoint: config.Config.Object.Minio.Endpoint, + AccessKeyID: config.Config.Object.Minio.AccessKeyID, + SecretAccessKey: config.Config.Object.Minio.SecretAccessKey, + SignEndpoint: config.Config.Object.Minio.SignEndpoint, + UseSSL: getEnv("MINIO_USE_SSL", "false"), } - return config.Config.Object.Minio.Endpoint, nil + _, err := component.CheckMinio(minio) + return err } // checkZookeeper checks the Zookeeper connection -func checkZookeeper() (string, error) { +func checkZookeeper() error { _, err := zookeeper.NewZookeeperDiscoveryRegister() - if err != nil { - if config.Config.Zookeeper.Username != "" && config.Config.Zookeeper.Password != "" { - return fmt.Sprintf("The addr is:%s,the schema is:%s, the username is:%s, the password is:%s.", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), err - } - return fmt.Sprintf("The addr is:%s,the schema is:%s", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema), err - } - return strings.Join(config.Config.Zookeeper.ZkAddr, ","), nil + return err } // checkKafka checks the Kafka connection -func checkKafka() (string, error) { +func checkKafka() error { // Prioritize environment variables kafkaStu := &component.Kafka{ @@ -195,16 +172,16 @@ func checkKafka() (string, error) { Addr: config.Config.Kafka.Addr, } - str, kafkaClient, err := component.CheckKafka(kafkaStu) + _, kafkaClient, err := component.CheckKafka(kafkaStu) if err != nil { - return "", err + return err } defer kafkaClient.Close() // Verify if necessary topics exist topics, err := kafkaClient.Topics() if err != nil { - return "", errs.Wrap(err) + return errs.Wrap(err) } requiredTopics := []string{ @@ -215,29 +192,38 @@ func checkKafka() (string, error) { for _, requiredTopic := range requiredTopics { if !isTopicPresent(requiredTopic, topics) { - return "", ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) + return errs.Wrap(err, fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) } } - kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) + if err != nil { + return err + } - kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Config.Kafka.MsgToMongo.Topic}, + }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + if err != nil { + return err + } kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) + if err != nil { + return err + } - return str, nil + return nil } // isTopicPresent checks if a topic is present in the list of topics @@ -251,13 +237,27 @@ func isTopicPresent(topic string, topics []string) bool { } func configGetEnv() { + config.Config.Object.Minio.AccessKeyID = getEnv("MINIO_ACCESS_KEY_ID", config.Config.Object.Minio.AccessKeyID) + config.Config.Object.Minio.SecretAccessKey = getEnv("MINIO_SECRET_ACCESS_KEY", config.Config.Object.Minio.SecretAccessKey) config.Config.Mongo.Uri = getEnv("MONGO_URI", config.Config.Mongo.Uri) config.Config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Config.Mongo.Username) config.Config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Config.Mongo.Password) config.Config.Kafka.Username = getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) config.Config.Kafka.Password = getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) config.Config.Kafka.Addr = strings.Split(getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ",")), ",") + config.Config.Object.Minio.Endpoint = getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Config.Object.Minio.Endpoint) +} +func getMinioAddr(key1, key2, key3, fallback string) string { + // Prioritize environment variables + endpoint := getEnv(key1, fallback) + address, addressExist := os.LookupEnv(key2) + port, portExist := os.LookupEnv(key3) + if portExist && addressExist { + endpoint = "http://" + address + ":" + port + return endpoint + } + return endpoint } // Helper function to get environment variable or default value @@ -267,3 +267,18 @@ func getEnv(key, fallback string) string { } return fallback } + +func ErrorPrint(s string) { + colorPrint(colorRed, "%v", s) +} + +func colorPrint(colorCode int, format string, a ...interface{}) { + log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) +} + +func errorJudge(err error, errMsg string) bool { + if strings.Contains(errors.Unwrap(err).Error(), errMsg) { + return true + } + return false +} From 5a96112d242c5e826ae5d8380861538df9e90ae8 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 14:14:40 +0800 Subject: [PATCH 05/17] fix: fix the error --- tools/component/component.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index 3624c03c9..e86761ec9 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -61,9 +61,9 @@ func initCfg() error { } type checkFunc struct { - name string - function func() error - authInfo string + name string + function func() error + authErrInfo string } func main() { @@ -79,11 +79,11 @@ func main() { checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, - {name: "Mongo", function: checkMongo, authInfo: MongoAuthFailed}, - {name: "Redis", function: checkRedis, authInfo: RedisAuthFailed}, - {name: "Minio", function: checkMinio, authInfo: MinioAuthFailed}, - {name: "Zookeeper", function: checkZookeeper, authInfo: ZkAuthFailed}, - {name: "Kafka", function: checkKafka, authInfo: KafkaAuthFailed}, + {name: "Mongo", function: checkMongo, authErrInfo: MongoAuthFailed}, + {name: "Redis", function: checkRedis, authErrInfo: RedisAuthFailed}, + {name: "Minio", function: checkMinio, authErrInfo: MinioAuthFailed}, + {name: "Zookeeper", function: checkZookeeper, authErrInfo: ZkAuthFailed}, + {name: "Kafka", function: checkKafka, authErrInfo: KafkaAuthFailed}, } for i := 0; i < maxRetry; i++ { @@ -101,25 +101,25 @@ func main() { for _, check := range checks { err = check.function() if err != nil { - if errorJudge(err, check.authInfo) { + if errorJudge(err, check.authErrInfo) { disruptions = true } - ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) + ErrorPrint(fmt.Sprintf("Starting %s failed:%v, conneted info:%s", check.name, err, errInfo)) allSuccess = false break } else { - component.SuccessPrint(fmt.Sprintf("%s connected successfully, the addr is:%s", check.name, errInfo)) + component.SuccessPrint(fmt.Sprintf("%s connected successfully, address:%s", check.name, errInfo)) } } - if allSuccess { - component.SuccessPrint("All components started successfully!") + if disruptions { + component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) return } - if disruptions { - component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) + if allSuccess { + component.SuccessPrint("All components started successfully!") return } } From 6e2b50d63beaa18f18892b3aeef97f0fbd591030 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 14:23:10 +0800 Subject: [PATCH 06/17] fix: fix the output error --- tools/component/component.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/component/component.go b/tools/component/component.go index e86761ec9..d7b3f6e73 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -106,7 +106,6 @@ func main() { } ErrorPrint(fmt.Sprintf("Starting %s failed:%v, conneted info:%s", check.name, err, errInfo)) allSuccess = false - break } else { component.SuccessPrint(fmt.Sprintf("%s connected successfully, address:%s", check.name, errInfo)) } From e0fd4ee4c12262b53264298a1c61abbac4416556 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 14:44:58 +0800 Subject: [PATCH 07/17] fix: del the disruptions code --- tools/component/component.go | 48 +++++++++--------------------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index d7b3f6e73..ae8eb96ef 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,7 +15,6 @@ package main import ( - "errors" "flag" "fmt" "github.com/IBM/sarama" @@ -43,12 +42,7 @@ const ( ) var ( - cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") - MongoAuthFailed = "Authentication failed." - RedisAuthFailed = "NOAUTH Authentication required." - MinioAuthFailed = "Minio Authentication failed" - ZkAuthFailed = "zk Authentication failed" - KafkaAuthFailed = "SASL Authentication failed" + cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") ) func initCfg() error { @@ -61,9 +55,8 @@ func initCfg() error { } type checkFunc struct { - name string - function func() error - authErrInfo string + name string + function func() error } func main() { @@ -79,11 +72,11 @@ func main() { checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, - {name: "Mongo", function: checkMongo, authErrInfo: MongoAuthFailed}, - {name: "Redis", function: checkRedis, authErrInfo: RedisAuthFailed}, - {name: "Minio", function: checkMinio, authErrInfo: MinioAuthFailed}, - {name: "Zookeeper", function: checkZookeeper, authErrInfo: ZkAuthFailed}, - {name: "Kafka", function: checkKafka, authErrInfo: KafkaAuthFailed}, + {name: "Mongo", function: checkMongo}, + {name: "Redis", function: checkRedis}, + {name: "Minio", function: checkMinio}, + {name: "Zookeeper", function: checkZookeeper}, + {name: "Kafka", function: checkKafka}, } for i := 0; i < maxRetry; i++ { @@ -92,29 +85,16 @@ func main() { } fmt.Printf("Checking components Round %v...\n", i+1) - var ( - err error - errInfo string - disruptions bool - ) + var err error allSuccess := true for _, check := range checks { err = check.function() if err != nil { - if errorJudge(err, check.authErrInfo) { - disruptions = true - } - ErrorPrint(fmt.Sprintf("Starting %s failed:%v, conneted info:%s", check.name, err, errInfo)) + ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) allSuccess = false } else { - component.SuccessPrint(fmt.Sprintf("%s connected successfully, address:%s", check.name, errInfo)) + component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } - - } - - if disruptions { - component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) - return } if allSuccess { @@ -275,9 +255,3 @@ func colorPrint(colorCode int, format string, a ...interface{}) { log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) } -func errorJudge(err error, errMsg string) bool { - if strings.Contains(errors.Unwrap(err).Error(), errMsg) { - return true - } - return false -} From 3bcc33f269c1c134437e26046e24e29c07f806e8 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 15:13:52 +0800 Subject: [PATCH 08/17] fix the log output format --- tools/component/component.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index ae8eb96ef..7a406a408 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" - "log" "os" "strings" "time" @@ -38,7 +37,6 @@ const ( // defaultCfgPath is the default path of the configuration file. defaultCfgPath = "../../../../../config/config.yaml" maxRetry = 300 - colorRed = 31 ) var ( @@ -90,7 +88,7 @@ func main() { for _, check := range checks { err = check.function() if err != nil { - ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) + component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) allSuccess = false } else { component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) @@ -246,12 +244,3 @@ func getEnv(key, fallback string) string { } return fallback } - -func ErrorPrint(s string) { - colorPrint(colorRed, "%v", s) -} - -func colorPrint(colorCode int, format string, a ...interface{}) { - log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) -} - From 633388b6d8af09d0520c41cfe85919b9e834435f Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 16:03:26 +0800 Subject: [PATCH 09/17] fix: fix the tools version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a5fecb5ee..16a10a945 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible github.com/OpenIMSDK/protocol v0.0.48 - github.com/OpenIMSDK/tools v0.0.31 + github.com/OpenIMSDK/tools v0.0.32 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index fbd9366c3..136035cef 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.31 h1:fSrhcPTvHEMTSyrJZDupe730mL4nuhvSOUP/BaZiHaY= -github.com/OpenIMSDK/tools v0.0.31/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= +github.com/OpenIMSDK/tools v0.0.32 h1:b8KwtxXKZTsyyHUcZ4OtSo6s/vVXx4HjMuPxH7Kb7Gg= +github.com/OpenIMSDK/tools v0.0.32/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= From 7a8597fa853409d2c3d6e886bf3130ab1dd23c2e Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 16:52:40 +0800 Subject: [PATCH 10/17] fix: fix the cycle detection --- pkg/common/db/unrelation/mongo.go | 1 - tools/component/component.go | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index fe89c6b8a..d396f80cc 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -57,7 +57,6 @@ func NewMongo() (*Mongo, error) { return &Mongo{db: mongoClient}, nil } if shouldRetry(err) { - fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err) time.Sleep(time.Second) // exponential backoff could be implemented here continue } diff --git a/tools/component/component.go b/tools/component/component.go index 787ca8af6..aebd96b69 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -56,6 +56,7 @@ func initCfg() error { type checkFunc struct { name string function func() error + flag bool } func main() { @@ -87,11 +88,15 @@ func main() { var err error allSuccess := true for _, check := range checks { - err = check.function() + if !check.flag { + err = check.function() + continue + } if err != nil { component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) allSuccess = false } else { + check.flag = true component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } } From 9d2497a8cb141e42661a39dac906748e00df0c1d Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 17:02:32 +0800 Subject: [PATCH 11/17] fix: fix the error --- tools/component/component.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index aebd96b69..8ebafb3be 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -90,14 +90,13 @@ func main() { for _, check := range checks { if !check.flag { err = check.function() - continue - } - if err != nil { - component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) - allSuccess = false - } else { - check.flag = true - component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) + if err != nil { + component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) + allSuccess = false + } else { + check.flag = true + component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) + } } } From e99577b21fca0d3c8d2f9b52dfe31f110f44499d Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 17:08:02 +0800 Subject: [PATCH 12/17] fix: fix the flag --- tools/component/component.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/component/component.go b/tools/component/component.go index 8ebafb3be..32d887815 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -87,14 +87,14 @@ func main() { var err error allSuccess := true - for _, check := range checks { + for i, check := range checks { if !check.flag { err = check.function() if err != nil { component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) allSuccess = false } else { - check.flag = true + checks[i].flag = true component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } } From 68e29cd36032c16dcc21712341bd9f52d21e7fe7 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 17:17:27 +0800 Subject: [PATCH 13/17] fix: add mongo ping detection --- pkg/common/db/unrelation/mongo.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index d396f80cc..6d2cdd300 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -54,6 +54,9 @@ func NewMongo() (*Mongo, error) { defer cancel() mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri)) if err == nil { + if err = mongoClient.Ping(ctx, nil); err != nil { + return nil, errs.Wrap(err, uri) + } return &Mongo{db: mongoClient}, nil } if shouldRetry(err) { From fc1fa6ffbaad827a52c653cde4e787715f199f81 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 18:41:25 +0800 Subject: [PATCH 14/17] fix: fix the tools pkg version --- go.mod | 2 +- go.sum | 4 ++-- tools/component/component.go | 9 +++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 16a10a945..d82d3390b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible github.com/OpenIMSDK/protocol v0.0.48 - github.com/OpenIMSDK/tools v0.0.32 + github.com/OpenIMSDK/tools v0.0.33 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index 136035cef..fb047dfdc 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.32 h1:b8KwtxXKZTsyyHUcZ4OtSo6s/vVXx4HjMuPxH7Kb7Gg= -github.com/OpenIMSDK/tools v0.0.32/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= +github.com/OpenIMSDK/tools v0.0.33 h1:rvFCxXaXxLv1MJFC4qcoWRGwKBnV+hR68UN2N0/zZhE= +github.com/OpenIMSDK/tools v0.0.33/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= diff --git a/tools/component/component.go b/tools/component/component.go index 32d887815..c0f3aebba 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -87,14 +87,15 @@ func main() { var err error allSuccess := true - for i, check := range checks { + for index, check := range checks { if !check.flag { err = check.function() if err != nil { component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, err)) allSuccess = false + } else { - checks[i].flag = true + checks[index].flag = true component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } } @@ -134,7 +135,7 @@ func checkMinio() error { SignEndpoint: config.Config.Object.Minio.SignEndpoint, UseSSL: getEnv("MINIO_USE_SSL", "false"), } - _, err := component.CheckMinio(minio) + err := component.CheckMinio(minio) return err } @@ -153,7 +154,7 @@ func checkKafka() error { Addr: config.Config.Kafka.Addr, } - _, kafkaClient, err := component.CheckKafka(kafkaStu) + kafkaClient, err := component.CheckKafka(kafkaStu) if err != nil { return err } From a23ae2294d8e8d6963d80b27d9bd7a0c5692ae5d Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 19:24:00 +0800 Subject: [PATCH 15/17] fix: del the err --- pkg/common/db/unrelation/mongo.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 6d2cdd300..2bade9a62 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -63,7 +63,6 @@ func NewMongo() (*Mongo, error) { time.Sleep(time.Second) // exponential backoff could be implemented here continue } - return nil, errs.Wrap(err, uri) } return nil, errs.Wrap(err, uri) } From e33c66ca6c8cc1d2881a56fe617841fecfdffc96 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 20:23:24 +0800 Subject: [PATCH 16/17] fix: fix the minio nil error --- pkg/common/discoveryregister/zookeeper/zookeeper.go | 2 +- tools/component/component.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go index 0082e9833..9c58807c1 100644 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -45,7 +45,7 @@ func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, er openkeeper.WithLogger(log.NewZkLogger()), ) if err != nil { - uriFormat := "address:%s, username :%s, password :%s, schema:%s." + uriFormat := "address:%s, username:%s, password:%s, schema:%s." errInfo := fmt.Sprintf(uriFormat, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Username, diff --git a/tools/component/component.go b/tools/component/component.go index c0f3aebba..bd6007015 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,6 +15,7 @@ package main import ( + "errors" "flag" "fmt" "github.com/IBM/sarama" @@ -125,7 +126,7 @@ func checkMinio() error { // Check if MinIO is enabled if config.Config.Object.Enable != "minio" { - return nil + return errs.Wrap(errors.New("minio.Enable is empty")) } minio := &component.Minio{ ApiURL: config.Config.Object.ApiURL, From ee8c7fd90432ef575ba9886001a6345d01925db0 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Fri, 2 Feb 2024 20:49:51 +0800 Subject: [PATCH 17/17] fix: del the repeated wrap and add err print --- cmd/openim-api/main.go | 3 ++- cmd/openim-cmdutils/main.go | 5 ++++- cmd/openim-crontask/main.go | 5 ++++- cmd/openim-msggateway/main.go | 5 ++++- cmd/openim-msgtransfer/main.go | 5 ++++- cmd/openim-push/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-auth/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-conversation/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-friend/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-group/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-msg/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-third/main.go | 5 ++++- cmd/openim-rpc/openim-rpc-user/main.go | 5 ++++- pkg/common/startrpc/start.go | 2 +- 14 files changed, 51 insertions(+), 14 deletions(-) diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 59e0b7f9e..755b35591 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -43,7 +43,8 @@ func main() { apiCmd.AddPortFlag() apiCmd.AddApi(run) if err := apiCmd.Execute(); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-cmdutils/main.go b/cmd/openim-cmdutils/main.go index 058aa2e29..a13bc4f33 100644 --- a/cmd/openim-cmdutils/main.go +++ b/cmd/openim-cmdutils/main.go @@ -15,7 +15,9 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "os" ) func main() { @@ -54,6 +56,7 @@ func main() { // openIM clear msg --clearAll msgUtilsCmd.AddCommand(&getCmd.Command, &fixCmd.Command, &clearCmd.Command) if err := msgUtilsCmd.Execute(); err != nil { - panic(err) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-crontask/main.go b/cmd/openim-crontask/main.go index 3bd0d882b..5061c5408 100644 --- a/cmd/openim-crontask/main.go +++ b/cmd/openim-crontask/main.go @@ -15,13 +15,16 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/tools" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "os" ) func main() { cronTaskCmd := cmd.NewCronTaskCmd() if err := cronTaskCmd.Exec(tools.StartTask); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-msggateway/main.go b/cmd/openim-msggateway/main.go index 6d212e467..25cfe5fa8 100644 --- a/cmd/openim-msggateway/main.go +++ b/cmd/openim-msggateway/main.go @@ -15,7 +15,9 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "os" ) func main() { @@ -25,6 +27,7 @@ func main() { msgGatewayCmd.AddPrometheusPortFlag() if err := msgGatewayCmd.Exec(); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go index 6895bcecc..e3c2f374f 100644 --- a/cmd/openim-msgtransfer/main.go +++ b/cmd/openim-msgtransfer/main.go @@ -15,7 +15,9 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" + "os" ) func main() { @@ -23,6 +25,7 @@ func main() { msgTransferCmd.AddPrometheusPortFlag() msgTransferCmd.AddTransferProgressFlag() if err := msgTransferCmd.Exec(); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-push/main.go b/cmd/openim-push/main.go index c19cfda60..79152f945 100644 --- a/cmd/openim-push/main.go +++ b/cmd/openim-push/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/push" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := pushCmd.StartSvr(config.Config.RpcRegisterName.OpenImPushName, push.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go index 645d8cab8..8ee7053f3 100644 --- a/cmd/openim-rpc/openim-rpc-auth/main.go +++ b/cmd/openim-rpc/openim-rpc-auth/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go index 13d7db605..259d79abf 100644 --- a/cmd/openim-rpc/openim-rpc-conversation/main.go +++ b/cmd/openim-rpc/openim-rpc-conversation/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImConversationName, conversation.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-friend/main.go b/cmd/openim-rpc/openim-rpc-friend/main.go index ec18306a2..bd85da7b0 100644 --- a/cmd/openim-rpc/openim-rpc-friend/main.go +++ b/cmd/openim-rpc/openim-rpc-friend/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImFriendName, friend.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go index 887329926..e40c04d19 100644 --- a/cmd/openim-rpc/openim-rpc-group/main.go +++ b/cmd/openim-rpc/openim-rpc-group/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/group" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImGroupName, group.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-msg/main.go b/cmd/openim-rpc/openim-rpc-msg/main.go index dcc3abef5..989746dc7 100644 --- a/cmd/openim-rpc/openim-rpc-msg/main.go +++ b/cmd/openim-rpc/openim-rpc-msg/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImMsgName, msg.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-third/main.go b/cmd/openim-rpc/openim-rpc-third/main.go index cf0bf4b70..fbd386946 100644 --- a/cmd/openim-rpc/openim-rpc-third/main.go +++ b/cmd/openim-rpc/openim-rpc-third/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/third" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImThirdName, third.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/cmd/openim-rpc/openim-rpc-user/main.go b/cmd/openim-rpc/openim-rpc-user/main.go index cbf2a8fc3..160bbe311 100644 --- a/cmd/openim-rpc/openim-rpc-user/main.go +++ b/cmd/openim-rpc/openim-rpc-user/main.go @@ -15,9 +15,11 @@ package main import ( + "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/user" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "os" ) func main() { @@ -28,6 +30,7 @@ func main() { panic(err.Error()) } if err := rpcCmd.StartSvr(config.Config.RpcRegisterName.OpenImUserName, user.Start); err != nil { - panic(err.Error()) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + os.Exit(-1) } } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 31fe4fdd5..5ca64ea76 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -96,7 +96,7 @@ func Start( err = rpcFn(client, srv) if err != nil { - return errs.Wrap(err) + return err } err = client.Register( rpcRegisterName,