feat: optimize componten

pull/2148/head
Xinwei Xiong (cubxxw) 2 years ago
parent 2dd52d5274
commit 2b93ce0299

@ -1,20 +0,0 @@
package checks
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
type ServiceChecker interface {
Check(ctx context.Context, config *config.GlobalConfig) error
}
func CheckServices(ctx context.Context, cfg *config.GlobalConfig, checkers []ServiceChecker) error {
for _, checker := range checkers {
if err := checker.Check(ctx, cfg); err != nil {
return err
}
}
return nil
}

@ -12,6 +12,7 @@ type KafkaCheck struct {
Kafka *config.Kafka
}
func CheckKafka(ctx context.Context, config *KafkaCheck) error {
kafkaConfig := &kafka.Config{
Addr: config.Kafka.Addr,

@ -1,107 +1,79 @@
package checks
import (
"errors"
"context"
"net"
"net/url"
"strings"
"time"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
s3minio "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio"
)
const (
minioHealthCheckDuration = 1
mongoConnTimeout = 5 * time.Second
MaxRetry = 300
minioHealthCheckDuration = 1 * time.Second
)
type MinioConfig struct {
type MinioCheck struct {
s3minio.Config
UseSSL string
ApiURL string
UseSSL bool `yaml:"useSSL"`
ApiURL string `yaml:"apiURL"`
}
// CheckMinio checks the MinIO connection.
func CheckMinio(minioStu MinioConfig) error {
if minioStu.Endpoint == "" || minioStu.AccessKeyID == "" || minioStu.SecretAccessKey == "" {
log.CInfo(nil, "Missing configuration for MinIO", "endpoint", minioStu.Endpoint, "accessKeyID", minioStu.AccessKeyID, "secretAccessKey", minioStu.SecretAccessKey)
return errs.New("missing configuration for endpoint, accessKeyID, or secretAccessKey").Wrap()
}
func CheckMinio(ctx context.Context, config MinioCheck) error {
minioInfo, err := jsonutil.JsonMarshal(minioStu)
if err != nil {
log.CInfo(nil, "MinioStu Marshal failed", "error", err)
return errs.WrapMsg(err, "minioStu Marshal failed")
if config.Endpoint == "" || config.AccessKeyID == "" || config.SecretAccessKey == "" {
logMsg := "Missing configuration for MinIO: endpoint, accessKeyID, or secretAccessKey"
log.CInfo(ctx, logMsg, "Config", config)
return errs.New(logMsg)
}
logJsonInfo := string(minioInfo)
u, err := url.Parse(minioStu.Endpoint)
endpointURL, err := url.Parse(config.Endpoint)
if err != nil {
log.CInfo(nil, "URL parse failed", "error", err, "minioInfo", logJsonInfo)
return errs.WrapMsg(err, "url parse failed")
return errs.WrapMsg(err, "Failed to parse MinIO endpoint URL")
}
secure := endpointURL.Scheme == "https" || config.UseSSL
secure := u.Scheme == "https" || minioStu.UseSSL == "true"
minioClient, err := minio.New(u.Host, &minio.Options{
Creds: credentials.NewStaticV4(minioStu.AccessKeyID, minioStu.SecretAccessKey, ""),
minioClient, err := minio.New(endpointURL.Host, &minio.Options{
Creds: credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKey, ""),
Secure: secure,
})
if err != nil {
log.CInfo(nil, "Initialize MinIO client failed", "error", err, "minioInfo", logJsonInfo)
return errs.WrapMsg(err, "initialize minio client failed")
return errs.WrapMsg(err, "Failed to initialize MinIO client", "Endpoint", config.Endpoint)
}
cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second)
cancel, err := minioClient.HealthCheck(minioHealthCheckDuration)
if err != nil {
log.CInfo(nil, "MinIO client health check failed", "error", err, "minioInfo", logJsonInfo)
return errs.WrapMsg(err, "minio client health check failed")
return errs.WrapMsg(err, "MinIO client health check failed")
}
defer cancel()
if minioClient.IsOffline() {
log.CInfo(nil, "MinIO client is offline", "minioInfo", logJsonInfo)
return errors.New("minio client is offline")
return errs.New("minio client is offline").Wrap()
}
apiURL, err := exactIP(minioStu.ApiURL)
if err != nil {
return err
}
signEndPoint, err := exactIP(minioStu.SignEndpoint)
if err != nil {
return err
apiURLHost, _ := exactIP(config.ApiURL)
signEndpointHost, _ := exactIP(config.SignEndpoint)
if apiURLHost == "127.0.0.1" || signEndpointHost == "127.0.0.1" {
logMsg := "Warning: MinIO ApiURL or SignEndpoint contains localhost"
log.CInfo(ctx, logMsg, "ApiURL", config.ApiURL, "SignEndpoint", config.SignEndpoint)
}
if apiURL == "127.0.0.1" {
log.CInfo(nil, "Warning, MinIOStu.apiURL contains localhost", "apiURL", minioStu.ApiURL)
}
if signEndPoint == "127.0.0.1" {
log.CInfo(nil, "Warning, MinIOStu.signEndPoint contains localhost", "signEndPoint", minioStu.SignEndpoint)
}
return nil
}
func exactIP(urlStr string) (string, error) {
u, err := url.Parse(urlStr)
if err != nil {
log.CInfo(nil, "URL parse error", "error", err, "url", urlStr)
return "", errs.WrapMsg(err, "url parse error")
return "", errs.WrapMsg(err, "URL parse error")
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
host = u.Host // Assume the entire host part is the host name if split fails
}
if strings.HasSuffix(host, ":") {
host = host[:len(host)-1]
host = u.Host
}
return host, nil
}

@ -12,7 +12,7 @@ type ZookeeperCheck struct {
Zookeeper *config.Zookeeper
}
func checkZookeeper(ctx context.Context, config *ZookeeperCheck) error {
func CheckZookeeper(ctx context.Context, config *ZookeeperCheck) error {
zkServers := config.Zookeeper.ZkAddr
schema := config.Zookeeper.Schema

@ -16,21 +16,22 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"os"
"strings"
"strconv"
"time"
"github.com/OpenIMSDK/tools/component"
kfk "github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/open-im-server/v3/tools/component/checks"
"github.com/openimsdk/open-im-server/v3/tools/component/util"
"github.com/openimsdk/tools/log"
"gopkg.in/yaml.v2"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio"
)
const (
@ -57,191 +58,88 @@ func initCfg() (*config.GlobalConfig, error) {
return conf, nil
}
type checkFunc struct {
name string
function func(*config.GlobalConfig) error
flag bool
config *config.GlobalConfig
}
// colorErrPrint prints formatted string in red to stderr
func colorErrPrint(msg string) {
// ANSI escape code for red text
const redColor = "\033[31m"
// ANSI escape code to reset color
const resetColor = "\033[0m"
msg = redColor + msg + resetColor
// Print to stderr in red
fmt.Fprintf(os.Stderr, "%s\n", msg)
}
func colorSuccessPrint(format string, a ...interface{}) {
// ANSI escape code for green text is \033[32m
// \033[0m resets the color
fmt.Printf("\033[32m"+format+"\033[0m", a...)
}
func main() {
flag.Parse()
ctx := context.Background()
conf, err := initCfg()
if err != nil {
fmt.Printf("Read config failed: %v\n", err)
return
fmt.Fprintf(os.Stderr, "Initialization failed: %v\n", err)
os.Exit(1)
}
err = configGetEnv(conf)
if err != nil {
fmt.Printf("configGetEnv failed, err:%v", err)
return
if err := util.ConfigGetEnv(conf); err != nil {
fmt.Fprintf(os.Stderr, "Environment variable override failed: %v\n", err)
os.Exit(1)
}
checks := []checkFunc{
{name: "Mongo", function: checkMongo, config: conf},
{name: "Redis", function: checkRedis, config: conf},
{name: "Zookeeper", function: checkZookeeper, config: conf},
{name: "Kafka", function: checkKafka, config: conf},
// Define a slice of functions to perform each service check
serviceChecks := []func(context.Context, *config.GlobalConfig) error{
func(ctx context.Context, cfg *config.GlobalConfig) error {
return checks.CheckMongo(ctx, &checks.MongoCheck{Mongo: &cfg.Mongo})
},
func(ctx context.Context, cfg *config.GlobalConfig) error {
return checks.CheckRedis(ctx, &checks.RedisCheck{Redis: &cfg.Redis})
},
func(ctx context.Context, cfg *config.GlobalConfig) error {
return checks.CheckZookeeper(ctx, &checks.ZookeeperCheck{Zookeeper: &cfg.Zookeeper})
},
func(ctx context.Context, cfg *config.GlobalConfig) error {
return checks.CheckKafka(ctx, &checks.KafkaCheck{Kafka: &cfg.Kafka})
},
}
if conf.Object.Enable == "minio" {
checks = append(checks, checkFunc{name: "Minio", function: checkMinio, config: conf})
minioConfig := checks.MinioCheck{
Config: minio.Config(conf.Object.Minio),
// UseSSL: conf.Minio.UseSSL,
ApiURL: conf.Object.ApiURL,
}
adjustUseSSL(&minioConfig)
minioCheck := func(ctx context.Context, cfg *config.GlobalConfig) error {
return checks.CheckMinio(ctx, minioConfig)
}
serviceChecks = append(serviceChecks, minioCheck)
}
// Execute checks with retry logic
for i := 0; i < maxRetry; i++ {
if i != 0 {
time.Sleep(1 * time.Second)
if i > 0 {
time.Sleep(time.Second)
}
fmt.Printf("Checking components round %v...\n", i+1)
fmt.Printf("Checking components, attempt %d/%d\n", i+1, maxRetry)
var err error
allSuccess := true
for index, check := range checks {
if !check.flag {
err = check.function(check.config)
if err != nil {
allSuccess = false
colorErrPrint(fmt.Sprintf("Check component: %s, failed: %v", check.name, err.Error()))
if check.name == "Minio" {
if errors.Is(err, errMinioNotEnabled) ||
errors.Is(err, errSignEndPoint) ||
errors.Is(err, errApiURL) {
checks[index].flag = true
continue
}
break
}
} else {
checks[index].flag = true
util.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name))
}
for _, check := range serviceChecks {
if err := check(ctx, conf); err != nil {
util.ColorErrPrint(fmt.Sprintf("Check failed: %v", err))
allSuccess = false
break
}
}
if allSuccess {
component.SuccessPrint("All components started successfully!")
util.SuccessPrint("All components started successfully!")
return
}
}
component.ErrorPrint("Some components checked failed!")
os.Exit(-1)
}
var errMinioNotEnabled = errors.New("minio.Enable is not configured to use MinIO")
var errSignEndPoint = errors.New("minio.signEndPoint contains 127.0.0.1, causing issues with image sending")
var errApiURL = errors.New("object.apiURL contains 127.0.0.1, causing issues with image sending")
// checkMongo checks the MongoDB connection without retries
func checkMongo(config *config.GlobalConfig) error {
mongoStu := &component.Mongo{
URL: config.Mongo.Uri,
Address: config.Mongo.Address,
Database: config.Mongo.Database,
Username: config.Mongo.Username,
Password: config.Mongo.Password,
MaxPoolSize: config.Mongo.MaxPoolSize,
}
err := component.CheckMongo(mongoStu)
return err
}
// checkRedis checks the Redis connection
func checkRedis(config *config.GlobalConfig) error {
redisStu := &component.Redis{
Address: config.Redis.Address,
Username: config.Redis.Username,
Password: config.Redis.Password,
}
err := component.CheckRedis(redisStu)
return err
}
// checkMinio checks the MinIO connection
func checkMinio(config *config.GlobalConfig) error {
if strings.Contains(config.Object.ApiURL, "127.0.0.1") {
return errs.Wrap(errApiURL)
}
if config.Object.Enable != "minio" {
return errs.Wrap(errMinioNotEnabled)
}
if strings.Contains(config.Object.Minio.Endpoint, "127.0.0.1") {
return errs.Wrap(errSignEndPoint)
}
minio := &component.Minio{
ApiURL: config.Object.ApiURL,
Endpoint: config.Object.Minio.Endpoint,
AccessKeyID: config.Object.Minio.AccessKeyID,
SecretAccessKey: config.Object.Minio.SecretAccessKey,
SignEndpoint: config.Object.Minio.SignEndpoint,
UseSSL: getEnv("MINIO_USE_SSL", "false"),
}
err := component.CheckMinio(minio)
return err
}
// checkZookeeper checks the Zookeeper connection
func checkZookeeper(config *config.GlobalConfig) error {
zkStu := &component.Zookeeper{
Schema: config.Zookeeper.Schema,
ZkAddr: config.Zookeeper.ZkAddr,
Username: config.Zookeeper.Username,
Password: config.Zookeeper.Password,
}
err := component.CheckZookeeper(zkStu)
return err
}
// checkKafka checks the Kafka connection
func checkKafka(config *config.GlobalConfig) error {
topics := []string{
config.Kafka.MsgToMongo.Topic,
config.Kafka.MsgToPush.Topic,
config.Kafka.LatestMsgToRedis.Topic,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
return kfk.CheckKafka(ctx, &config.Kafka.Config, topics)
util.ErrorPrint("Some components failed to start correctly.")
os.Exit(-1)
}
// isTopicPresent checks if a topic is present in the list of topics
func isTopicPresent(topic string, topics []string) bool {
for _, t := range topics {
if t == topic {
return true
// adjustUseSSL updates the UseSSL setting based on the MINIO_USE_SSL environment variable.
func adjustUseSSL(config *checks.MinioCheck) {
useSSL := config.UseSSL
if envSSL, exists := os.LookupEnv("MINIO_USE_SSL"); exists {
parsedSSL, err := strconv.ParseBool(envSSL)
if err == nil {
useSSL = parsedSSL
} else {
log.CInfo(context.Background(), "Invalid MINIO_USE_SSL value; using config file setting.", "MINIO_USE_SSL", envSSL)
}
}
return false
}
func getMinioAddr(key1, key2, key3, fallback string) string {
// Prioritize environment variables
endpoint := getEnv(key1, fallback)
address, addressExist := os.LookupEnv(key2)
port, portExist := os.LookupEnv(key3)
if portExist && addressExist {
endpoint = "http://" + address + ":" + port
return endpoint
}
return endpoint
config.UseSSL = useSSL
}

@ -1,4 +1,4 @@
package env
package util
import (
"os"
@ -9,7 +9,7 @@ import (
"github.com/openimsdk/tools/errs"
)
func configGetEnv(config *config.GlobalConfig) error {
func ConfigGetEnv(config *config.GlobalConfig) error {
config.Mongo.Uri = getEnv("MONGO_URI", config.Mongo.Uri)
config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Mongo.Username)
config.Mongo.Password = getEnv("MONGO_OPENIM_PASSWORD", config.Mongo.Password)
@ -93,3 +93,15 @@ func getArrEnv(key1, key2 string, fallback []string) []string {
}
return fallback
}
func getMinioAddr(key1, key2, key3, fallback string) string {
// Prioritize environment variables
endpoint := getEnv(key1, fallback)
address, addressExist := os.LookupEnv(key2)
port, portExist := os.LookupEnv(key3)
if portExist && addressExist {
endpoint = "http://" + address + ":" + port
return endpoint
}
return endpoint
}

@ -0,0 +1,54 @@
package util
import (
"fmt"
"log"
"os"
)
const (
colorRed = 31
colorGreen = 32
colorYellow = 33
)
// colorErrPrint prints formatted string in red to stderr
func ColorErrPrint(msg string) {
// ANSI escape code for red text
const redColor = "\033[31m"
// ANSI escape code to reset color
const resetColor = "\033[0m"
msg = redColor + msg + resetColor
// Print to stderr in red
fmt.Fprintf(os.Stderr, "%s\n", msg)
}
func ColorSuccessPrint(format string, a ...interface{}) {
// ANSI escape code for green text is \033[32m
// \033[0m resets the color
fmt.Printf("\033[32m"+format+"\033[0m", a...)
}
func colorPrint(colorCode int, format string, a ...any) {
fmt.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...))
}
func colorErrPrint(colorCode int, format string, a ...any) {
log.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...))
}
func ErrorPrint(s string) {
colorErrPrint(colorRed, "%v", s)
}
func SuccessPrint(s string) {
colorPrint(colorGreen, "%v", s)
}
func WarningPrint(s string) {
colorPrint(colorYellow, "Warning: But %v", s)
}
func ErrStr(err error, str string) error {
return fmt.Errorf("%v;%s", err, str)
}
Loading…
Cancel
Save