diff --git a/pkg/common/db/cache/init_redis.go b/pkg/common/db/cache/init_redis.go index c593089e6..3cec73be5 100644 --- a/pkg/common/db/cache/init_redis.go +++ b/pkg/common/db/cache/init_redis.go @@ -77,9 +77,10 @@ func NewRedis() (redis.UniversalClient, error) { defer cancel() err = rdb.Ping(ctx).Err() if err != nil { - return nil, errs.Wrap(fmt.Errorf("redis ping %w", err)) + uriFormat := "address:%s, username:%s, password:%s, clusterMode:%t, enablePipeline:%t" + errMsg := fmt.Sprintf(uriFormat, config.Config.Redis.Address, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.ClusterMode, config.Config.Redis.EnablePipeline) + return nil, errs.Wrap(err, errMsg) } - redisClient = rdb return rdb, err } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index e07c5b7f4..fe89c6b8a 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -61,9 +61,9 @@ func NewMongo() (*Mongo, error) { time.Sleep(time.Second) // exponential backoff could be implemented here continue } - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, uri) } - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, uri) } func buildMongoURI() string { diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go index db1a5c6c4..0082e9833 100644 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ b/pkg/common/discoveryregister/zookeeper/zookeeper.go @@ -15,6 +15,8 @@ package zookeeper import ( + "fmt" + "github.com/OpenIMSDK/tools/errs" "os" "strings" "time" @@ -33,7 +35,7 @@ func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, er username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username) password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password) - return openkeeper.NewClient( + zk, err := openkeeper.NewClient( zkAddr, schema, openkeeper.WithFreq(time.Hour), @@ -42,6 +44,16 @@ func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, er openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()), ) + if err != nil { + uriFormat := "address:%s, username :%s, password :%s, schema:%s." + errInfo := fmt.Sprintf(uriFormat, + config.Config.Zookeeper.ZkAddr, + config.Config.Zookeeper.Username, + config.Config.Zookeeper.Password, + config.Config.Zookeeper.Schema) + return nil, errs.Wrap(err, errInfo) + } + return zk, nil } // getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 05006f582..87e6d5686 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -51,7 +51,7 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str SetupTLSConfig(consumerGroupConfig) consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { - return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID) + return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } return &MConsumerGroup{ consumerGroup, diff --git a/tools/component/component.go b/tools/component/component.go index 71422ce77..3624c03c9 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,16 +15,15 @@ package main import ( + "errors" "flag" "fmt" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/oss" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" + "log" "os" "strings" "time" @@ -38,14 +37,18 @@ import ( const ( // defaultCfgPath is the default path of the configuration file. - defaultCfgPath = "../../../../../config/config.yaml" - maxRetry = 300 - componentStartErrCode = 6000 + defaultCfgPath = "../../../../../config/config.yaml" + maxRetry = 300 + colorRed = 31 ) var ( - cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") - ErrComponentStart = errs.NewCodeError(componentStartErrCode, "ComponentStartErr") + cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") + MongoAuthFailed = "Authentication failed." + RedisAuthFailed = "NOAUTH Authentication required." + MinioAuthFailed = "Minio Authentication failed" + ZkAuthFailed = "zk Authentication failed" + KafkaAuthFailed = "SASL Authentication failed" ) func initCfg() error { @@ -59,7 +62,8 @@ func initCfg() error { type checkFunc struct { name string - function func() (string, error) + function func() error + authInfo string } func main() { @@ -75,11 +79,11 @@ func main() { checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, - {name: "Mongo", function: checkMongo}, - {name: "Redis", function: checkRedis}, - {name: "Minio", function: checkMinio}, - {name: "Zookeeper", function: checkZookeeper}, - {name: "Kafka", function: checkKafka}, + {name: "Mongo", function: checkMongo, authInfo: MongoAuthFailed}, + {name: "Redis", function: checkRedis, authInfo: RedisAuthFailed}, + {name: "Minio", function: checkMinio, authInfo: MinioAuthFailed}, + {name: "Zookeeper", function: checkZookeeper, authInfo: ZkAuthFailed}, + {name: "Kafka", function: checkKafka, authInfo: KafkaAuthFailed}, } for i := 0; i < maxRetry; i++ { @@ -89,25 +93,24 @@ func main() { fmt.Printf("Checking components Round %v...\n", i+1) var ( - err error - errInfo string + err error + errInfo string + disruptions bool ) allSuccess := true - disruptions := true for _, check := range checks { - errInfo, err = check.function() + err = check.function() if err != nil { - component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) - fmt.Fprintln(os.Stderr, errInfo, err) + if errorJudge(err, check.authInfo) { + disruptions = true + } + ErrorPrint(fmt.Sprintf("Starting %s failed, %v, the conneted info is:%s", check.name, err, errInfo)) allSuccess = false break } else { component.SuccessPrint(fmt.Sprintf("%s connected successfully, the addr is:%s", check.name, errInfo)) - fmt.Fprintln(os.Stderr, errInfo, err) - } - if check.name == "kafka" && errs.Unwrap(err) == ErrComponentStart { - disruptions = false } + } if allSuccess { @@ -117,76 +120,50 @@ func main() { if disruptions { component.ErrorPrint(fmt.Sprintf("component check exit,err: %v", err)) - fmt.Fprintln(os.Stderr, errInfo, err) return } } } // checkMongo checks the MongoDB connection without retries -func checkMongo() (string, error) { +func checkMongo() error { _, err := unrelation.NewMongo() - if err != nil { - if config.Config.Mongo.Uri != "" { - return config.Config.Mongo.Uri, err - } - uriFormat := "mongodb://%s/%s?maxPoolSize=%s" - if config.Config.Mongo.Username != "" && config.Config.Mongo.Password != "" { - uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s" - return fmt.Sprintf(uriFormat, config.Config.Mongo.Username, config.Config.Mongo.Password, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err - } - return fmt.Sprintf(uriFormat, config.Config.Mongo.Address, config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize), err - } - return strings.Join(config.Config.Mongo.Address, ","), nil + return err } // checkRedis checks the Redis connection -func checkRedis() (string, error) { +func checkRedis() error { _, err := cache.NewRedis() - if err != nil { - uriFormat := "The username is:%s, the password is:%s, the address is:%s, the clusterMode is:%t" - return fmt.Sprintf(uriFormat, config.Config.Redis.Username, config.Config.Redis.Password, config.Config.Redis.Address, config.Config.Redis.ClusterMode), err - } - return strings.Join(config.Config.Redis.Address, ","), err + return err } // checkMinio checks the MinIO connection -func checkMinio() (string, error) { - - rdb, err := cache.NewRedis() - - enable := config.Config.Object.Enable - switch config.Config.Object.Enable { - case "minio": - _, err = minio.NewMinio(cache.NewMinioCache(rdb)) - case "cos": - _, err = cos.NewCos() - case "oss": - _, err = oss.NewOSS() - default: - err = fmt.Errorf("invalid object enable: %s", enable) +func checkMinio() error { + + // Check if MinIO is enabled + if config.Config.Object.Enable != "minio" { + return nil } - if err != nil { - uriFormat := "The apiURL is:%s, the endpoint is:%s, the signEndpoint is:%s." - return fmt.Sprintf(uriFormat, config.Config.Object.ApiURL, config.Config.Object.Minio.Endpoint, config.Config.Object.Minio.SignEndpoint), err + minio := &component.Minio{ + ApiURL: config.Config.Object.ApiURL, + Endpoint: config.Config.Object.Minio.Endpoint, + AccessKeyID: config.Config.Object.Minio.AccessKeyID, + SecretAccessKey: config.Config.Object.Minio.SecretAccessKey, + SignEndpoint: config.Config.Object.Minio.SignEndpoint, + UseSSL: getEnv("MINIO_USE_SSL", "false"), } - return config.Config.Object.Minio.Endpoint, nil + _, err := component.CheckMinio(minio) + return err } // checkZookeeper checks the Zookeeper connection -func checkZookeeper() (string, error) { +func checkZookeeper() error { _, err := zookeeper.NewZookeeperDiscoveryRegister() - if err != nil { - if config.Config.Zookeeper.Username != "" && config.Config.Zookeeper.Password != "" { - return fmt.Sprintf("The addr is:%s,the schema is:%s, the username is:%s, the password is:%s.", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, config.Config.Zookeeper.Username, config.Config.Zookeeper.Password), err - } - return fmt.Sprintf("The addr is:%s,the schema is:%s", config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema), err - } - return strings.Join(config.Config.Zookeeper.ZkAddr, ","), nil + return err } // checkKafka checks the Kafka connection -func checkKafka() (string, error) { +func checkKafka() error { // Prioritize environment variables kafkaStu := &component.Kafka{ @@ -195,16 +172,16 @@ func checkKafka() (string, error) { Addr: config.Config.Kafka.Addr, } - str, kafkaClient, err := component.CheckKafka(kafkaStu) + _, kafkaClient, err := component.CheckKafka(kafkaStu) if err != nil { - return "", err + return err } defer kafkaClient.Close() // Verify if necessary topics exist topics, err := kafkaClient.Topics() if err != nil { - return "", errs.Wrap(err) + return errs.Wrap(err) } requiredTopics := []string{ @@ -215,29 +192,38 @@ func checkKafka() (string, error) { for _, requiredTopic := range requiredTopics { if !isTopicPresent(requiredTopic, topics) { - return "", ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) + return errs.Wrap(err, fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) } } - kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) + if err != nil { + return err + } - kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ + _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Config.Kafka.MsgToMongo.Topic}, + }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) + if err != nil { + return err + } kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) + if err != nil { + return err + } - return str, nil + return nil } // isTopicPresent checks if a topic is present in the list of topics @@ -251,13 +237,27 @@ func isTopicPresent(topic string, topics []string) bool { } func configGetEnv() { + config.Config.Object.Minio.AccessKeyID = getEnv("MINIO_ACCESS_KEY_ID", config.Config.Object.Minio.AccessKeyID) + config.Config.Object.Minio.SecretAccessKey = getEnv("MINIO_SECRET_ACCESS_KEY", config.Config.Object.Minio.SecretAccessKey) config.Config.Mongo.Uri = getEnv("MONGO_URI", config.Config.Mongo.Uri) config.Config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Config.Mongo.Username) config.Config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Config.Mongo.Password) config.Config.Kafka.Username = getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) config.Config.Kafka.Password = getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) config.Config.Kafka.Addr = strings.Split(getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ",")), ",") + config.Config.Object.Minio.Endpoint = getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Config.Object.Minio.Endpoint) +} +func getMinioAddr(key1, key2, key3, fallback string) string { + // Prioritize environment variables + endpoint := getEnv(key1, fallback) + address, addressExist := os.LookupEnv(key2) + port, portExist := os.LookupEnv(key3) + if portExist && addressExist { + endpoint = "http://" + address + ":" + port + return endpoint + } + return endpoint } // Helper function to get environment variable or default value @@ -267,3 +267,18 @@ func getEnv(key, fallback string) string { } return fallback } + +func ErrorPrint(s string) { + colorPrint(colorRed, "%v", s) +} + +func colorPrint(colorCode int, format string, a ...interface{}) { + log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) +} + +func errorJudge(err error, errMsg string) bool { + if strings.Contains(errors.Unwrap(err).Error(), errMsg) { + return true + } + return false +}