
400 lines
12 KiB

// Copyright © 2023 OpenIM. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
const (
// defaultCfgPath is the default path of the configuration file.
defaultCfgPath = "../../../../../config/config.yaml"
maxRetry = 100
var (
cfgPath = flag.String("c", defaultCfgPath, "Path to the configuration file")
func initCfg() (*config.GlobalConfig, error) {
data, err := os.ReadFile(*cfgPath)
if err != nil {
return nil, errs.Wrap(err, "ReadFile unmarshal failed")
conf := config.NewGlobalConfig()
err = yaml.Unmarshal(data, &conf)
if err != nil {
return nil, errs.Wrap(err, "InitConfig unmarshal failed")
return conf, nil
type checkFunc struct {
name string
function func(*config.GlobalConfig) error
flag bool
config *config.GlobalConfig
// colorErrPrint prints formatted string in red to stderr
func colorErrPrint(msg string) {
// ANSI escape code for red text
const redColor = "\033[31m"
// ANSI escape code to reset color
const resetColor = "\033[0m"
msg = redColor + msg + resetColor
// Print to stderr in red
fmt.Fprintf(os.Stderr, "%s\n", msg)
func colorSuccessPrint(format string, a ...interface{}) {
// ANSI escape code for green text is \033[32m
// \033[0m resets the color
fmt.Printf("\033[32m"+format+"\033[0m", a...)
func main() {
conf, err := initCfg()
if err != nil {
fmt.Printf("Read config failed: %v\n", err)
err = configGetEnv(conf)
if err != nil {
fmt.Printf("configGetEnv failed, err:%v", err)
checks := []checkFunc{
//{name: "Mysql", function: checkMysql},
{name: "Mongo", function: checkMongo, config: conf},
{name: "Redis", function: checkRedis, config: conf},
{name: "Minio", function: checkMinio, config: conf},
{name: "Zookeeper", function: checkZookeeper, config: conf},
{name: "Kafka", function: checkKafka, config: conf},
for i := 0; i < maxRetry; i++ {
if i != 0 {
time.Sleep(1 * time.Second)
fmt.Printf("Checking components round %v...\n", i+1)
var err error
allSuccess := true
for index, check := range checks {
if !check.flag {
err = check.function(check.config)
if err != nil {
allSuccess = false
colorErrPrint(fmt.Sprintf("Check component: %s, failed: %v",, err.Error()))
if == "Minio" {
if errors.Is(err, errMinioNotEnabled) ||
errors.Is(err, errSignEndPoint) ||
errors.Is(err, errApiURL) {
checks[index].flag = true
} else {
checks[index].flag = true
component.SuccessPrint(fmt.Sprintf("%s connected successfully",
if allSuccess {
component.SuccessPrint("All components started successfully!")
component.ErrorPrint("Some components checked failed!")
var errMinioNotEnabled = errors.New("minio.Enable is not configured to use MinIO")
var errSignEndPoint = errors.New("minio.signEndPoint contains, causing issues with image sending")
var errApiURL = errors.New("object.apiURL contains, causing issues with image sending")
// checkMongo checks the MongoDB connection without retries
func checkMongo(config *config.GlobalConfig) error {
mongoStu := &component.Mongo{
URL: config.Mongo.Uri,
Address: config.Mongo.Address,
Database: config.Mongo.Database,
Username: config.Mongo.Username,
Password: config.Mongo.Password,
MaxPoolSize: config.Mongo.MaxPoolSize,
err := component.CheckMongo(mongoStu)
return err
// checkRedis checks the Redis connection
func checkRedis(config *config.GlobalConfig) error {
redisStu := &component.Redis{
Address: config.Redis.Address,
Username: config.Redis.Username,
Password: config.Redis.Password,
err := component.CheckRedis(redisStu)
return err
// checkMinio checks the MinIO connection
func checkMinio(config *config.GlobalConfig) error {
if strings.Contains(config.Object.ApiURL, "") {
return errs.Wrap(errApiURL)
if config.Object.Enable != "minio" {
return errs.Wrap(errMinioNotEnabled)
if strings.Contains(config.Object.Minio.Endpoint, "") {
return errs.Wrap(errSignEndPoint)
minio := &component.Minio{
ApiURL: config.Object.ApiURL,
Endpoint: config.Object.Minio.Endpoint,
AccessKeyID: config.Object.Minio.AccessKeyID,
SecretAccessKey: config.Object.Minio.SecretAccessKey,
SignEndpoint: config.Object.Minio.SignEndpoint,
UseSSL: getEnv("MINIO_USE_SSL", "false"),
err := component.CheckMinio(minio)
return err
// checkZookeeper checks the Zookeeper connection
func checkZookeeper(config *config.GlobalConfig) error {
zkStu := &component.Zookeeper{
Schema: config.Zookeeper.Schema,
ZkAddr: config.Zookeeper.ZkAddr,
Username: config.Zookeeper.Username,
Password: config.Zookeeper.Password,
err := component.CheckZookeeper(zkStu)
return err
// checkKafka checks the Kafka connection
func checkKafka(config *config.GlobalConfig) error {
// Prioritize environment variables
kafkaStu := &component.Kafka{
Username: config.Kafka.Username,
Password: config.Kafka.Password,
Addr: config.Kafka.Addr,
kafkaClient, err := component.CheckKafka(kafkaStu)
if err != nil {
return err
defer kafkaClient.Close()
// Verify if necessary topics exist
topics, err := kafkaClient.Topics()
if err != nil {
return errs.Wrap(err)
requiredTopics := []string{
for _, requiredTopic := range requiredTopics {
if !isTopicPresent(requiredTopic, topics) {
return errs.Wrap(err, fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic))
var tlsConfig *kafka.TLSConfig
if config.Kafka.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify,
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: config.Kafka.Username,
Password: config.Kafka.Password,
}, []string{config.Kafka.LatestMsgToRedis.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis, tlsConfig)
if err != nil {
return err
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Kafka.MsgToMongo.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo, tlsConfig)
if err != nil {
return err
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToPush, tlsConfig)
if err != nil {
return err
return nil
// isTopicPresent checks if a topic is present in the list of topics
func isTopicPresent(topic string, topics []string) bool {
for _, t := range topics {
if t == topic {
return true
return false
func configGetEnv(config *config.GlobalConfig) error {
config.Mongo.Uri = getEnv("MONGO_URI", config.Mongo.Uri)
config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Mongo.Username)
config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Mongo.Password)
config.Mongo.Address = getArrEnv("MONGO_ADDRESS", "MONGO_PORT", config.Mongo.Address)
config.Mongo.Database = getEnv("MONGO_DATABASE", config.Mongo.Database)
maxPoolSize, err := getEnvInt("MONGO_MAX_POOL_SIZE", config.Mongo.MaxPoolSize)
if err != nil {
return errs.Wrap(err, "MONGO_MAX_POOL_SIZE")
config.Mongo.MaxPoolSize = maxPoolSize
config.Redis.Username = getEnv("REDIS_USERNAME", config.Redis.Username)
config.Redis.Password = getEnv("REDIS_PASSWORD", config.Redis.Password)
config.Redis.Address = getArrEnv("REDIS_ADDRESS", "REDIS_PORT", config.Redis.Address)
config.Object.ApiURL = getEnv("OBJECT_APIURL", config.Object.ApiURL)
config.Object.Minio.Endpoint = getEnv("MINIO_ENDPOINT", config.Object.Minio.Endpoint)
config.Object.Minio.AccessKeyID = getEnv("MINIO_ACCESS_KEY_ID", config.Object.Minio.AccessKeyID)
config.Object.Minio.SecretAccessKey = getEnv("MINIO_SECRET_ACCESS_KEY", config.Object.Minio.SecretAccessKey)
config.Object.Minio.SignEndpoint = getEnv("MINIO_SIGN_ENDPOINT", config.Object.Minio.SignEndpoint)
config.Zookeeper.Schema = getEnv("ZOOKEEPER_SCHEMA", config.Zookeeper.Schema)
config.Zookeeper.ZkAddr = getArrEnv("ZOOKEEPER_ADDRESS", "ZOOKEEPER_PORT", config.Zookeeper.ZkAddr)
config.Zookeeper.Username = getEnv("ZOOKEEPER_USERNAME", config.Zookeeper.Username)
config.Zookeeper.Password = getEnv("ZOOKEEPER_PASSWORD", config.Zookeeper.Password)
config.Kafka.Username = getEnv("KAFKA_USERNAME", config.Kafka.Username)
config.Kafka.Password = getEnv("KAFKA_PASSWORD", config.Kafka.Password)
config.Kafka.Addr = getArrEnv("KAFKA_ADDRESS", "KAFKA_PORT", config.Kafka.Addr)
config.Object.Minio.Endpoint = getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Object.Minio.Endpoint)
return nil
func getMinioAddr(key1, key2, key3, fallback string) string {
// Prioritize environment variables
endpoint := getEnv(key1, fallback)
address, addressExist := os.LookupEnv(key2)
port, portExist := os.LookupEnv(key3)
if portExist && addressExist {
endpoint = "http://" + address + ":" + port
return endpoint
return endpoint
// Helper function to get environment variable or default value
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {
return value
return fallback
// Helper function to get environment variable or default value
func getEnvInt(key string, fallback int) (int, error) {
if value, exists := os.LookupEnv(key); exists {
val, err := strconv.Atoi(value)
if err != nil {
return 0, errs.Wrap(err, "string to int failed")
return val, nil
return fallback, nil
func getArrEnv(key1, key2 string, fallback []string) []string {
address, addrExists := os.LookupEnv(key1)
port, portExists := os.LookupEnv(key2)
if addrExists && portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
return addresses
if addrExists && !portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + "0"
return addresses
if !addrExists && portExists {
result := make([]string, len(fallback))
for i, addr := range fallback {
add := strings.Split(addr, ":")
result[i] = add[0] + ":" + port
return result
return fallback