Merge branch 'openimsdk:main' into main

pull/1897/head
Seal Bell 2 years ago committed by GitHub
commit 185f78a7e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,6 +17,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/tools/errs"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
@ -28,8 +29,6 @@ import (
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -44,55 +43,42 @@ func main() {
apiCmd.AddPortFlag() apiCmd.AddPortFlag()
apiCmd.AddApi(run) apiCmd.AddApi(run)
if err := apiCmd.Execute(); err != nil { if err := apiCmd.Execute(); err != nil {
log.ZError(context.Background(), "API command execution failed", err)
panic(err.Error()) panic(err.Error())
} }
} }
func run(port int, proPort int) error { func run(port int, proPort int) error {
log.ZInfo(context.Background(), "Openim api port:", "port", port, "proPort", proPort)
if port == 0 || proPort == 0 { if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort) err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
log.ZError(context.Background(), err, nil) return errs.Wrap(fmt.Errorf(err))
return fmt.Errorf(err)
} }
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
log.ZError(context.Background(), "Failed to initialize Redis", err)
return err return err
} }
log.ZInfo(context.Background(), "api start init discov client")
var client discoveryregistry.SvcDiscoveryRegistry var client discoveryregistry.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment // Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil { if err != nil {
log.ZError(context.Background(), "Failed to initialize discovery register", err) return errs.Wrap(err, "register discovery err")
return err
} }
if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
log.ZError(context.Background(), "Failed to create RPC root nodes", err) return errs.Wrap(err, "create rpc root nodes error")
return err
} }
log.ZInfo(context.Background(), "api register public config to discov")
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil { if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
log.ZError(context.Background(), "Failed to register public config to discov", err)
return err return err
} }
log.ZInfo(context.Background(), "api register public config to discov success")
router := api.NewGinRouter(client, rdb) router := api.NewGinRouter(client, rdb)
if config.Config.Prometheus.Enable { if config.Config.Prometheus.Enable {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort)) p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.Use(router) p.Use(router)
} }
log.ZInfo(context.Background(), "api init router success")
var address string var address string
if config.Config.Api.ListenIP != "" { if config.Config.Api.ListenIP != "" {
@ -100,13 +86,11 @@ func run(port int, proPort int) error {
} else { } else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
} }
log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version)
server := http.Server{Addr: address, Handler: router} server := http.Server{Addr: address, Handler: router}
go func() { go func() {
err = server.ListenAndServe() err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed { if err != nil && err != http.ErrServerClosed {
log.ZError(context.Background(), "api run failed", err, "address", address)
os.Exit(1) os.Exit(1)
} }
}() }()
@ -120,7 +104,6 @@ func run(port int, proPort int) error {
// graceful shutdown operation. // graceful shutdown operation.
if err := server.Shutdown(ctx); err != nil { if err := server.Shutdown(ctx); err != nil {
log.ZError(context.Background(), "failed to api-server shutdown", err)
return err return err
} }

@ -305,6 +305,7 @@ Feel free to explore the MinIO documentation for more advanced configurations an
This section involves setting up MongoDB, including its port, address, and credentials. This section involves setting up MongoDB, including its port, address, and credentials.
| Parameter | Example Value | Description | | Parameter | Example Value | Description |
| -------------- | -------------- | ----------------------- | | -------------- | -------------- | ----------------------- |
| MONGO_PORT | "27017" | Port used by MongoDB. | | MONGO_PORT | "27017" | Port used by MongoDB. |

@ -5,7 +5,7 @@ go 1.19
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.48 github.com/OpenIMSDK/protocol v0.0.48
github.com/OpenIMSDK/tools v0.0.29 github.com/OpenIMSDK/tools v0.0.31
github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1 github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@ -31,7 +31,7 @@ require (
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require github.com/google/uuid v1.3.1 require github.com/google/uuid v1.5.0
require ( require (
github.com/IBM/sarama v1.41.3 github.com/IBM/sarama v1.41.3
@ -94,8 +94,8 @@ require (
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect
github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.2.4 // indirect github.com/leodido/go-urn v1.2.4 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
@ -129,7 +129,7 @@ require (
go.uber.org/atomic v1.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect golang.org/x/arch v0.3.0 // indirect
golang.org/x/net v0.17.0 // indirect golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.15.0 // indirect golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect
@ -141,7 +141,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect
gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.23.8 // indirect gorm.io/gorm v1.25.4 // indirect
stathat.com/c/consistent v1.0.0 // indirect stathat.com/c/consistent v1.0.0 // indirect
) )

@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw=
github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.29 h1:NS4PEwYl9sX3SWsMjDOLVxMo3LcTWREMr+2cjzWjcqc= github.com/OpenIMSDK/tools v0.0.31 h1:fSrhcPTvHEMTSyrJZDupe730mL4nuhvSOUP/BaZiHaY=
github.com/OpenIMSDK/tools v0.0.29/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/tools v0.0.31/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@ -152,8 +152,8 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ= github.com/googleapis/enterprise-certificate-proxy v0.3.1 h1:SBWmZhjUDRorQxrN0nwzf+AHBxnbFjViHQS4P0yVpmQ=
github.com/googleapis/enterprise-certificate-proxy v0.3.1/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/enterprise-certificate-proxy v0.3.1/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
@ -194,7 +194,6 @@ github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
@ -205,12 +204,12 @@ github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -402,8 +401,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY=
golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0=
@ -531,8 +530,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/gorm v1.23.8 h1:h8sGJ+biDgBA1AD1Ha9gFCx7h8npU7AsLdlkX0n2TpE= gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

@ -15,8 +15,10 @@
package msgtransfer package msgtransfer
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/OpenIMSDK/tools/errs"
"log" "log"
"net/http" "net/http"
"sync" "sync"
@ -69,40 +71,47 @@ func StartTransfer(prometheusPort int) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb) msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client) conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient) msgTransfer, err := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort) return msgTransfer.Start(prometheusPort)
} }
func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer { func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
return &MsgTransfer{ historyCH, err := NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient)
historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient), if err != nil {
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), return nil, err
} }
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(msgDatabase)
if err != nil {
return nil, err
}
return &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
}, nil
} }
func (m *MsgTransfer) Start(prometheusPort int) error { func (m *MsgTransfer) Start(prometheusPort int) error {
ctx := context.Background()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 { if prometheusPort <= 0 {
return errors.New("prometheusPort not correct") return errs.Wrap(errors.New("prometheusPort not correct"))
}
if config.Config.ChatPersistenceMysql {
// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
fmt.Println("msg transfer not start mysql consumer")
} }
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH) go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)
/*err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
}*/
////////////////////////////
if config.Config.Prometheus.Enable { if config.Config.Prometheus.Enable {
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
reg.MustRegister( reg.MustRegister(

@ -87,7 +87,7 @@ func NewOnlineHistoryRedisConsumerHandler(
database controller.CommonMsgDatabase, database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient, groupRpcClient *rpcclient.GroupRpcClient,
) *OnlineHistoryRedisConsumerHandler { ) (*OnlineHistoryRedisConsumerHandler, error) {
var och OnlineHistoryRedisConsumerHandler var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
@ -98,14 +98,15 @@ func NewOnlineHistoryRedisConsumerHandler(
} }
och.conversationRpcClient = conversationRpcClient och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ var err error
och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0, KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.LatestMsgToRedis.Topic}, }, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis) config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d // statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) // second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och return &och, err
} }
func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {

@ -34,16 +34,21 @@ type OnlineHistoryMongoConsumerHandler struct {
msgDatabase controller.CommonMsgDatabase msgDatabase controller.CommonMsgDatabase
} }
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler { func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
if err != nil {
return nil, err
}
mc := &OnlineHistoryMongoConsumerHandler{ mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ historyConsumerGroup: historyConsumerGroup,
KafkaVersion: sarama.V2_0_0_0, msgDatabase: database,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
} }
return mc return mc, nil
} }
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo( func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(

@ -14,19 +14,24 @@
package push package push
import "context"
type Consumer struct { type Consumer struct {
pushCh ConsumerHandler pushCh ConsumerHandler
successCount uint64 successCount uint64
} }
func NewConsumer(pusher *Pusher) *Consumer { func NewConsumer(pusher *Pusher) (*Consumer, error) {
return &Consumer{ c, err := NewConsumerHandler(pusher)
pushCh: *NewConsumerHandler(pusher), if err != nil {
return nil, err
} }
return &Consumer{
pushCh: *c,
}, nil
} }
func (c *Consumer) Start() { func (c *Consumer) Start() {
// statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to
// msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh)
go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&c.pushCh)
} }

@ -35,15 +35,19 @@ type ConsumerHandler struct {
pusher *Pusher pusher *Pusher
} }
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler { func NewConsumerHandler(pusher *Pusher) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ var err error
consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0, KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr, }, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush) config.Config.Kafka.ConsumerGroupID.MsgToPush)
return &consumerHandler if err != nil {
return nil, err
}
return &consumerHandler, nil
} }
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {

@ -66,9 +66,12 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
pusher: pusher, pusher: pusher,
}) })
}() }()
consumer, err := NewConsumer(pusher)
if err != nil {
return err
}
go func() { go func() {
defer wg.Done() defer wg.Done()
consumer := NewConsumer(pusher)
consumer.Start() consumer.Start()
}() }()
wg.Wait() wg.Wait()

@ -80,7 +80,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
userRpcClient := rpcclient.NewUserRpcClient(client) userRpcClient := rpcclient.NewUserRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client)
friendRpcClient := rpcclient.NewFriendRpcClient(client) friendRpcClient := rpcclient.NewFriendRpcClient(client)
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
if err != nil {
return err
}
s := &msgServer{ s := &msgServer{
Conversation: &conversationClient, Conversation: &conversationClient,
User: &userRpcClient, User: &userRpcClient,

@ -17,6 +17,7 @@ package tools
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/tools/errs"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
@ -25,14 +26,13 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
) )
func StartTask() error { func StartTask() error {
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)
msgTool, err := InitMsgTool() msgTool, err := InitMsgTool()
if err != nil { if err != nil {
return err return err
@ -47,18 +47,16 @@ func StartTask() error {
// register cron tasks // register cron tasks
var crontab = cron.New() var crontab = cron.New()
log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) fmt.Println("start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
_, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq)) _, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil { if err != nil {
log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err) return errs.Wrap(err)
panic(err)
} }
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) fmt.Println("start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs)) _, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil { if err != nil {
log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err) return errs.Wrap(err)
panic(err)
} }
// start crontab // start crontab

@ -84,7 +84,10 @@ func InitMsgTool() (*MsgTool, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase()) msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
if err != nil {
return nil, err
}
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase()) userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
ctxTx := tx.NewMongo(mongo.GetClient()) ctxTx := tx.NewMongo(mongo.GetClient())
userDatabase := controller.NewUserDatabase( userDatabase := controller.NewUserDatabase(

@ -49,7 +49,7 @@ func NewRedis() (redis.UniversalClient, error) {
overrideConfigFromEnv() overrideConfigFromEnv()
if len(config.Config.Redis.Address) == 0 { if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty") return nil, errs.Wrap(errors.New("redis address is empty"))
} }
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound) specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient var rdb redis.UniversalClient
@ -77,7 +77,7 @@ func NewRedis() (redis.UniversalClient, error) {
defer cancel() defer cancel()
err = rdb.Ping(ctx).Err() err = rdb.Ping(ctx).Err()
if err != nil { if err != nil {
return nil, fmt.Errorf("redis ping %w", err) return nil, errs.Wrap(fmt.Errorf("redis ping %w", err))
} }
redisClient = rdb redisClient = rdb

@ -126,21 +126,32 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
} }
func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase { func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheModel cache.MsgModel) (CommonMsgDatabase, error) {
producerToRedis, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic)
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic)
if err != nil {
return nil, err
}
return &commonMsgDatabase{ return &commonMsgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
cache: cacheModel, cache: cacheModel,
producer: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.LatestMsgToRedis.Topic), producer: producerToRedis,
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToMongo.Topic), producerToMongo: producerToMongo,
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Addr, config.Config.Kafka.MsgToPush.Topic), producerToPush: producerToPush,
} }, nil
} }
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) CommonMsgDatabase { func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) (CommonMsgDatabase, error) {
cacheModel := cache.NewMsgCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(database) msgDocModel := unrelation.NewMsgMongoDriver(database)
CommonMsgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel) return NewCommonMsgDatabase(msgDocModel, cacheModel)
return CommonMsgDatabase
} }
type commonMsgDatabase struct { type commonMsgDatabase struct {

@ -16,6 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"time" "time"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
@ -38,7 +39,7 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
}) })
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
return &ConversationMgo{coll: coll}, nil return &ConversationMgo{coll: coll}, nil
} }

@ -16,6 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"time" "time"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
@ -36,7 +37,7 @@ func NewGroupMongo(db *mongo.Database) (relation.GroupModelInterface, error) {
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
}) })
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
return &GroupMgo{coll: coll}, nil return &GroupMgo{coll: coll}, nil
} }

@ -16,6 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
@ -37,7 +38,7 @@ func NewGroupMember(db *mongo.Database) (relation.GroupMemberModelInterface, err
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
}) })
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
return &GroupMemberMgo{coll: coll}, nil return &GroupMemberMgo{coll: coll}, nil
} }

@ -16,6 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination" "github.com/OpenIMSDK/tools/pagination"
@ -36,7 +37,7 @@ func NewGroupRequestMgo(db *mongo.Database) (relation.GroupRequestModelInterface
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
}) })
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
return &GroupRequestMgo{coll: coll}, nil return &GroupRequestMgo{coll: coll}, nil
} }

@ -40,7 +40,7 @@ func NewUserMongo(db *mongo.Database) (relation.UserModelInterface, error) {
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
}) })
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err)
} }
return &UserMgo{coll: coll}, nil return &UserMgo{coll: coll}, nil
} }

@ -27,8 +27,6 @@ import (
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mw/specialerror" "github.com/OpenIMSDK/tools/mw/specialerror"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
) )
@ -63,9 +61,9 @@ func NewMongo() (*Mongo, error) {
time.Sleep(time.Second) // exponential backoff could be implemented here time.Sleep(time.Second) // exponential backoff could be implemented here
continue continue
} }
return nil, err return nil, errs.Wrap(err)
} }
return nil, err return nil, errs.Wrap(err)
} }
func buildMongoURI() string { func buildMongoURI() string {
@ -150,7 +148,7 @@ func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...strin
_, err := indexView.CreateOne(context.Background(), index, opts) _, err := indexView.CreateOne(context.Background(), index, opts)
if err != nil { if err != nil {
return utils.Wrap(err, "CreateIndex") return errs.Wrap(err, "CreateIndex")
} }
return nil return nil
} }

@ -16,6 +16,8 @@ package kafka
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"strings"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
@ -36,7 +38,7 @@ type MConsumerGroupConfig struct {
IsReturnErr bool IsReturnErr bool
} }
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) (*MConsumerGroup, error) {
consumerGroupConfig := sarama.NewConfig() consumerGroupConfig := sarama.NewConfig()
consumerGroupConfig.Version = consumerConfig.KafkaVersion consumerGroupConfig.Version = consumerConfig.KafkaVersion
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
@ -49,26 +51,28 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
SetupTLSConfig(consumerGroupConfig) SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil { if err != nil {
panic(err.Error()) return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID)
} }
return &MConsumerGroup{ return &MConsumerGroup{
consumerGroup, consumerGroup,
groupID, groupID,
topics, topics,
} }, nil
} }
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
return GetContextWithMQHeader(cMsg.Headers) return GetContextWithMQHeader(cMsg.Headers)
} }
func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) { func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID)
ctx := context.Background()
for { for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if err != nil { if err != nil {
panic(err.Error()) log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
}
if ctx.Err() != nil {
return
} }
} }
} }

@ -18,6 +18,7 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"github.com/OpenIMSDK/tools/errs"
"strings" "strings"
"time" "time"
@ -44,7 +45,7 @@ type Producer struct {
} }
// NewKafkaProducer initializes a new Kafka producer. // NewKafkaProducer initializes a new Kafka producer.
func NewKafkaProducer(addr []string, topic string) *Producer { func NewKafkaProducer(addr []string, topic string) (*Producer, error) {
p := Producer{ p := Producer{
addr: addr, addr: addr,
topic: topic, topic: topic,
@ -87,17 +88,17 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
p.producer, err = sarama.NewSyncProducer(p.addr, p.config) p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil { if err == nil {
return &p return &p, nil
} }
time.Sleep(1 * time.Second) // Wait before retrying time.Sleep(1 * time.Second) // Wait before retrying
} }
// Panic if unable to create producer after retries // Panic if unable to create producer after retries
if err != nil { if err != nil {
panic("Failed to create Kafka producer: " + err.Error()) return nil, errs.Wrap(errors.New("failed to create Kafka producer: " + err.Error()))
} }
return &p return &p, nil
} }
// configureProducerAck configures the producer's acknowledgement level. // configureProducerAck configures the producer's acknowledgement level.

@ -17,6 +17,7 @@ package startrpc
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/OpenIMSDK/tools/errs"
"log" "log"
"net" "net"
"net/http" "net/http"
@ -43,7 +44,6 @@ import (
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/mw"
"github.com/OpenIMSDK/tools/network" "github.com/OpenIMSDK/tools/network"
"github.com/OpenIMSDK/tools/utils"
) )
// Start rpc server. // Start rpc server.
@ -61,20 +61,20 @@ func Start(
net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)), net.JoinHostPort(network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort)),
) )
if err != nil { if err != nil {
return err return errs.Wrap(err, network.GetListenIP(config.Config.Rpc.ListenIP), strconv.Itoa(rpcPort))
} }
defer listener.Close() defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil { if err != nil {
return utils.Wrap1(err) return errs.Wrap(err)
} }
defer client.Close() defer client.Close()
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP) registerIP, err := network.GetRpcRegisterIP(config.Config.Rpc.RegisterIP)
if err != nil { if err != nil {
return err return errs.Wrap(err)
} }
var reg *prometheus.Registry var reg *prometheus.Registry
@ -96,7 +96,7 @@ func Start(
err = rpcFn(client, srv) err = rpcFn(client, srv)
if err != nil { if err != nil {
return utils.Wrap1(err) return errs.Wrap(err)
} }
err = client.Register( err = client.Register(
rpcRegisterName, rpcRegisterName,
@ -105,7 +105,7 @@ func Start(
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
) )
if err != nil { if err != nil {
return utils.Wrap1(err) return errs.Wrap(err)
} }
var wg errgroup.Group var wg errgroup.Group
@ -123,7 +123,7 @@ func Start(
}) })
wg.Go(func() error { wg.Go(func() error {
return utils.Wrap1(srv.Serve(listener)) return errs.Wrap(srv.Serve(listener))
}) })
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
@ -146,7 +146,7 @@ func Start(
return gerr return gerr
case <-time.After(15 * time.Second): case <-time.After(15 * time.Second):
return utils.Wrap1(errors.New("timeout exit")) return errs.Wrap(errors.New("timeout exit"))
} }
} }

@ -20,7 +20,6 @@ import (
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/third" "github.com/OpenIMSDK/protocol/third"

@ -15,44 +15,27 @@
package main package main
import ( import (
"context"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"net"
"net/url"
"os" "os"
"strings" "strings"
"time" "time"
"github.com/IBM/sarama" "github.com/OpenIMSDK/tools/component"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/go-zookeeper/zk"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/redis/go-redis/v9"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
const ( const (
// defaultCfgPath is the default path of the configuration file. // defaultCfgPath is the default path of the configuration file.
defaultCfgPath = "../../../../../config/config.yaml" defaultCfgPath = "../../../../../config/config.yaml"
minioHealthCheckDuration = 1 maxRetry = 300
maxRetry = 300 componentStartErrCode = 6000
componentStartErrCode = 6000 configErrCode = 6001
configErrCode = 6001
mongoConnTimeout = 30 * time.Second
)
const (
colorRed = 31
colorGreen = 32
colorYellow = 33
) )
var ( var (
@ -103,16 +86,16 @@ func main() {
for _, check := range checks { for _, check := range checks {
str, err := check.function() str, err := check.function()
if err != nil { if err != nil {
errorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err)) component.ErrorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err))
allSuccess = false allSuccess = false
break break
} else { } else {
successPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str)) component.SuccessPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str))
} }
} }
if allSuccess { if allSuccess {
successPrint("All components started successfully!") component.SuccessPrint("All components started successfully!")
return return
} }
@ -120,19 +103,6 @@ func main() {
os.Exit(1) os.Exit(1)
} }
func exactIP(urll string) string {
u, _ := url.Parse(urll)
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
host = u.Host
}
if strings.HasSuffix(host, ":") {
host = host[0 : len(host)-1]
}
return host
}
// Helper function to get environment variable or default value // Helper function to get environment variable or default value
func getEnv(key, fallback string) string { func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists { if value, exists := os.LookupEnv(key); exists {
@ -143,45 +113,23 @@ func getEnv(key, fallback string) string {
// checkMongo checks the MongoDB connection without retries // checkMongo checks the MongoDB connection without retries
func checkMongo() (string, error) { func checkMongo() (string, error) {
uri := getEnv("MONGO_URI", buildMongoURI()) mongo := &component.Mongo{
Address: config.Config.Mongo.Address,
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout) Database: config.Config.Mongo.Database,
defer cancel() Username: config.Config.Mongo.Username,
Password: config.Config.Mongo.Password,
str := "ths addr is:" + strings.Join(config.Config.Mongo.Address, ",") MaxPoolSize: config.Config.Mongo.MaxPoolSize,
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
return "", errs.Wrap(errStr(err, str))
} }
defer client.Disconnect(context.Background()) uri, uriExist := os.LookupEnv("MONGO_URI")
if uriExist {
ctx, cancel = context.WithTimeout(context.Background(), mongoConnTimeout) mongo.URL = uri
defer cancel()
if err = client.Ping(ctx, nil); err != nil {
return "", errs.Wrap(errStr(err, str))
} }
return str, nil str, err := component.CheckMongo(mongo)
} if err != nil {
return "", err
// buildMongoURI constructs the MongoDB URI using configuration settings
func buildMongoURI() string {
// Fallback to config if environment variables are not set
username := config.Config.Mongo.Username
password := config.Config.Mongo.Password
database := config.Config.Mongo.Database
maxPoolSize := config.Config.Mongo.MaxPoolSize
mongodbHosts := strings.Join(config.Config.Mongo.Address, ",")
if username != "" && password != "" {
return fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d",
username, password, mongodbHosts, database, maxPoolSize)
} }
return fmt.Sprintf("mongodb://%s/%s?maxPoolSize=%d", return str, nil
mongodbHosts, database, maxPoolSize)
} }
// checkMinio checks the MinIO connection // checkMinio checks the MinIO connection
@ -191,52 +139,24 @@ func checkMinio() (string, error) {
return "", nil return "", nil
} }
// Prioritize environment variables endpoint, err := getMinioAddr("MINIO_ENDPOINT", "MINIO_ADDRESS", "MINIO_PORT", config.Config.Object.Minio.Endpoint)
endpoint := getEnv("MINIO_ENDPOINT", config.Config.Object.Minio.Endpoint)
accessKeyID := getEnv("MINIO_ACCESS_KEY_ID", config.Config.Object.Minio.AccessKeyID)
secretAccessKey := getEnv("MINIO_SECRET_ACCESS_KEY", config.Config.Object.Minio.SecretAccessKey)
useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default
if endpoint == "" || accessKeyID == "" || secretAccessKey == "" {
return "", ErrConfig.Wrap("MinIO configuration missing")
}
// Parse endpoint URL to determine if SSL is enabled
u, err := url.Parse(endpoint)
if err != nil { if err != nil {
str := "the endpoint is:" + endpoint return "", err
return "", errs.Wrap(errStr(err, str))
}
secure := u.Scheme == "https" || useSSL == "true"
// Initialize MinIO client
minioClient, err := minio.New(u.Host, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure,
})
str := "ths addr is:" + u.Host
if err != nil {
strs := fmt.Sprintf("%v;host:%s,accessKeyID:%s,secretAccessKey:%s,Secure:%v", err, u.Host, accessKeyID, secretAccessKey, secure)
return "", errs.Wrap(err, strs)
} }
// Perform health check minio := &component.Minio{
cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) ApiURL: config.Config.Object.ApiURL,
if err != nil { Endpoint: endpoint,
return "", errs.Wrap(errStr(err, str)) AccessKeyID: getEnv("MINIO_ACCESS_KEY_ID", config.Config.Object.Minio.AccessKeyID),
SecretAccessKey: getEnv("MINIO_SECRET_ACCESS_KEY", config.Config.Object.Minio.SecretAccessKey),
SignEndpoint: config.Config.Object.Minio.SignEndpoint,
UseSSL: getEnv("MINIO_USE_SSL", "false"),
} }
defer cancel()
if minioClient.IsOffline() { str, err := component.CheckMinio(minio)
str := fmt.Sprintf("Minio server is offline;%s", str) if err != nil {
return "", ErrComponentStart.Wrap(str) return "", err
}
// Check for localhost in API URL and Minio SignEndpoint
if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" {
return "", ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1")
} }
return str, nil return str, nil
} }
@ -247,76 +167,48 @@ func checkRedis() (string, error) {
username := getEnv("REDIS_USERNAME", config.Config.Redis.Username) username := getEnv("REDIS_USERNAME", config.Config.Redis.Username)
password := getEnv("REDIS_PASSWORD", config.Config.Redis.Password) password := getEnv("REDIS_PASSWORD", config.Config.Redis.Password)
// Split address to handle multiple addresses for cluster setup redis := &component.Redis{
redisAddresses := strings.Split(address, ",") Address: strings.Split(address, ","),
Username: username,
var redisClient redis.UniversalClient Password: password,
if len(redisAddresses) > 1 {
// Use cluster client for multiple addresses
redisClient = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: redisAddresses,
Username: username,
Password: password,
})
} else {
// Use regular client for single address
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddresses[0],
Username: username,
Password: password,
})
} }
defer redisClient.Close()
// Ping Redis to check connectivity addresses, err := getAddress("REDIS_ADDRESS", "REDIS_PORT", config.Config.Redis.Address)
_, err := redisClient.Ping(context.Background()).Result()
str := "the addr is:" + strings.Join(redisAddresses, ",")
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return "", err
} }
redis.Address = addresses
str, err := component.CheckRedis(redis)
if err != nil {
return "", err
}
return str, nil return str, nil
} }
// checkZookeeper checks the Zookeeper connection // checkZookeeper checks the Zookeeper connection
func checkZookeeper() (string, error) { func checkZookeeper() (string, error) {
// Prioritize environment variables // Prioritize environment variables
schema := getEnv("ZOOKEEPER_SCHEMA", "digest")
address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ",")) address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ","))
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
// Split addresses to handle multiple Zookeeper nodes zk := &component.Zookeeper{
zookeeperAddresses := strings.Split(address, ",") Schema: getEnv("ZOOKEEPER_SCHEMA", "digest"),
ZkAddr: strings.Split(address, ","),
Username: getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username),
Password: getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password),
}
// Connect to Zookeeper addresses, err := getAddress("ZOOKEEPER_ADDRESS", "ZOOKEEPER_PORT", config.Config.Zookeeper.ZkAddr)
str := "the addr is:" + address
c, eventChan, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return "", nil
}
timeout := time.After(5 * time.Second)
for {
select {
case event := <-eventChan:
if event.State == zk.StateConnected {
fmt.Println("Connected to Zookeeper")
goto Connected
}
case <-timeout:
return "", errs.Wrap(errors.New("timeout waiting for Zookeeper connection"), "Zookeeper Addr: "+strings.Join(config.Config.Zookeeper.ZkAddr, " "))
}
} }
Connected: zk.ZkAddr = addresses
defer c.Close()
// Set authentication if username and password are provided str, err := component.CheckZookeeper(zk)
if username != "" && password != "" { if err != nil {
if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil { return "", err
return "", errs.Wrap(errStr(err, str))
}
} }
return str, nil return str, nil
} }
@ -327,24 +219,21 @@ func checkKafka() (string, error) {
password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password)
address := getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ",")) address := getEnv("KAFKA_ADDRESS", strings.Join(config.Config.Kafka.Addr, ","))
// Split addresses to handle multiple Kafka brokers kafka := &component.Kafka{
kafkaAddresses := strings.Split(address, ",") Username: username,
Password: password,
Addr: strings.Split(address, ","),
}
// Configure Kafka client addresses, err := getAddress("KAFKA_ADDRESS", "KAFKA_PORT", config.Config.Kafka.Addr)
cfg := sarama.NewConfig() if err != nil {
if username != "" && password != "" { return "", nil
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = username
cfg.Net.SASL.Password = password
} }
// Additional Kafka setup (e.g., TLS configuration) can be added here kafka.Addr = addresses
// kafka.SetupTLSConfig(cfg)
// Create Kafka client str, kafkaClient, err := component.CheckKafka(kafka)
str := "the addr is:" + address
kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg)
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return "", err
} }
defer kafkaClient.Close() defer kafkaClient.Close()
@ -379,22 +268,42 @@ func isTopicPresent(topic string, topics []string) bool {
return false return false
} }
func colorPrint(colorCode int, format string, a ...interface{}) { func getAddress(key1, key2 string, fallback []string) ([]string, error) {
fmt.Printf("\x1b[%dm%s\x1b[0m\n", colorCode, fmt.Sprintf(format, a...)) address, addrExist := os.LookupEnv(key1)
} port, portExist := os.LookupEnv(key2)
func errorPrint(s string) {
colorPrint(colorRed, "%v", s)
}
func successPrint(s string) {
colorPrint(colorGreen, "%v", s)
}
func warningPrint(s string) { if addrExist && portExist {
colorPrint(colorYellow, "Warning: But %v", s) addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
}
return addresses, nil
} else if !addrExist && portExist {
result := make([]string, len(config.Config.Redis.Address))
for i, addr := range config.Config.Redis.Address {
add := strings.Split(addr, ":")
result[i] = add[0] + ":" + port
}
return result, nil
} else if addrExist && !portExist {
return nil, errs.Wrap(errors.New("the ZOOKEEPER_PORT of minio is empty"))
}
return fallback, nil
} }
func errStr(err error, str string) error { func getMinioAddr(key1, key2, key3, fallback string) (string, error) {
return fmt.Errorf("%v;%s", err, str) // Prioritize environment variables
endpoint := getEnv(key1, fallback)
address, addressExist := os.LookupEnv(key2)
port, portExist := os.LookupEnv(key3)
if portExist && addressExist {
endpoint = "http://" + address + ":" + port
} else if !portExist && addressExist {
return "", errs.Wrap(errors.New("the MINIO_PORT of minio is empty"))
} else if portExist && !addressExist {
arr := strings.Split(config.Config.Object.Minio.Endpoint, ":")
arr[2] = port
endpoint = strings.Join(arr, ":")
}
return endpoint, nil
} }

Loading…
Cancel
Save