From 88eb7d5b823926b0f9ee3f47a2eb6d61455437c8 Mon Sep 17 00:00:00 2001 From: "Xinwei Xiong (cubxxw)" <3293172751nss@gmail.com> Date: Wed, 27 Mar 2024 23:08:40 +0800 Subject: [PATCH] feat: fix components design --- docs/contrib/go-code.md | 36 +++---- go.mod | 2 +- go.sum | 3 + internal/api/route.go | 3 +- pkg/common/db/unrelation/doc.go | 1 + pkg/common/kafka/doc.go | 1 + pkg/common/servererrs/doc.go | 1 + pkg/util/conversationutil/doc.go | 1 + tools/component/checks/checker.go | 20 ++++ tools/component/checks/etcd.go | 10 ++ tools/component/checks/kafka.go | 38 +++++++ tools/component/checks/minio.go | 107 ++++++++++++++++++++ tools/component/checks/mongo.go | 36 +++++++ tools/component/checks/redis.go | 32 ++++++ tools/component/checks/zookeeper.go | 31 ++++++ tools/component/component.go | 147 +--------------------------- tools/component/config/config.go | 3 + 17 files changed, 308 insertions(+), 164 deletions(-) create mode 100644 pkg/common/db/unrelation/doc.go create mode 100644 pkg/common/kafka/doc.go create mode 100644 pkg/common/servererrs/doc.go create mode 100644 pkg/util/conversationutil/doc.go create mode 100644 tools/component/checks/checker.go create mode 100644 tools/component/checks/etcd.go create mode 100644 tools/component/checks/kafka.go create mode 100644 tools/component/checks/minio.go create mode 100644 tools/component/checks/mongo.go create mode 100644 tools/component/checks/redis.go create mode 100644 tools/component/checks/zookeeper.go create mode 100644 tools/component/config/config.go diff --git a/docs/contrib/go-code.md b/docs/contrib/go-code.md index 22f5b56e2..38c876d7d 100644 --- a/docs/contrib/go-code.md +++ b/docs/contrib/go-code.md @@ -335,8 +335,6 @@ The use of `panic` should be carefully controlled in Go applications to ensure p - **Restricted Use in Main Package:** In the main package, the use of `panic` should be reserved for situations where the program is entirely inoperable, such as failure to open essential files, inability to connect to the database, or other critical startup issues. Even in these scenarios, prefer using structured error handling to terminate the program. -- **Use `log.Fatal` for Critical Errors:** Instead of panicking, use `log.Fatal` to log critical errors that prevent the program from operating normally. This approach allows the program to terminate while ensuring the error is properly logged for troubleshooting. - - **Prohibition on Exportable Interfaces:** Exportable interfaces must not invoke `panic`. They should handle errors gracefully and return errors as part of their contract. - **Prefer Errors Over Panic:** It is recommended to use error returns instead of panic to convey errors within a package. This approach promotes error handling that integrates smoothly with Go's error handling idioms. @@ -418,27 +416,31 @@ The naming convention is a very important part of the code specification. A unif - Don't use broad, meaningless package names like common, util, shared or lib. - The package name should be simple and clear, such as net, time, log. -### 2.2 Function Naming -- The function name is in camel case, and the first letter is uppercase or lowercase according to the access control decision,For example: `MixedCaps` or `mixedCaps`. -- Code automatically generated by code generation tools (such as `xxxx.pb.go`) and underscores used to group related test cases (such as `TestMyFunction_WhatIsBeingTested`) exclude this rule. +### 2.2 Function Naming Conventions + +Function names should adhere to the following guidelines, inspired by OpenIM’s standards and Google’s Go Style Guide: + +- Use camel case for function names. Start with an uppercase letter for public functions (`MixedCaps`) and a lowercase letter for private functions (`mixedCaps`). +- Exceptions to this rule include code automatically generated by tools (e.g., `xxxx.pb.go`) and test functions that use underscores for clarity (e.g., `TestMyFunction_WhatIsBeingTested`). + +### 2.3 File and Directory Naming Practices + +To maintain consistency and readability across the OpenIM project, observe the following naming practices: + +**File Names:** +- Use underscores (`_`) as the default separator in filenames, keeping them short and descriptive. +- Both hyphens (`-`) and underscores (`_`) are allowed, but underscores are preferred for general use. -In accordance with the naming conventions adopted by OpenIM and drawing reference from the Google Naming Conventions as per the guidelines available at https://google.github.io/styleguide/go/, the following expectations for naming practices within the project are set forth: +**Script and Markdown Files:** +- Prefer hyphens (`-`) for shell scripts and Markdown (`.md`) files to enhance searchability and web compatibility. -1. **File Names:** - + Both hyphens (`-`) and underscores (`_`) are permitted when naming files. - + A preference is established for the use of underscores (`_`), suggesting it as the best practice in general scenarios. -2. **Script and Markdown Files:** - + For shell scripts (bash files) and Markdown (`.md`) documents, the use of hyphens (`-`) is recommended. - + This recommendation is based on the improved searchability and compatibility in web browsers when hyphens are used in names. -3. **Directories:** - + A consistent approach is mandated for naming directories, exclusively using hyphens (`-`) to separate words within directory names. +**Directories:** +- Name directories with hyphens (`-`) exclusively to separate words, ensuring consistency and readability. +Remember to keep filenames lowercase and use meaningful, concise identifiers to facilitate better organization and navigation within the project. -### 2.3 File Naming -- Keep the filename short and meaningful. -- Filenames should be lowercase and use underscores to separate words. ### 2.4 Structure Naming diff --git a/go.mod b/go.mod index 3589daf74..d049fda07 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/protocol v0.0.58-google - github.com/openimsdk/tools v0.0.47-alpha.5 + github.com/openimsdk/tools v0.0.47-alpha.6 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/go.sum b/go.sum index 2e1db8322..fa95ed0ba 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,7 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw= github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= +github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM= github.com/OpenIMSDK/protocol v0.0.56 h1:mbVFyDBachEsmJLfYW5AU1z2KL8AUEpoHG8RPCIxjgg= github.com/OpenIMSDK/protocol v0.0.56/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.37 h1:qvDqmA4RbEJtPjZouWCkVuf/pjm6Y8nUrG5iH2gcnOg= @@ -282,6 +283,8 @@ github.com/openimsdk/tools v0.0.47-alpha.4 h1:18BJ3A7G5gN4DxtVSWuiNH1ZgaTd8blyts github.com/openimsdk/tools v0.0.47-alpha.4/go.mod h1:7LNWVXlqHUhTXeETk2ZcBPVzUFIjafHpP5SHh9E+9AY= github.com/openimsdk/tools v0.0.47-alpha.5 h1:CcLnq0Ne30f9aachsN/LpW1p+C0bzIoIcwen5eCA6to= github.com/openimsdk/tools v0.0.47-alpha.5/go.mod h1:7LNWVXlqHUhTXeETk2ZcBPVzUFIjafHpP5SHh9E+9AY= +github.com/openimsdk/tools v0.0.47-alpha.6 h1:GwV026servNNisB5MjlLr3G497+4z7hy6Rc+E14IElc= +github.com/openimsdk/tools v0.0.47-alpha.6/go.mod h1:7LNWVXlqHUhTXeETk2ZcBPVzUFIjafHpP5SHh9E+9AY= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= diff --git a/internal/api/route.go b/internal/api/route.go index 202c8f18d..5a82fa54d 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -104,8 +104,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i } server := http.Server{Addr: address, Handler: router} - log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", port, - "prometheusPort", proPort) + log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", port, "prometheusPort", proPort) go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { diff --git a/pkg/common/db/unrelation/doc.go b/pkg/common/db/unrelation/doc.go new file mode 100644 index 000000000..a6a696306 --- /dev/null +++ b/pkg/common/db/unrelation/doc.go @@ -0,0 +1 @@ +package unrelation // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" diff --git a/pkg/common/kafka/doc.go b/pkg/common/kafka/doc.go new file mode 100644 index 000000000..579a1779a --- /dev/null +++ b/pkg/common/kafka/doc.go @@ -0,0 +1 @@ +package kafka // import "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" diff --git a/pkg/common/servererrs/doc.go b/pkg/common/servererrs/doc.go new file mode 100644 index 000000000..d408a7514 --- /dev/null +++ b/pkg/common/servererrs/doc.go @@ -0,0 +1 @@ +package servererrs // import "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" diff --git a/pkg/util/conversationutil/doc.go b/pkg/util/conversationutil/doc.go new file mode 100644 index 000000000..90325ccb0 --- /dev/null +++ b/pkg/util/conversationutil/doc.go @@ -0,0 +1 @@ +package conversationutil // import "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" diff --git a/tools/component/checks/checker.go b/tools/component/checks/checker.go new file mode 100644 index 000000000..4e2af5eef --- /dev/null +++ b/tools/component/checks/checker.go @@ -0,0 +1,20 @@ +package checks + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" +) + +type ServiceChecker interface { + Check(ctx context.Context, config *config.GlobalConfig) error +} + +func CheckServices(ctx context.Context, cfg *config.GlobalConfig, checkers []ServiceChecker) error { + for _, checker := range checkers { + if err := checker.Check(ctx, cfg); err != nil { + return err + } + } + return nil +} diff --git a/tools/component/checks/etcd.go b/tools/component/checks/etcd.go new file mode 100644 index 000000000..e85c5d636 --- /dev/null +++ b/tools/component/checks/etcd.go @@ -0,0 +1,10 @@ +package checks + +import "context" + +type EtcdCheck struct{} + +func CheckEtcd(ctx context.Context, config *EtcdCheck) error { + + return nil +} diff --git a/tools/component/checks/kafka.go b/tools/component/checks/kafka.go new file mode 100644 index 000000000..34874719c --- /dev/null +++ b/tools/component/checks/kafka.go @@ -0,0 +1,38 @@ +package checks + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" +) + +type KafkaCheck struct { + Kafka *config.Kafka +} + +func CheckKafka(ctx context.Context, config *KafkaCheck) error { + kafkaConfig := &kafka.Config{ + Addr: config.Kafka.Addr, + Username: config.Kafka.Username, + Password: config.Kafka.Password, + } + + requiredTopics := []string{ + config.Kafka.MsgToMongo.Topic, + config.Kafka.MsgToPush.Topic, + config.Kafka.LatestMsgToRedis.Topic, + } + + log.CInfo(ctx, "Checking Kafka connection", "Address", kafkaConfig.Addr, "Topics", requiredTopics) + + err := kafka.CheckKafka(ctx, kafkaConfig, requiredTopics) + if err != nil { + log.CInfo(ctx, "Kafka connection failed", "error", err) + return err + } + + log.CInfo(ctx, "Kafka connection and required topics verified successfully") + return nil +} diff --git a/tools/component/checks/minio.go b/tools/component/checks/minio.go new file mode 100644 index 000000000..7271ee6d9 --- /dev/null +++ b/tools/component/checks/minio.go @@ -0,0 +1,107 @@ +package checks + +import ( + "errors" + "net" + "net/url" + "strings" + "time" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/jsonutil" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/openimsdk/tools/log" + + s3minio "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" +) + +const ( + minioHealthCheckDuration = 1 + mongoConnTimeout = 5 * time.Second + MaxRetry = 300 +) + +type MinioConfig struct { + s3minio.Config + UseSSL string + ApiURL string +} + +// CheckMinio checks the MinIO connection. +func CheckMinio(minioStu MinioConfig) error { + if minioStu.Endpoint == "" || minioStu.AccessKeyID == "" || minioStu.SecretAccessKey == "" { + log.CInfo(nil, "Missing configuration for MinIO", "endpoint", minioStu.Endpoint, "accessKeyID", minioStu.AccessKeyID, "secretAccessKey", minioStu.SecretAccessKey) + return errs.New("missing configuration for endpoint, accessKeyID, or secretAccessKey").Wrap() + } + + minioInfo, err := jsonutil.JsonMarshal(minioStu) + if err != nil { + log.CInfo(nil, "MinioStu Marshal failed", "error", err) + return errs.WrapMsg(err, "minioStu Marshal failed") + } + logJsonInfo := string(minioInfo) + + u, err := url.Parse(minioStu.Endpoint) + if err != nil { + log.CInfo(nil, "URL parse failed", "error", err, "minioInfo", logJsonInfo) + return errs.WrapMsg(err, "url parse failed") + } + + secure := u.Scheme == "https" || minioStu.UseSSL == "true" + + minioClient, err := minio.New(u.Host, &minio.Options{ + Creds: credentials.NewStaticV4(minioStu.AccessKeyID, minioStu.SecretAccessKey, ""), + Secure: secure, + }) + if err != nil { + log.CInfo(nil, "Initialize MinIO client failed", "error", err, "minioInfo", logJsonInfo) + return errs.WrapMsg(err, "initialize minio client failed") + } + + cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) + if err != nil { + log.CInfo(nil, "MinIO client health check failed", "error", err, "minioInfo", logJsonInfo) + return errs.WrapMsg(err, "minio client health check failed") + } + defer cancel() + + if minioClient.IsOffline() { + log.CInfo(nil, "MinIO client is offline", "minioInfo", logJsonInfo) + return errors.New("minio client is offline") + } + + apiURL, err := exactIP(minioStu.ApiURL) + if err != nil { + return err + } + signEndPoint, err := exactIP(minioStu.SignEndpoint) + if err != nil { + return err + } + + if apiURL == "127.0.0.1" { + log.CInfo(nil, "Warning, MinIOStu.apiURL contains localhost", "apiURL", minioStu.ApiURL) + } + if signEndPoint == "127.0.0.1" { + log.CInfo(nil, "Warning, MinIOStu.signEndPoint contains localhost", "signEndPoint", minioStu.SignEndpoint) + } + return nil +} + +func exactIP(urlStr string) (string, error) { + u, err := url.Parse(urlStr) + if err != nil { + log.CInfo(nil, "URL parse error", "error", err, "url", urlStr) + return "", errs.WrapMsg(err, "url parse error") + } + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + host = u.Host // Assume the entire host part is the host name if split fails + } + if strings.HasSuffix(host, ":") { + host = host[:len(host)-1] + } + return host, nil +} diff --git a/tools/component/checks/mongo.go b/tools/component/checks/mongo.go new file mode 100644 index 000000000..a149ca82e --- /dev/null +++ b/tools/component/checks/mongo.go @@ -0,0 +1,36 @@ +package checks + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/log" +) + +type MongoCheck struct { + Mongo *config.Mongo +} + +func CheckMongo(ctx context.Context, config *MongoCheck) error { + mongoConfig := &mongoutil.Config{ + Uri: config.Mongo.Uri, + Address: config.Mongo.Address, + Database: config.Mongo.Database, + Username: config.Mongo.Username, + Password: config.Mongo.Password, + MaxPoolSize: config.Mongo.MaxPoolSize, + MaxRetry: 0, + } + + log.CInfo(ctx, "Checking MongoDB connection", "URI", mongoConfig.Uri, "Database", mongoConfig.Database) + + err := mongoutil.CheckMongo(ctx, mongoConfig) + if err != nil { + log.CInfo(ctx, "MongoDB connection failed", "error", err) + return err + } + + log.CInfo(ctx, "MongoDB connection established successfully") + return nil +} diff --git a/tools/component/checks/redis.go b/tools/component/checks/redis.go new file mode 100644 index 000000000..225951037 --- /dev/null +++ b/tools/component/checks/redis.go @@ -0,0 +1,32 @@ +package checks + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/log" +) + +type RedisCheck struct { + Redis *config.Redis +} + +func CheckRedis(ctx context.Context, config *RedisCheck) error { + redisConfig := &redisutil.Config{ + Address: config.Redis.Address, + Username: config.Redis.Username, + Password: config.Redis.Password, + } + + log.CInfo(ctx, "Checking Redis connection", "Address", redisConfig.Address) + + err := redisutil.CheckRedis(ctx, redisConfig) + if err != nil { + log.CInfo(ctx, "Redis connection failed", "error", err) + return err + } + + log.CInfo(ctx, "Redis connection established successfully") + return nil +} diff --git a/tools/component/checks/zookeeper.go b/tools/component/checks/zookeeper.go new file mode 100644 index 000000000..7421c9d9a --- /dev/null +++ b/tools/component/checks/zookeeper.go @@ -0,0 +1,31 @@ +package checks + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery/zookeeper" + "github.com/openimsdk/tools/log" +) + +type ZookeeperCheck struct { + Zookeeper *config.Zookeeper +} + +func checkZookeeper(ctx context.Context, config *ZookeeperCheck) error { + zkServers := config.Zookeeper.ZkAddr + schema := config.Zookeeper.Schema + + authOption := zookeeper.WithUserNameAndPassword(config.Zookeeper.Username, config.Zookeeper.Password) + + log.CInfo(ctx, "Checking Zookeeper connection", "Schema", schema, "ZkServers", zkServers) + + err := zookeeper.CheckZookeeper(ctx, zkServers, config.Zookeeper.Schema, authOption) + if err != nil { + log.CInfo(ctx, "Zookeeper connection failed", "error", err) + return err + } + + log.CInfo(ctx, "Zookeeper connection established successfully") + return nil +} diff --git a/tools/component/component.go b/tools/component/component.go index 485573837..3acc80d35 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -23,10 +23,10 @@ import ( "strings" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" + "github.com/OpenIMSDK/tools/component" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/util" "gopkg.in/yaml.v2" - "github.com/openimsdk/tools/component" "github.com/openimsdk/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -131,10 +131,9 @@ func main() { } } else { checks[index].flag = true - component.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) + util.SuccessPrint(fmt.Sprintf("%s connected successfully", check.name)) } } - } if allSuccess { component.SuccessPrint("All components started successfully!") @@ -150,146 +149,6 @@ var errMinioNotEnabled = errors.New("minio.Enable is not configured to use MinIO var errSignEndPoint = errors.New("minio.signEndPoint contains 127.0.0.1, causing issues with image sending") var errApiURL = errors.New("object.apiURL contains 127.0.0.1, causing issues with image sending") -// checkMongo checks the MongoDB connection without retries -func checkMongo(config *config.GlobalConfig) error { - mongoStu := &component.Mongo{ - URL: config.Mongo.Uri, - Address: config.Mongo.Address, - Database: config.Mongo.Database, - Username: config.Mongo.Username, - Password: config.Mongo.Password, - MaxPoolSize: config.Mongo.MaxPoolSize, - } - err := component.CheckMongo(mongoStu) - - return err -} - -// checkRedis checks the Redis connection -func checkRedis(config *config.GlobalConfig) error { - redisStu := &component.Redis{ - Address: config.Redis.Address, - Username: config.Redis.Username, - Password: config.Redis.Password, - } - err := component.CheckRedis(redisStu) - return err -} - -// checkMinio checks the MinIO connection -func checkMinio(config *config.GlobalConfig) error { - if strings.Contains(config.Object.ApiURL, "127.0.0.1") { - return errs.Wrap(errApiURL) - } - if config.Object.Enable != "minio" { - return errs.Wrap(errMinioNotEnabled) - } - if strings.Contains(config.Object.Minio.Endpoint, "127.0.0.1") { - return errs.Wrap(errSignEndPoint) - } - - minio := &component.Minio{ - ApiURL: config.Object.ApiURL, - Endpoint: config.Object.Minio.Endpoint, - AccessKeyID: config.Object.Minio.AccessKeyID, - SecretAccessKey: config.Object.Minio.SecretAccessKey, - SignEndpoint: config.Object.Minio.SignEndpoint, - UseSSL: getEnv("MINIO_USE_SSL", "false"), - } - err := component.CheckMinio(minio) - return err -} - -// checkZookeeper checks the Zookeeper connection -func checkZookeeper(config *config.GlobalConfig) error { - zkStu := &component.Zookeeper{ - Schema: config.Zookeeper.Schema, - ZkAddr: config.Zookeeper.ZkAddr, - Username: config.Zookeeper.Username, - Password: config.Zookeeper.Password, - } - err := component.CheckZookeeper(zkStu) - return err -} - -// checkKafka checks the Kafka connection -func checkKafka(config *config.GlobalConfig) error { - // Prioritize environment variables - kafkaStu := &component.Kafka{ - Username: config.Kafka.Username, - Password: config.Kafka.Password, - Addr: config.Kafka.Addr, - } - - kafkaClient, err := component.CheckKafka(kafkaStu) - if err != nil { - return err - } - defer kafkaClient.Close() - - // Verify if necessary topics exist - topics, err := kafkaClient.Topics() - if err != nil { - return errs.Wrap(err) - } - - requiredTopics := []string{ - config.Kafka.MsgToMongo.Topic, - config.Kafka.MsgToPush.Topic, - config.Kafka.LatestMsgToRedis.Topic, - } - - for _, requiredTopic := range requiredTopics { - if !isTopicPresent(requiredTopic, topics) { - return errs.WrapMsg(nil, "Kafka missing required topic", "topic", requiredTopic, "availableTopics", strings.Join(topics, ", ")) - } - } - - type Item struct { - Topic string - GroupID string - } - - items := []Item{ - { - Topic: config.Kafka.LatestMsgToRedis.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToRedis, - }, - - { - Topic: config.Kafka.MsgToMongo.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToMongo, - }, - - { - Topic: config.Kafka.MsgToPush.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToPush, - }, - } - - for _, item := range items { - cg, err := kafka.NewMConsumerGroup(config.Kafka.Config, item.GroupID, []string{item.Topic}) - if err != nil { - return err - } - if err := cg.Close(); err != nil { - return err - } - } - - return nil -} - -// isTopicPresent checks if a topic is present in the list of topics -func isTopicPresent(topic string, topics []string) bool { - for _, t := range topics { - if t == topic { - return true - } - } - return false -} - func configGetEnv(config *config.GlobalConfig) error { config.Mongo.Uri = getEnv("MONGO_URI", config.Mongo.Uri) config.Mongo.Username = getEnv("MONGO_OPENIM_USERNAME", config.Mongo.Username) diff --git a/tools/component/config/config.go b/tools/component/config/config.go new file mode 100644 index 000000000..b663f62c4 --- /dev/null +++ b/tools/component/config/config.go @@ -0,0 +1,3 @@ +package config + +