// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "errors" "flag" "fmt" "os" "strconv" "strings" "time" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/OpenIMSDK/tools/component" "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "gopkg.in/yaml.v3" ) const ( // defaultCfgPath is the default path of the configuration file. defaultCfgPath = "../../../../../config/config.yaml" maxRetry = 100 ) var ( cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file") ) func initCfg() (*config.GlobalConfig, error) { data, err := os.ReadFile(*cfgPath) if err != nil { return nil, errs.Wrap(err, "ReadFile unmarshal failed") } conf := config.NewGlobalConfig() err = yaml.Unmarshal(data, &conf) if err != nil { return nil, errs.Wrap(err, "InitConfig unmarshal failed") } return conf, nil } type checkFunc struct { name string function func(*config.GlobalConfig) error flag bool config *config.GlobalConfig } func main() { flag.Parse() conf, err := initCfg() if err != nil { fmt.Printf("Read config failed: %v\n", err) return } err = configGetEnv(conf) if err != nil { fmt.Printf("configGetEnv failed,err:%v", err) return } checks := []checkFunc{ //{name: "Mysql", function: checkMysql}, {name: "Mongo", function: checkMongo, config: conf}, {name: "Redis", function: checkRedis, config: conf}, {name: "Minio", function: checkMinio, config: conf}, {name: "Zookeeper", function: checkZookeeper, config: conf}, {name: "Kafka", function: checkKafka, config: conf}, } for i := 0; i < maxRetry; i++ { if i != 0 { time.Sleep(1 * time.Second) } fmt.Printf("Checking components Round %v...\n", i+1) var err error allSuccess := true for index, check := range checks { if !check.flag { err = check.function(check.config) if err != nil { if errors.Is(err, errMinioNotEnabled) { fmt.Println(err.Error()) checks[index].flag = true } if errors.Is(err, errSignEndPoint) { fmt.Fprintf(os.Stderr, err.Error()) checks[index].flag = true } component.ErrorPrint(fmt.Sprintf("Starting %s failed:%v.", check.name, errs.Unwrap(err).Error())) if !strings.Contains(errs.Unwrap(err).Error(), "connection refused") && !strings.Contains(errs.Unwrap(err).Error(), "timeout waiting") { component.ErrorPrint("Some components started failed!") os.Exit(-1) } } else { checks[index].flag = true component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } } } if allSuccess { component.SuccessPrint("All components started successfully!") return } } component.ErrorPrint("Some components started failed!") os.Exit(-1) } var errMinioNotEnabled = errors.New("minio.Enable is not configured to use MinIO") var errSignEndPoint = errors.New("minio.signEndPoint contains 127.0.0.1, causing issues with image sending") var errApiURL = errors.New("object.apiURL contains 127.0.0.1, causing issues with image sending") // checkMongo checks the MongoDB connection without retries func checkMongo(config *config.GlobalConfig) error { mongoStu := &component.Mongo{ URL: config.Mongo.Uri, Address: config.Mongo.Address, Database: config.Mongo.Database, Username: config.Mongo.Username, Password: config.Mongo.Password, MaxPoolSize: config.Mongo.MaxPoolSize, } err := component.CheckMongo(mongoStu) return err } // checkRedis checks the Redis connection func checkRedis(config *config.GlobalConfig) error { redisStu := &component.Redis{ Address: config.Redis.Address, Username: config.Redis.Username, Password: config.Redis.Password, } err := component.CheckRedis(redisStu) return err } // checkMinio checks the MinIO connection func checkMinio(config *config.GlobalConfig) error { if strings.Contains(config.Object.ApiURL, "127.0.0.1") { return errs.Wrap(errApiURL, "config.Object.ApiURL: "+config.Object.ApiURL) } if config.Object.Enable != "minio" { return errs.Wrap(errMinioNotEnabled, "config.Object.Enable: "+config.Object.Enable) } if strings.Contains(config.Object.Minio.Endpoint, "127.0.0.1") { return errs.Wrap(errSignEndPoint, "config.Object.Minio.Endpoint: "+config.Object.Minio.Endpoint) } minio := &component.Minio{ ApiURL: config.Object.ApiURL, Endpoint: config.Object.Minio.Endpoint, AccessKeyID: config.Object.Minio.AccessKeyID, SecretAccessKey: config.Object.Minio.SecretAccessKey, SignEndpoint: config.Object.Minio.SignEndpoint, UseSSL: getEnv("MINIO_USE_SSL", "false"), } err := component.CheckMinio(minio) return err } // checkZookeeper checks the Zookeeper connection func checkZookeeper(config *config.GlobalConfig) error { zkStu := &component.Zookeeper{ Schema: config.Zookeeper.Schema, ZkAddr: config.Zookeeper.ZkAddr, Username: config.Zookeeper.Username, Password: config.Zookeeper.Password, } err := component.CheckZookeeper(zkStu) return err } // checkKafka checks the Kafka connection func checkKafka(config *config.GlobalConfig) error { // Prioritize environment variables kafkaStu := &component.Kafka{ Username: config.Kafka.Username, Password: config.Kafka.Password, Addr: config.Kafka.Addr, } kafkaClient, err := component.CheckKafka(kafkaStu) if err != nil { return err } defer kafkaClient.Close() // Verify if necessary topics exist topics, err := kafkaClient.Topics() if err != nil { return errs.Wrap(err) } requiredTopics := []string{ config.Kafka.MsgToMongo.Topic, config.Kafka.MsgToPush.Topic, config.Kafka.LatestMsgToRedis.Topic, } for _, requiredTopic := range requiredTopics { if !isTopicPresent(requiredTopic, topics) { return errs.Wrap(err, fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) } } var tlsConfig *kafka.TLSConfig if config.Kafka.TLS != nil { tlsConfig = &kafka.TLSConfig{ CACrt: config.Kafka.TLS.CACrt, ClientCrt: config.Kafka.TLS.ClientCrt, ClientKey: config.Kafka.TLS.ClientKey, ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify, } } _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, UserName: config.Kafka.Username, Password: config.Kafka.Password, }, []string{config.Kafka.LatestMsgToRedis.Topic}, config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis, tlsConfig) if err != nil { return err } _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo, tlsConfig) if err != nil { return err } kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, }, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToPush, tlsConfig) if err != nil { return err } return nil } // isTopicPresent checks if a topic is present in the list of topics func isTopicPresent(topic string, topics []string) bool { for _, t := range topics { if t == topic { return true } } return false } func configGetEnv(config *config.GlobalConfig) error { config.Mongo.Uri = getEnv("MONGO_URI", config.Mongo.Uri) config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Mongo.Username) config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Mongo.Password) config.Mongo.Address = getArrEnv("MONGO_ADDRESS", "MONGO_PORT", config.Mongo.Address) config.Mongo.Database = getEnv("MONGO_DATABASE", config.Mongo.Database) maxPoolSize, err := getEnvInt("MONGO_DATABASE", config.Mongo.MaxPoolSize) if err != nil { return err } config.Mongo.MaxPoolSize = maxPoolSize config.Redis.Username = getEnv("REDIS_USERNAME", config.Redis.Username) config.Redis.Password = getEnv("REDIS_PASSWORD", config.Redis.Password) config.Redis.Address = getArrEnv("REDIS_ADDRESS", "REDIS_PASSWORD", config.Redis.Address) config.Object.ApiURL = getEnv("OBJECT_APIURL", config.Object.ApiURL) config.Object.Minio.Endpoint = getEnv("MINIO_ENDPOINT", config.Object.Minio.Endpoint) config.Object.Minio.AccessKeyID = getEnv("MINIO_ACCESS_KEY_ID", config.Object.Minio.AccessKeyID) config.Object.Minio.SecretAccessKey = getEnv("MINIO_SECRET_ACCESS_KEY", config.Object.Minio.SecretAccessKey) config.Object.Minio.SignEndpoint = getEnv("MINIO_SIGN_ENDPOINT", config.Object.Minio.SignEndpoint) config.Zookeeper.Schema = getEnv("ZOOKEEPER_SCHEMA", config.Zookeeper.Schema) config.Zookeeper.ZkAddr = getArrEnv("ZOOKEEPER_ADDRESS", "ZOOKEEPER_PORT", config.Zookeeper.ZkAddr) config.Zookeeper.Username = getEnv("ZOOKEEPER_USERNAME", config.Zookeeper.Username) config.Zookeeper.Password = getEnv("ZOOKEEPER_PASSWORD", config.Zookeeper.Password) config.Kafka.Username = getEnv("KAFKA_USERNAME", config.Kafka.Username) config.Kafka.Password = getEnv("KAFKA_PASSWORD", config.Kafka.Password) config.Kafka.Addr = getArrEnv("KAFKA_ADDRESS", "KAFKA_PORT", config.Kafka.Addr) config.Object.Minio.Endpoint = getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Object.Minio.Endpoint) return nil } func getMinioAddr(key1, key2, key3, fallback string) string { // Prioritize environment variables endpoint := getEnv(key1, fallback) address, addressExist := os.LookupEnv(key2) port, portExist := os.LookupEnv(key3) if portExist && addressExist { endpoint = "http://" + address + ":" + port return endpoint } return endpoint } // Helper function to get environment variable or default value func getEnv(key, fallback string) string { if value, exists := os.LookupEnv(key); exists { return value } return fallback } // Helper function to get environment variable or default value func getEnvInt(key string, fallback int) (int, error) { if value, exists := os.LookupEnv(key); exists { val, err := strconv.Atoi(value) if err != nil { return 0, errs.Wrap(err, "string to int failed") } return val, nil } return fallback, nil } func getArrEnv(key1, key2 string, fallback []string) []string { address, addrExists := os.LookupEnv(key1) port, portExists := os.LookupEnv(key2) if addrExists && portExists { addresses := strings.Split(address, ",") for i, addr := range addresses { addresses[i] = addr + ":" + port } return addresses } if addrExists && !portExists { addresses := strings.Split(address, ",") for i, addr := range addresses { addresses[i] = addr + ":" + "0" } return addresses } if !addrExists && portExists { result := make([]string, len(fallback)) for i, addr := range fallback { add := strings.Split(addr, ":") result[i] = add[0] + ":" + port } return result } return fallback }