@ -15,9 +15,13 @@
package main
import (
"errors"
"flag"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"os"
"strings"
"time"
@ -34,14 +38,10 @@ const (
// defaultCfgPath is the default path of the configuration file.
defaultCfgPath = "../../../../../config/config.yaml"
maxRetry = 300
componentStartErrCode = 6000
configErrCode = 6001
)
var (
cfgPath = flag . String ( "c" , defaultCfgPath , "Path to the configuration file" )
ErrComponentStart = errs . NewCodeError ( componentStartErrCode , "ComponentStartErr" )
ErrConfig = errs . NewCodeError ( configErrCode , "Config file is incorrect" )
)
func initCfg ( ) error {
@ -55,7 +55,7 @@ func initCfg() error {
type checkFunc struct {
name string
function func ( ) ( string , error )
function func ( ) error
}
func main ( ) {
@ -67,11 +67,13 @@ func main() {
return
}
configGetEnv ( )
checks := [ ] checkFunc {
//{name: "Mysql", function: checkMysql},
{ name : "Mongo" , function : checkMongo } ,
{ name : "Minio" , function : checkMinio } ,
{ name : "Redis" , function : checkRedis } ,
{ name : "Minio" , function : checkMinio } ,
{ name : "Zookeeper" , function : checkZookeeper } ,
{ name : "Kafka" , function : checkKafka } ,
}
@ -82,165 +84,81 @@ func main() {
}
fmt . Printf ( "Checking components Round %v...\n" , i + 1 )
var err error
allSuccess := true
for _ , check := range checks {
str, err : = check . function ( )
err = check . function ( )
if err != nil {
component . ErrorPrint ( fmt . Sprintf ( "Starting %s failed , %v ", check . name , err ) )
component . ErrorPrint ( fmt . Sprintf ( "Starting %s failed :%v. ", check . name , err ) )
allSuccess = false
break
} else {
component . SuccessPrint ( fmt . Sprintf ( "%s connected successfully , %s ", check . name , str ) )
component . SuccessPrint ( fmt . Sprintf ( "%s connected successfully ", check . name ) )
}
}
if allSuccess {
component . SuccessPrint ( "All components started successfully!" )
return
}
}
os . Exit ( 1 )
}
// Helper function to get environment variable or default value
func getEnv ( key , fallback string ) string {
if value , exists := os . LookupEnv ( key ) ; exists {
return value
}
return fallback
}
// checkMongo checks the MongoDB connection without retries
func checkMongo ( ) ( string , error ) {
mongo := & component . Mongo {
Address : config . Config . Mongo . Address ,
Database : config . Config . Mongo . Database ,
Username : config . Config . Mongo . Username ,
Password : config . Config . Mongo . Password ,
MaxPoolSize : config . Config . Mongo . MaxPoolSize ,
}
uri , uriExist := os . LookupEnv ( "MONGO_URI" )
if uriExist {
mongo . URL = uri
func checkMongo ( ) error {
_ , err := unrelation . NewMongo ( )
return err
}
str , err := component . CheckMongo ( mongo )
if err != nil {
return "" , err
}
return str , nil
// checkRedis checks the Redis connection
func checkRedis ( ) error {
_ , err := cache . NewRedis ( )
return err
}
// checkMinio checks the MinIO connection
func checkMinio ( ) ( string , error ) {
func checkMinio ( ) error {
// Check if MinIO is enabled
if config . Config . Object . Enable != "minio" {
return "" , nil
}
endpoint , err := getMinioAddr ( "MINIO_ENDPOINT" , "MINIO_ADDRESS" , "MINIO_PORT" , config . Config . Object . Minio . Endpoint )
if err != nil {
return "" , err
return nil
}
minio := & component . Minio {
ApiURL : config . Config . Object . ApiURL ,
Endpoint : endpoint,
AccessKeyID : getEnv ( "MINIO_ACCESS_KEY_ID" , config . Config . Object . Minio . AccessKeyID ) ,
SecretAccessKey : getEnv ( "MINIO_SECRET_ACCESS_KEY" , config . Config . Object . Minio . SecretAccessKey ) ,
Endpoint : config . Config . Object . Minio . Endpoint ,
AccessKeyID : config . Config . Object . Minio . AccessKeyID ,
SecretAccessKey : config . Config . Object . Minio . SecretAccessKey ,
SignEndpoint : config . Config . Object . Minio . SignEndpoint ,
UseSSL : getEnv ( "MINIO_USE_SSL" , "false" ) ,
}
str , err := component . CheckMinio ( minio )
if err != nil {
return "" , err
}
return str , nil
}
// checkRedis checks the Redis connection
func checkRedis ( ) ( string , error ) {
// Prioritize environment variables
address := getEnv ( "REDIS_ADDRESS" , strings . Join ( config . Config . Redis . Address , "," ) )
username := getEnv ( "REDIS_USERNAME" , config . Config . Redis . Username )
password := getEnv ( "REDIS_PASSWORD" , config . Config . Redis . Password )
redis := & component . Redis {
Address : strings . Split ( address , "," ) ,
Username : username ,
Password : password ,
}
addresses , err := getAddress ( "REDIS_ADDRESS" , "REDIS_PORT" , config . Config . Redis . Address )
if err != nil {
return "" , err
}
redis . Address = addresses
str , err := component . CheckRedis ( redis )
if err != nil {
return "" , err
}
return str , nil
_ , err := component . CheckMinio ( minio )
return err
}
// checkZookeeper checks the Zookeeper connection
func checkZookeeper ( ) ( string , error ) {
// Prioritize environment variables
address := getEnv ( "ZOOKEEPER_ADDRESS" , strings . Join ( config . Config . Zookeeper . ZkAddr , "," ) )
zk := & component . Zookeeper {
Schema : getEnv ( "ZOOKEEPER_SCHEMA" , "digest" ) ,
ZkAddr : strings . Split ( address , "," ) ,
Username : getEnv ( "ZOOKEEPER_USERNAME" , config . Config . Zookeeper . Username ) ,
Password : getEnv ( "ZOOKEEPER_PASSWORD" , config . Config . Zookeeper . Password ) ,
}
addresses , err := getAddress ( "ZOOKEEPER_ADDRESS" , "ZOOKEEPER_PORT" , config . Config . Zookeeper . ZkAddr )
if err != nil {
return "" , nil
}
zk . ZkAddr = addresses
str , err := component . CheckZookeeper ( zk )
if err != nil {
return "" , err
}
return str , nil
func checkZookeeper ( ) error {
_ , err := zookeeper . NewZookeeperDiscoveryRegister ( )
return err
}
// checkKafka checks the Kafka connection
func checkKafka ( ) ( string , error ) {
func checkKafka ( ) error {
// Prioritize environment variables
username := getEnv ( "KAFKA_USERNAME" , config . Config . Kafka . Username )
password := getEnv ( "KAFKA_PASSWORD" , config . Config . Kafka . Password )
address := getEnv ( "KAFKA_ADDRESS" , strings . Join ( config . Config . Kafka . Addr , "," ) )
kafka := & component . Kafka {
Username : username ,
Password : password ,
Addr : strings . Split ( address , "," ) ,
kafkaStu := & component . Kafka {
Username : config . Config . Kafka . Username ,
Password : config . Config . Kafka . Password ,
Addr : config . Config . Kafka . Addr ,
}
addresses, err := getAddress ( "KAFKA_ADDRESS" , "KAFKA_PORT" , config . Config . Kafka . Addr )
_ , kafkaClient , err := component . CheckKafka ( kafkaStu )
if err != nil {
return "" , nil
}
kafka . Addr = addresses
str , kafkaClient , err := component . CheckKafka ( kafka )
if err != nil {
return "" , err
return err
}
defer kafkaClient . Close ( )
// Verify if necessary topics exist
topics , err := kafkaClient . Topics ( )
if err != nil {
return "" , errs . Wrap ( err )
return errs . Wrap ( err )
}
requiredTopics := [ ] string {
@ -251,11 +169,38 @@ func checkKafka() (string, error) {
for _ , requiredTopic := range requiredTopics {
if ! isTopicPresent ( requiredTopic , topics ) {
return "" , ErrComponentStart . Wrap ( fmt . Sprintf ( "Kafka doesn't contain topic: %v" , requiredTopic ) )
return errs . Wrap ( err , fmt . Sprintf ( "Kafka doesn't contain topic: %v" , requiredTopic ) )
}
}
_ , err = kafka . NewMConsumerGroup ( & kafka . MConsumerGroupConfig {
KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false ,
} , [ ] string { config . Config . Kafka . LatestMsgToRedis . Topic } ,
config . Config . Kafka . Addr , config . Config . Kafka . ConsumerGroupID . MsgToRedis )
if err != nil {
return err
}
_ , err = kafka . NewMConsumerGroup ( & kafka . MConsumerGroupConfig {
KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false ,
} , [ ] string { config . Config . Kafka . MsgToPush . Topic } ,
config . Config . Kafka . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMongo )
if err != nil {
return err
}
kafka . NewMConsumerGroup ( & kafka . MConsumerGroupConfig {
KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false ,
} , [ ] string { config . Config . Kafka . MsgToPush . Topic } , config . Config . Kafka . Addr ,
config . Config . Kafka . ConsumerGroupID . MsgToPush )
if err != nil {
return err
}
return str , nil
return nil
}
// isTopicPresent checks if a topic is present in the list of topics
@ -268,42 +213,34 @@ func isTopicPresent(topic string, topics []string) bool {
return false
}
func getAddress ( key1 , key2 string , fallback [ ] string ) ( [ ] string , error ) {
address , addrExist := os . LookupEnv ( key1 )
port , portExist := os . LookupEnv ( key2 )
if addrExist && portExist {
addresses := strings . Split ( address , "," )
for i , addr := range addresses {
addresses [ i ] = addr + ":" + port
}
return addresses , nil
} else if ! addrExist && portExist {
result := make ( [ ] string , len ( config . Config . Redis . Address ) )
for i , addr := range config . Config . Redis . Address {
add := strings . Split ( addr , ":" )
result [ i ] = add [ 0 ] + ":" + port
}
return result , nil
} else if addrExist && ! portExist {
return nil , errs . Wrap ( errors . New ( "the ZOOKEEPER_PORT of minio is empty" ) )
}
return fallback , nil
func configGetEnv ( ) {
config . Config . Object . Minio . AccessKeyID = getEnv ( "MINIO_ACCESS_KEY_ID" , config . Config . Object . Minio . AccessKeyID )
config . Config . Object . Minio . SecretAccessKey = getEnv ( "MINIO_SECRET_ACCESS_KEY" , config . Config . Object . Minio . SecretAccessKey )
config . Config . Mongo . Uri = getEnv ( "MONGO_URI" , config . Config . Mongo . Uri )
config . Config . Mongo . Username = getEnv ( "MONGO_OPENIM_USERNAME" , config . Config . Mongo . Username )
config . Config . Mongo . Password = getEnv ( "MONGO_OPENIM_PASSWORD" , config . Config . Mongo . Password )
config . Config . Kafka . Username = getEnv ( "KAFKA_USERNAME" , config . Config . Kafka . Username )
config . Config . Kafka . Password = getEnv ( "KAFKA_PASSWORD" , config . Config . Kafka . Password )
config . Config . Kafka . Addr = strings . Split ( getEnv ( "KAFKA_ADDRESS" , strings . Join ( config . Config . Kafka . Addr , "," ) ) , "," )
config . Config . Object . Minio . Endpoint = getMinioAddr ( "MINIO_ENDPOINT" , "MINIO_ADDRESS" , "MINIO_PORT" , config . Config . Object . Minio . Endpoint )
}
func getMinioAddr ( key1 , key2 , key3 , fallback string ) ( string , error ) {
func getMinioAddr ( key1 , key2 , key3 , fallback string ) string {
// Prioritize environment variables
endpoint := getEnv ( key1 , fallback )
address , addressExist := os . LookupEnv ( key2 )
port , portExist := os . LookupEnv ( key3 )
if portExist && addressExist {
endpoint = "http://" + address + ":" + port
} else if ! portExist && addressExist {
return "" , errs . Wrap ( errors . New ( "the MINIO_PORT of minio is empty" ) )
} else if portExist && ! addressExist {
arr := strings . Split ( config . Config . Object . Minio . Endpoint , ":" )
arr [ 2 ] = port
endpoint = strings . Join ( arr , ":" )
}
return endpoint , nil
return endpoint
}
return endpoint
}
// Helper function to get environment variable or default value
func getEnv ( key , fallback string ) string {
if value , exists := os . LookupEnv ( key ) ; exists {
return value
}
return fallback
}