From 835ff3824f52c8f525927b7332ebbe6e122b8282 Mon Sep 17 00:00:00 2001 From: skiffer-git <72860476+skiffer-git@users.noreply.github.com> Date: Tue, 14 May 2024 18:21:36 +0800 Subject: [PATCH 1/6] Etcd naming and discovery (#2300) * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * add etcd * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism * Add etcd as a service discovery mechanism --- .env | 2 +- config/discovery.yml | 13 +++++ config/share.yml | 1 - config/zookeeper.yml | 6 --- docker-compose.yml | 20 +++++++ go.mod | 10 ++-- go.sum | 19 +++++-- internal/api/init.go | 19 +++---- internal/api/router.go | 2 +- internal/msggateway/client.go | 1 + internal/msggateway/hub_server.go | 2 +- internal/msggateway/init.go | 8 +-- internal/msggateway/n_ws_server.go | 7 ++- internal/msgtransfer/init.go | 16 +++--- .../msgtransfer/online_history_msg_handler.go | 2 +- .../online_msg_to_mongo_handler.go | 2 +- internal/push/onlinepusher.go | 22 ++++---- internal/push/push.go | 2 +- internal/push/push_handler.go | 2 +- internal/rpc/auth/auth.go | 8 +-- internal/rpc/conversation/conversaion.go | 2 +- internal/rpc/friend/friend.go | 9 ++-- internal/rpc/group/group.go | 2 +- internal/rpc/msg/server.go | 2 +- internal/rpc/third/third.go | 2 +- internal/rpc/user/user.go | 2 +- internal/tools/cron_task.go | 8 +-- pkg/common/cmd/api.go | 4 +- pkg/common/cmd/auth.go | 4 +- pkg/common/cmd/constant.go | 8 +-- pkg/common/cmd/conversation.go | 4 +- pkg/common/cmd/cron_task.go | 2 +- pkg/common/cmd/friend.go | 4 +- pkg/common/cmd/group.go | 4 +- pkg/common/cmd/msg.go | 4 +- pkg/common/cmd/msg_gateway.go | 2 +- pkg/common/cmd/msg_transfer.go | 2 +- pkg/common/cmd/push.go | 4 +- pkg/common/cmd/third.go | 4 +- pkg/common/cmd/user.go | 4 +- pkg/common/config/config.go | 14 ++++- .../discoveryregister/discoveryregister.go | 34 ++++++------ pkg/common/discoveryregister/etcd/doc.go | 15 ++++++ .../discoveryregister/zookeeper/zookeeper.go | 44 --------------- pkg/common/startrpc/start.go | 4 +- tools/check-component/main.go | 54 ++++++++++++------- 46 files changed, 224 insertions(+), 182 deletions(-) create mode 100644 config/discovery.yml delete mode 100644 config/zookeeper.yml create mode 100644 pkg/common/discoveryregister/etcd/doc.go delete mode 100644 pkg/common/discoveryregister/zookeeper/zookeeper.go diff --git a/.env b/.env index d3ae0426a..1e7b1e11a 100644 --- a/.env +++ b/.env @@ -4,7 +4,7 @@ REDIS_IMAGE=redis:7.0.0 ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8 KAFKA_IMAGE=bitnami/kafka:3.5.1 MINIO_IMAGE=minio/minio:RELEASE.2024-01-11T07-46-16Z - +ETCD_IMAGE=quay.io/coreos/etcd:v3.5.13 OPENIM_WEB_FRONT_IMAGE=openim/openim-web-front:release-v3.5.1 OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.7 diff --git a/config/discovery.yml b/config/discovery.yml new file mode 100644 index 000000000..3d96ff9b6 --- /dev/null +++ b/config/discovery.yml @@ -0,0 +1,13 @@ +enable: "etcd" +etcd: + rootDirectory: openim + address: [ localhost:12379 ] + username: '' + password: '' + +zookeeper: + schema: openim + address: [ localhost:12181 ] + username: '' + password: '' + diff --git a/config/share.yml b/config/share.yml index 2abbb77a0..fc97b6a1f 100644 --- a/config/share.yml +++ b/config/share.yml @@ -1,5 +1,4 @@ secret: openIM123 -env: zookeeper rpcRegisterName: user: user friend: friend diff --git a/config/zookeeper.yml b/config/zookeeper.yml deleted file mode 100644 index 33f52d7ca..000000000 --- a/config/zookeeper.yml +++ /dev/null @@ -1,6 +0,0 @@ - -schema: openim -address: [ localhost:12181 ] -username: '' -password: '' - diff --git a/docker-compose.yml b/docker-compose.yml index aeb53a417..d72c1a2fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -58,6 +58,26 @@ services: networks: - openim + etcd: + image: "${ETCD_IMAGE}" + container_name: etcd + ports: + - "12379:2379" + - "12380:2380" + environment: + - ETCD_NAME=s1 + - ETCD_DATA_DIR=/etcd-data + - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_ADVERTISE_CLIENT_URLS=http://0.0.0.0:2379 + - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 + - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://0.0.0.0:2380 + - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380 + - ETCD_INITIAL_CLUSTER_TOKEN=tkn + - ETCD_INITIAL_CLUSTER_STATE=new + restart: always + networks: + - openim + kafka: image: "${KAFKA_IMAGE}" container_name: kafka diff --git a/go.mod b/go.mod index b522065da..e9777eaa8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.2 + github.com/openimsdk/tools v0.0.49-alpha.18 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -44,7 +44,6 @@ require ( golang.org/x/sync v0.6.0 ) - require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect @@ -59,6 +58,8 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/clbanning/mxj v1.8.4 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -75,7 +76,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-zookeeper/zk v1.0.3 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/s2a-go v0.1.7 // indirect @@ -138,6 +139,9 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/etcd/api/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index e2ecf7284..3acb4709c 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -111,6 +115,7 @@ github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= @@ -132,8 +137,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -283,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws= -github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= +github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78= +github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -378,6 +383,12 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= +go.etcd.io/etcd/api/v3 v3.5.13/go.mod h1:gBqlqkcMMZMVTMm4NDZloEVJzxQOQIls8splbqBDa0c= +go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2kQWg= +go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= +go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js= +go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= diff --git a/internal/api/init.go b/internal/api/init.go index 6e784da9a..b49a14569 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -38,20 +38,17 @@ import ( ) type Config struct { - RpcConfig config.API - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - MinioConfig config.Minio + API config.API + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { - apiPort, err := datautil.GetElemByIndex(config.RpcConfig.Api.Ports, index) + apiPort, err := datautil.GetElemByIndex(config.API.Api.Ports, index) if err != nil { return err } - prometheusPort, err := datautil.GetElemByIndex(config.RpcConfig.Prometheus.Ports, index) + prometheusPort, err := datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { return err } @@ -59,7 +56,7 @@ func Start(ctx context.Context, index int, config *Config) error { var client discovery.SvcDiscoveryRegistry // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err = kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } @@ -70,7 +67,7 @@ func Start(ctx context.Context, index int, config *Config) error { ) router := newGinRouter(client, config) - if config.RpcConfig.Prometheus.Enable { + if config.API.Prometheus.Enable { go func() { p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api")) p.SetListenAddress(fmt.Sprintf(":%d", prometheusPort)) @@ -81,7 +78,7 @@ func Start(ctx context.Context, index int, config *Config) error { }() } - address := net.JoinHostPort(network.GetListenIP(config.RpcConfig.Api.ListenIP), strconv.Itoa(apiPort)) + address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort)) server := http.Server{Addr: address, Handler: router} log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) diff --git a/internal/api/router.go b/internal/api/router.go index bd2de99db..1fbb33b09 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -34,7 +34,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) - thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.RpcConfig.Prometheus.GrafanaURL) + thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index af869dd85..0581a025b 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -286,6 +286,7 @@ func (c *Client) KickOnlineMessage() error { resp := Resp{ ReqIdentifier: WSKickOnlineMsg, } + log.ZDebug(c.ctx, "KickOnlineMessage debug ") err := c.writeBinaryMsg(resp) c.close() return err diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index bfe81b602..f9bb699ed 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -35,7 +35,7 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover } func (s *Server) Start(ctx context.Context, index int, conf *Config) error { - return startrpc.Start(ctx, &conf.ZookeeperConfig, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, + return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, conf.MsgGateway.RPC.RegisterIP, conf.MsgGateway.RPC.Ports, index, conf.Share.RpcRegisterName.MessageGateway, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 727ade0af..ef24d1bf9 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -24,10 +24,10 @@ import ( ) type Config struct { - MsgGateway config.MsgGateway - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + MsgGateway config.MsgGateway + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } // Start run ws server. diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index cf607d470..defec16df 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -211,7 +211,8 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C // Online push user online message to other node for _, v := range conns { - v := v // safe closure var + v := v + log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target()) if v.Target() == ws.disCov.GetSelfConnTarget() { log.ZDebug(ctx, "Filter out this node", "node", v.Target()) continue @@ -267,7 +268,9 @@ func (ws *WsServer) registerClient(client *Client) { } wg := sync.WaitGroup{} - if ws.msgGatewayConfig.Share.Env == "zookeeper" { + log.ZDebug(client.ctx, "ws.msgGatewayConfig.Discovery.Enable", ws.msgGatewayConfig.Discovery.Enable) + + if ws.msgGatewayConfig.Discovery.Enable != "k8s" { wg.Add(1) go func() { defer wg.Done() diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 68d953e90..3384b8493 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -56,13 +56,13 @@ type MsgTransfer struct { } type Config struct { - MsgTransfer config.MsgTransfer - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - Share config.Share - WebhooksConfig config.Webhooks + MsgTransfer config.MsgTransfer + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + Share config.Share + WebhooksConfig config.Webhooks + Discovery config.Discovery } func Start(ctx context.Context, index int, config *Config) error { @@ -76,7 +76,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8691e92ab..df2660804 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -83,7 +83,7 @@ type OnlineHistoryRedisConsumerHandler struct { func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 978302e76..c9c035893 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) if err != nil { return nil, err } diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index 30bdf3e2e..a61399fb6 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -12,11 +12,6 @@ import ( "sync" ) -const ( - KUBERNETES = "k8s" - ZOOKEEPER = "zookeeper" -) - type OnlinePusher interface { GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) @@ -42,10 +37,12 @@ func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg * } func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { - switch config.Share.Env { - case KUBERNETES: + switch config.Discovery.Enable { + case "k8s": return NewK8sStaticConsistentHash(disCov, config) - case ZOOKEEPER: + case "zookeeper": + return NewDefaultAllNode(disCov, config) + case "etcd": return NewDefaultAllNode(disCov, config) default: return newEmptyOnlinePUsher() @@ -64,7 +61,12 @@ func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *D func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway) - log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + if len(conns) == 0 { + log.ZWarn(ctx, "get gateway conn 0 ", nil) + } else { + log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + } + if err != nil { return nil, err } @@ -85,10 +87,12 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M // Online push message for _, conn := range conns { conn := conn // loop var safe + ctx := ctx wg.Go(func() error { msgClient := msggateway.NewMsgGatewayClient(conn) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) if err != nil { + log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ", err, "req:", input.String()) return nil } diff --git a/internal/push/push.go b/internal/push/push.go index 18012a864..2e5c4e526 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -24,11 +24,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3a9a696f6..bf0ede375 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -60,7 +60,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}) + []string{config.KafkaConfig.ToPushTopic}, true) if err != nil { return nil, err } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index c6d236b21..ddb655398 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -45,10 +45,10 @@ type authServer struct { } type Config struct { - RpcConfig config.Auth - RedisConfig config.Redis - ZookeeperConfig config.ZooKeeper - Share config.Share + RpcConfig config.Auth + RedisConfig config.Redis + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 96d2a403f..4c7828610 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -51,10 +51,10 @@ type Config struct { RpcConfig config.Conversation RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index bffda3c04..b49490f26 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -50,14 +50,15 @@ type friendServer struct { } type Config struct { - RpcConfig config.Friend - RedisConfig config.Redis - MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper + RpcConfig config.Friend + RedisConfig config.Redis + MongodbConfig config.Mongo + //ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 13bd7f9be..551554c23 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -68,11 +68,11 @@ type Config struct { RpcConfig config.Group RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 3f4df8d4b..5d7c0b297 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -59,11 +59,11 @@ type ( RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } ) diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 9bf8cafa9..a3d9085d3 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -46,11 +46,11 @@ type Config struct { RpcConfig config.Third RedisConfig config.Redis MongodbConfig config.Mongo - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share MinioConfig config.Minio LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index c453ac9f8..a28fa24e2 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,11 +61,11 @@ type Config struct { RedisConfig config.Redis MongodbConfig config.Mongo KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper NotificationConfig config.Notification Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache + Discovery config.Discovery } func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegistry, server *grpc.Server) error { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 20baeffaf..bf037b694 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -33,9 +33,9 @@ import ( ) type CronTaskConfig struct { - CronTask config.CronTask - ZookeeperConfig config.ZooKeeper - Share config.Share + CronTask config.CronTask + Share config.Share + Discovery config.Discovery } func Start(ctx context.Context, config *CronTaskConfig) error { @@ -43,7 +43,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if config.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 022fb1097..ecdb0dd3a 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -33,9 +33,9 @@ func NewApiCmd() *ApiCmd { var apiConfig api.Config ret := &ApiCmd{apiConfig: &apiConfig} ret.configMap = map[string]any{ - OpenIMAPICfgFileName: &apiConfig.RpcConfig, - ZookeeperConfigFileName: &apiConfig.ZookeeperConfig, + OpenIMAPICfgFileName: &apiConfig.API, ShareFileName: &apiConfig.Share, + DiscoveryConfigFilename: &apiConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 5ed02ffd0..7d75a7da6 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -36,8 +36,8 @@ func NewAuthRpcCmd() *AuthRpcCmd { ret.configMap = map[string]any{ OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, RedisConfigFileName: &authConfig.RedisConfig, - ZookeeperConfigFileName: &authConfig.ZookeeperConfig, ShareFileName: &authConfig.Share, + DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -53,7 +53,7 @@ func (a *AuthRpcCmd) Exec() error { } func (a *AuthRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.authConfig.ZookeeperConfig, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go index 55eb4a069..45dbcafda 100644 --- a/pkg/common/cmd/constant.go +++ b/pkg/common/cmd/constant.go @@ -26,7 +26,6 @@ var ( LocalCacheConfigFileName string KafkaConfigFileName string RedisConfigFileName string - ZookeeperConfigFileName string MongodbConfigFileName string MinioConfigFileName string LogConfigFileName string @@ -42,6 +41,7 @@ var ( OpenIMRPCMsgCfgFileName string OpenIMRPCThirdCfgFileName string OpenIMRPCUserCfgFileName string + DiscoveryConfigFilename string ) var ConfigEnvPrefixMap map[string]string @@ -54,7 +54,6 @@ func init() { LocalCacheConfigFileName = "local-cache.yml" KafkaConfigFileName = "kafka.yml" RedisConfigFileName = "redis.yml" - ZookeeperConfigFileName = "zookeeper.yml" MongodbConfigFileName = "mongodb.yml" MinioConfigFileName = "minio.yml" LogConfigFileName = "log.yml" @@ -70,16 +69,17 @@ func init() { OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" + DiscoveryConfigFilename = "discovery.yml" ConfigEnvPrefixMap = make(map[string]string) fileNames := []string{ FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName, - KafkaConfigFileName, RedisConfigFileName, ZookeeperConfigFileName, + KafkaConfigFileName, RedisConfigFileName, MongodbConfigFileName, MinioConfigFileName, LogConfigFileName, OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName, OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName, OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName, - OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, + OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename, } for _, fileName := range fileNames { diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 0a617c729..57ffa52bc 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -36,11 +36,11 @@ func NewConversationRpcCmd() *ConversationRpcCmd { ret.configMap = map[string]any{ OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig, RedisConfigFileName: &conversationConfig.RedisConfig, - ZookeeperConfigFileName: &conversationConfig.ZookeeperConfig, MongodbConfigFileName: &conversationConfig.MongodbConfig, ShareFileName: &conversationConfig.Share, NotificationFileName: &conversationConfig.NotificationConfig, LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, + DiscoveryConfigFilename: &conversationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -55,7 +55,7 @@ func (a *ConversationRpcCmd) Exec() error { } func (a *ConversationRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.conversationConfig.ZookeeperConfig, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index be26f5af3..fd4447524 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -34,8 +34,8 @@ func NewCronTaskCmd() *CronTaskCmd { ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, - ZookeeperConfigFileName: &cronTaskConfig.ZookeeperConfig, ShareFileName: &cronTaskConfig.Share, + DiscoveryConfigFilename: &cronTaskConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index b8d46f77e..8be1f7745 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -36,12 +36,12 @@ func NewFriendRpcCmd() *FriendRpcCmd { ret.configMap = map[string]any{ OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig, RedisConfigFileName: &friendConfig.RedisConfig, - ZookeeperConfigFileName: &friendConfig.ZookeeperConfig, MongodbConfigFileName: &friendConfig.MongodbConfig, ShareFileName: &friendConfig.Share, NotificationFileName: &friendConfig.NotificationConfig, WebhooksConfigFileName: &friendConfig.WebhooksConfig, LocalCacheConfigFileName: &friendConfig.LocalCacheConfig, + DiscoveryConfigFilename: &friendConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.friendConfig.ZookeeperConfig, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports, a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 8bf977824..f158b8c62 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -36,12 +36,12 @@ func NewGroupRpcCmd() *GroupRpcCmd { ret.configMap = map[string]any{ OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig, RedisConfigFileName: &groupConfig.RedisConfig, - ZookeeperConfigFileName: &groupConfig.ZookeeperConfig, MongodbConfigFileName: &groupConfig.MongodbConfig, ShareFileName: &groupConfig.Share, NotificationFileName: &groupConfig.NotificationConfig, WebhooksConfigFileName: &groupConfig.WebhooksConfig, LocalCacheConfigFileName: &groupConfig.LocalCacheConfig, + DiscoveryConfigFilename: &groupConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *GroupRpcCmd) Exec() error { } func (a *GroupRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.groupConfig.ZookeeperConfig, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index a3b521b4b..91f7931fb 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -36,13 +36,13 @@ func NewMsgRpcCmd() *MsgRpcCmd { ret.configMap = map[string]any{ OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig, RedisConfigFileName: &msgConfig.RedisConfig, - ZookeeperConfigFileName: &msgConfig.ZookeeperConfig, MongodbConfigFileName: &msgConfig.MongodbConfig, KafkaConfigFileName: &msgConfig.KafkaConfig, ShareFileName: &msgConfig.Share, NotificationFileName: &msgConfig.NotificationConfig, WebhooksConfigFileName: &msgConfig.WebhooksConfig, LocalCacheConfigFileName: &msgConfig.LocalCacheConfig, + DiscoveryConfigFilename: &msgConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *MsgRpcCmd) Exec() error { } func (a *MsgRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.msgConfig.ZookeeperConfig, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 897fd7008..78004094c 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -36,9 +36,9 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig} ret.configMap = map[string]any{ OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, - ZookeeperConfigFileName: &msgGatewayConfig.ZookeeperConfig, ShareFileName: &msgGatewayConfig.Share, WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 86f42dc56..0d48281e5 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -37,9 +37,9 @@ func NewMsgTransferCmd() *MsgTransferCmd { RedisConfigFileName: &msgTransferConfig.RedisConfig, MongodbConfigFileName: &msgTransferConfig.MongodbConfig, KafkaConfigFileName: &msgTransferConfig.KafkaConfig, - ZookeeperConfigFileName: &msgTransferConfig.ZookeeperConfig, ShareFileName: &msgTransferConfig.Share, WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig, + DiscoveryConfigFilename: &msgTransferConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 0140ced23..3e7c4c249 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -36,13 +36,13 @@ func NewPushRpcCmd() *PushRpcCmd { ret.configMap = map[string]any{ OpenIMPushCfgFileName: &pushConfig.RpcConfig, RedisConfigFileName: &pushConfig.RedisConfig, - ZookeeperConfigFileName: &pushConfig.ZookeeperConfig, MongodbConfigFileName: &pushConfig.MongodbConfig, KafkaConfigFileName: &pushConfig.KafkaConfig, ShareFileName: &pushConfig.Share, NotificationFileName: &pushConfig.NotificationConfig, WebhooksConfigFileName: &pushConfig.WebhooksConfig, LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, + DiscoveryConfigFilename: &pushConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *PushRpcCmd) Exec() error { } func (a *PushRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.pushConfig.ZookeeperConfig, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 0dfa7d5be..b6731f1ff 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -36,12 +36,12 @@ func NewThirdRpcCmd() *ThirdRpcCmd { ret.configMap = map[string]any{ OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig, RedisConfigFileName: &thirdConfig.RedisConfig, - ZookeeperConfigFileName: &thirdConfig.ZookeeperConfig, MongodbConfigFileName: &thirdConfig.MongodbConfig, ShareFileName: &thirdConfig.Share, NotificationFileName: &thirdConfig.NotificationConfig, MinioConfigFileName: &thirdConfig.MinioConfig, LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, + DiscoveryConfigFilename: &thirdConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -56,7 +56,7 @@ func (a *ThirdRpcCmd) Exec() error { } func (a *ThirdRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.thirdConfig.ZookeeperConfig, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 315b93256..674f9e3a6 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -36,13 +36,13 @@ func NewUserRpcCmd() *UserRpcCmd { ret.configMap = map[string]any{ OpenIMRPCUserCfgFileName: &userConfig.RpcConfig, RedisConfigFileName: &userConfig.RedisConfig, - ZookeeperConfigFileName: &userConfig.ZookeeperConfig, MongodbConfigFileName: &userConfig.MongodbConfig, KafkaConfigFileName: &userConfig.KafkaConfig, ShareFileName: &userConfig.Share, NotificationFileName: &userConfig.NotificationConfig, WebhooksConfigFileName: &userConfig.WebhooksConfig, LocalCacheConfigFileName: &userConfig.LocalCacheConfig, + DiscoveryConfigFilename: &userConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) @@ -57,7 +57,7 @@ func (a *UserRpcCmd) Exec() error { } func (a *UserRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.userConfig.ZookeeperConfig, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 24d04d8cc..12c4f7f78 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -345,7 +345,6 @@ type AfterConfig struct { type Share struct { Secret string `mapstructure:"secret"` - Env string `mapstructure:"env"` RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` IMAdminUserID []string `mapstructure:"imAdminUserID"` } @@ -432,6 +431,19 @@ type ZooKeeper struct { Password string `mapstructure:"password"` } +type Discovery struct { + Enable string `mapstructure:"enable"` + Etcd Etcd `mapstructure:"etcd"` + ZooKeeper ZooKeeper `mapstructure:"zooKeeper"` +} + +type Etcd struct { + RootDirectory string `mapstructure:"rootDirectory"` + Address []string `mapstructure:"address"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + func (m *Mongo) Build() *mongoutil.Config { return &mongoutil.Config{ Uri: m.URI, diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 38d7382fa..559c937c1 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -18,36 +18,34 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" ) -const ( - zookeeperConst = "zookeeper" - kubenetesConst = "k8s" - directConst = "direct" -) - // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { - switch share.Env { - case zookeeperConst: - +func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) { + switch discovery.Enable { + case "zookeeper": return zookeeper.NewZkClient( - zookeeperConfig.Address, - zookeeperConfig.Schema, + discovery.ZooKeeper.Address, + discovery.ZooKeeper.Schema, zookeeper.WithFreq(time.Hour), - zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password), + zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password), zookeeper.WithRoundRobin(), zookeeper.WithTimeout(10), ) - case kubenetesConst: + case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) - case directConst: - //return direct.NewConnDirect(config) + case "etcd": + return etcd.NewSvcDiscoveryRegistry( + discovery.Etcd.RootDirectory, + discovery.Etcd.Address, + etcd.WithDialTimeout(10*time.Second), + etcd.WithMaxCallSendMsgSize(20*1024*1024), + etcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password)) default: - return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap() + return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap() } - return nil, nil } diff --git a/pkg/common/discoveryregister/etcd/doc.go b/pkg/common/discoveryregister/etcd/doc.go new file mode 100644 index 000000000..1da7508a1 --- /dev/null +++ b/pkg/common/discoveryregister/etcd/doc.go @@ -0,0 +1,15 @@ +// Copyright © 2024 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" diff --git a/pkg/common/discoveryregister/zookeeper/zookeeper.go b/pkg/common/discoveryregister/zookeeper/zookeeper.go deleted file mode 100644 index 1d11414b6..000000000 --- a/pkg/common/discoveryregister/zookeeper/zookeeper.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package zookeeper - -import ( - "os" - "strings" -) - -// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. -func getEnv(key, fallback string) string { - if value, exists := os.LookupEnv(key); exists { - return value - } - return fallback -} - -// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables. -// If the environment variables are not set, it returns the fallback value. -func getZkAddrFromEnv(fallback []string) []string { - address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS") - port, portExists := os.LookupEnv("ZOOKEEPER_PORT") - - if addrExists && portExists { - addresses := strings.Split(address, ",") - for i, addr := range addresses { - addresses[i] = addr + ":" + port - } - return addresses - } - return fallback -} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index ebcd5aa7c..a36bcfe1c 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -44,7 +44,7 @@ import ( ) // Start rpc server. -func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prometheusConfig *config2.Prometheus, listenIP, +func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusConfig *config2.Prometheus, listenIP, registerIP string, rpcPorts []int, index int, rpcRegisterName string, share *config2.Share, config T, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -68,7 +68,7 @@ func Start[T any](ctx context.Context, zookeeperConfig *config2.ZooKeeper, prome } defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(zookeeperConfig, share) + client, err := kdisc.NewDiscoveryRegister(discovery, share) if err != nil { return err } diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 7fe64d3c5..5fa84ac36 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" @@ -43,6 +44,14 @@ func CheckZookeeper(ctx context.Context, config *config.ZooKeeper) error { return zookeeper.Check(ctx, config.Address, config.Schema, zookeeper.WithUserNameAndPassword(config.Username, config.Password)) } +func CheckEtcd(ctx context.Context, config *config.Etcd) error { + return etcd.Check(ctx, config.Address, "/check_openim_component", + true, + etcd.WithDialTimeout(10*time.Second), + etcd.WithMaxCallSendMsgSize(20*1024*1024), + etcd.WithUsernameAndPassword(config.Username, config.Password)) +} + func CheckMongo(ctx context.Context, config *config.Mongo) error { return mongoutil.Check(ctx, config.Build()) } @@ -59,14 +68,14 @@ func CheckKafka(ctx context.Context, conf *config.Kafka) error { return kafka.Check(ctx, conf.Build(), []string{conf.ToMongoTopic, conf.ToRedisTopic, conf.ToPushTopic}) } -func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.ZooKeeper, error) { +func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, *config.Minio, *config.Discovery, error) { var ( - mongoConfig = &config.Mongo{} - redisConfig = &config.Redis{} - kafkaConfig = &config.Kafka{} - minioConfig = &config.Minio{} - zookeeperConfig = &config.ZooKeeper{} - thirdConfig = &config.Third{} + mongoConfig = &config.Mongo{} + redisConfig = &config.Redis{} + kafkaConfig = &config.Kafka{} + minioConfig = &config.Minio{} + discovery = &config.Discovery{} + thirdConfig = &config.Third{} ) err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig) if err != nil { @@ -96,11 +105,11 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, } else { minioConfig = nil } - err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig) + err = config.LoadConfig(filepath.Join(configDir, cmd.DiscoveryConfigFilename), cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFilename], discovery) if err != nil { return nil, nil, nil, nil, nil, err } - return mongoConfig, redisConfig, kafkaConfig, minioConfig, zookeeperConfig, nil + return mongoConfig, redisConfig, kafkaConfig, minioConfig, discovery, nil } func main() { @@ -127,35 +136,40 @@ func main() { } } -func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, zookeeperConfig *config.ZooKeeper, maxRetry int) error { +func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *config.Redis, kafkaConfig *config.Kafka, minioConfig *config.Minio, discovery *config.Discovery, maxRetry int) error { checksDone := make(map[string]bool) - checks := map[string]func() error{ - "Zookeeper": func() error { - return CheckZookeeper(ctx, zookeeperConfig) - }, - "Mongo": func() error { + checks := map[string]func(ctx context.Context) error{ + "Mongo": func(ctx context.Context) error { return CheckMongo(ctx, mongoConfig) }, - "Redis": func() error { + "Redis": func(ctx context.Context) error { return CheckRedis(ctx, redisConfig) }, - "Kafka": func() error { + "Kafka": func(ctx context.Context) error { return CheckKafka(ctx, kafkaConfig) }, } - if minioConfig != nil { - checks["MinIO"] = func() error { + checks["MinIO"] = func(ctx context.Context) error { return CheckMinIO(ctx, minioConfig) } } + if discovery.Enable == "etcd" { + checks["Etcd"] = func(ctx context.Context) error { + return CheckEtcd(ctx, &discovery.Etcd) + } + } else if discovery.Enable == "zookeeper" { + checks["Zookeeper"] = func(ctx context.Context) error { + return CheckZookeeper(ctx, &discovery.ZooKeeper) + } + } for i := 0; i < maxRetry; i++ { allSuccess := true for name, check := range checks { if !checksDone[name] { - if err := check(); err != nil { + if err := check(ctx); err != nil { fmt.Printf("%s check failed: %v\n", name, err) allSuccess = false } else { From 2bcc65a24b15768d21796ab7844a0def3f98edd7 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Wed, 15 May 2024 11:43:20 +0800 Subject: [PATCH 2/6] fix: s3 config (#2303) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * fix: SearchMessage * fix: s3 config * fix: oss panic * kafka * go.sum --------- Co-authored-by: withchao --- config/openim-rpc-third.yml | 17 +---------------- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index bb41c93ae..bde38ccc4 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -29,19 +29,4 @@ object: accessKeyID: '' accessKeySecret: '' sessionToken: '' - publicRead: false - kodo: - endpoint: "webhook://s3.cn-east-1.qiniucs.com" - bucket: "demo-9999999" - bucketURL: "webhook://your.domain.com" - accessKeyID: '' - accessKeySecret: '' - sessionToken: '' - publicRead: false - aws: - endpoint: "''" - region: "us-east-1" - bucket: "demo-9999999" - accessKeyID: '' - accessKeySecret: '' - publicRead: false + publicRead: false \ No newline at end of file diff --git a/go.mod b/go.mod index e9777eaa8..54e8a8e0e 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.18 + github.com/openimsdk/tools v0.0.49-alpha.19 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 3acb4709c..5611a6ca6 100644 --- a/go.sum +++ b/go.sum @@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.18 h1:ARQeCiRmExvtB6XYItegThuV63JGOTxddwhSLHYXd78= -github.com/openimsdk/tools v0.0.49-alpha.18/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= +github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs= +github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= From 13b1661a77cce14f69bf21866ebc336e2fc2895c Mon Sep 17 00:00:00 2001 From: skiffer-git <72860476+skiffer-git@users.noreply.github.com> Date: Wed, 15 May 2024 15:42:35 +0800 Subject: [PATCH 3/6] Delete .github/dependabot.yml --- .github/dependabot.yml | 59 ------------------------------------------ 1 file changed, 59 deletions(-) delete mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 49ffd7173..000000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright © 2023 OpenIM. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# To get started with Dependabot version updates, you'll need to specify which -# package ecosystems to update and where the package manifests are located. -# Please see the documentation for all configuration options: -# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates - -version: 2 -updates: - - package-ecosystem: "gomod" - directory: "/" - schedule: - interval: "daily" - time: "08:00" - labels: - - "dependencies" - commit-message: - prefix: "feat" - include: "scope" - groups: - gomod-deps: - patterns: - - "*" - - package-ecosystem: "github-actions" - directory: "/" - schedule: - interval: "daily" - time: "08:00" - labels: - - "dependencies" - commit-message: - prefix: "chore" - include: "scope" - groups: - github-actions: - patterns: - - "*" - - package-ecosystem: "docker" - directory: "/" - schedule: - interval: "daily" - time: "08:00" - labels: - - "dependencies" - commit-message: - prefix: "feat" - include: "scope" \ No newline at end of file From 814e378fa1a6f24b5c0295c6c90d6905b3f61e21 Mon Sep 17 00:00:00 2001 From: skiffer-git <72860476+skiffer-git@users.noreply.github.com> Date: Wed, 15 May 2024 15:53:48 +0800 Subject: [PATCH 4/6] Update CONTRIBUTING-zh_CN.md --- CONTRIBUTING-zh_CN.md | 46 ++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/CONTRIBUTING-zh_CN.md b/CONTRIBUTING-zh_CN.md index c10d7337f..47965a9f4 100644 --- a/CONTRIBUTING-zh_CN.md +++ b/CONTRIBUTING-zh_CN.md @@ -1,7 +1,9 @@ -# 如何给OpenIM贡献代码(提交pull request) + + +# 如何给 OpenIM 贡献代码(提交 Pull Request)

- Englist · + English · 中文 · Українська · Česky · @@ -28,55 +30,67 @@ Türkçe

+本指南将以 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) 为例,详细说明如何为 OpenIM 项目贡献代码。我们采用“一问题一分支”的策略,确保每个 Issue 都对应一个专门的分支,以便有效管理代码变更。 -本指南将以 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server)为例详细说明如何为 OpenIM 项目贡献代码。我们采用“一问题一分支”的策略,确保每个 Issue 都对应一个专门的分支,以便有效管理代码变更。 - -## 1. Fork 仓库 +### 1. Fork 仓库 前往 [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) GitHub 页面,点击右上角的 "Fork" 按钮,将仓库 Fork 到你的 GitHub 账户下。 -## 2. 克隆仓库 +### 2. 克隆仓库 将你 Fork 的仓库克隆到本地: ```bash git clone https://github.com/your-username/open-im-server.git ``` -## 3. 设置远程上游 +### 3. 设置远程上游 添加原始仓库为远程上游以便跟踪其更新: ```bash git remote add upstream https://github.com/openimsdk/open-im-server.git ``` -## 4. 创建 Issue -在原始仓库中创建一个新的 Issue,详细描述你遇到的问题或希望添加的新功能。 +### 4. 创建 Issue +在原始仓库中创建一个新的 Issue,详细描述你遇到的问题或希望添加 -## 5. 创建新分支 +的新功能。 + +### 5. 创建新分支 基于主分支创建一个新分支,并使用描述性的名称与 Issue ID,例如: ```bash git checkout -b fix-bug-123 ``` -## 6. 提交更改 +### 6. 提交更改 在你的本地分支上进行更改后,提交这些更改: ```bash git add . git commit -m "Describe your changes in detail" ``` -## 7. 推送分支 +### 7. 推送分支 将你的分支推送回你的 GitHub Fork: ```bash git push origin fix-bug-123 ``` -## 8. 创建 Pull Request +### 8. 创建 Pull Request 在 GitHub 上转到你的 Fork 仓库,点击 "Pull Request" 按钮。确保 PR 描述清楚,并链接到相关的 Issue。 -## 9. 签署 CLA +### 9. 签署 CLA 如果这是你第一次提交 PR,你需要在 PR 的评论中回复: ``` I have read the CLA Document and I hereby sign the CLA ``` -## 其他说明 -如果需要将同一修改提交到两个不同的分支(例如 `main` 和 `release-v3.7`),应从对应的远程分支分别创建两个新分支。首先在一个分支上完成修改,然后使用 `cherry-pick` 命令将这些更改应用到另一个分支。之后,为每个分支独立提交 Pull Request。 +### 编程规范 +请参考以下文档以了解关于 Go 语言编程规范的详细信息: +- [Go 编码规范](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/go-code.md) +- [代码约定](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/code-conventions.md) + +### 日志规范 +- **禁止使用标准的 `log` 包**。 +- 应使用 `"github.com/openimsdk/tools/log"` 包来打印日志,该包支持多种日志级别:`debug`、`info`、`warn`、`error`。 +- **错误日志应仅在首次调用的函数中打印**,以防止日志重复,并确保错误的上下文清晰。 +### 异常及错误处理 +- **禁止使用 `panic`**:程序中不应使用 `panic`,以避免在遇到不可恢复的错误时突然终止。 +- **错误包裹**:使用 `"github.com/openimsdk/tools/errs"` 来包裹错误,保持错误信息的完整性并增加调试便利。 +- **错误传递**:如果函数本身不能处理错误,应将错误返回给调用者,而不是隐藏或忽略这些错误。 From ddf8cc98ce10dc624c4231226277405e09a39488 Mon Sep 17 00:00:00 2001 From: skiffer-git <72860476+skiffer-git@users.noreply.github.com> Date: Wed, 15 May 2024 15:56:14 +0800 Subject: [PATCH 5/6] Update CONTRIBUTING.md --- CONTRIBUTING.md | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a85ba891f..0aa07393e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,7 +1,7 @@ -# How do I contribute code to OpenIM +# How to Contribute to OpenIM (Submitting Pull Requests)

- Englist · + English · 中文 · Українська · Česky · @@ -27,13 +27,14 @@ Ελληνικά · Türkçe

-This guide will explain in detail how to contribute code to the OpenIM project, using `openimsdk/open-im-server` as an example. We adopt a "one issue, one branch" strategy to ensure each issue corresponds to a dedicated branch, allowing for effective management of code changes. + +This guide will use [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) as an example to explain in detail how to contribute code to the OpenIM project. We adopt a "one issue, one branch" strategy to ensure each issue corresponds to a dedicated branch for effective code change management. ### 1. Fork the Repository -Go to the `openimsdk/open-im-server` GitHub page, click the "Fork" button in the upper right corner to fork the repository to your GitHub account. +Go to the [openimsdk/open-im-server](https://github.com/openimsdk/open-im-server) GitHub page, click the "Fork" button in the upper right corner to fork the repository to your GitHub account. ### 2. Clone the Repository -Clone the forked repository to your local machine: +Clone the repository you forked to your local machine: ```bash git clone https://github.com/your-username/open-im-server.git ``` @@ -45,19 +46,21 @@ git remote add upstream https://github.com/openimsdk/open-im-server.git ``` ### 4. Create an Issue -Create a new issue in the original repository describing the problem you are facing or the new feature you want to add. For significant feature adjustments, propose an RFC issue to facilitate broad discussion and participation from community members. +Create a new issue in the original repository detailing the problem you encountered or the new feature you wish to add. ### 5. Create a New Branch -Create a new branch based on the main branch and name it descriptively, including the Issue ID, for example: +Create a new branch off the main branch with a descriptive name and Issue ID, for example: ```bash git checkout -b fix-bug-123 ``` ### 6. Commit Changes -After making changes on your local branch, commit them: +After making changes on your local branch, commit these changes: ```bash git add . -git commit -m "Describe your changes in detail" +git commit -m "Describe your changes + + in detail" ``` ### 7. Push the Branch @@ -67,15 +70,25 @@ git push origin fix-bug-123 ``` ### 8. Create a Pull Request -Go to your fork on GitHub, click the "Pull Request" button. Make sure the PR description is clear and links to the related Issue. -#### 🅰 Fixed issue #issueID +Go to your fork on GitHub and click the "Pull Request" button. Ensure the PR description is clear and links to the related issue. ### 9. Sign the CLA -If this is your first time submitting a PR, you need to reply in the PR comments: +If this is your first time submitting a PR, you will need to reply in the comments of the PR: ``` I have read the CLA Document and I hereby sign the CLA ``` -### Additional Notes -If the same modification needs to be submitted to two different branches (e.g., main and release-v3.7), create two new branches from the corresponding remote branches. First complete the modification in one branch, then use the `cherry-pick` command to apply these changes to the other branch. After that, submit a separate Pull Request for each branch. +### Programming Standards +Please refer to the following documents for detailed information on Go language programming standards: +- [Go Coding Standards](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/go-code.md) +- [Code Conventions](https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/code-conventions.md) + +### Logging Standards +- **Do not use the standard `log` package**. +- Use the `"github.com/openimsdk/tools/log"` package for logging, which supports multiple log levels: `debug`, `info`, `warn`, `error`. +- **Error logs should only be printed in the function where they are first actively called** to prevent log duplication and ensure clear error context. +### Exception and Error Handling +- **Prohibit the use of `panic`**: The code should not use `panic` to avoid abrupt termination when encountering unrecoverable errors. +- **Error Wrapping**: Use `"github.com/openimsdk/tools/errs"` to wrap errors, maintaining the integrity of error information and facilitating debugging. +- **Error Propagation**: If a function cannot handle an error itself, it should return the error to the caller, rather than hiding or ignoring it. From 285523751d1d52f9bc1dc628a499e08c7ad31364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A3=AE=E5=B1=BF?= <42713450+memory-qianxiao@users.noreply.github.com> Date: Fri, 17 May 2024 10:43:54 +0800 Subject: [PATCH 6/6] feat: middleware authentication and whitelist are used to release permissions. (#2309) --- internal/api/router.go | 76 +++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/internal/api/router.go b/internal/api/router.go index 1fbb33b09..600567178 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -15,6 +15,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "net/http" + "strings" ) func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { @@ -25,7 +26,6 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En if v, ok := binding.Validator.Engine().(*validator.Validate); ok { _ = v.RegisterValidation("required_if", RequiredIf) } - r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID()) // init rpc client here userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway, config.Share.IMAdminUserID) @@ -36,37 +36,37 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) + r.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) - ParseToken := GinParseToken(authRpc) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) - userRouterGroup.POST("/update_user_info", ParseToken, u.UpdateUserInfo) - userRouterGroup.POST("/update_user_info_ex", ParseToken, u.UpdateUserInfoEx) - userRouterGroup.POST("/set_global_msg_recv_opt", ParseToken, u.SetGlobalRecvMessageOpt) - userRouterGroup.POST("/get_users_info", ParseToken, u.GetUsersPublicInfo) - userRouterGroup.POST("/get_all_users_uid", ParseToken, u.GetAllUsersID) - userRouterGroup.POST("/account_check", ParseToken, u.AccountCheck) - userRouterGroup.POST("/get_users", ParseToken, u.GetUsers) - userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) - userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) - userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) - userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) - userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) + userRouterGroup.POST("/update_user_info", u.UpdateUserInfo) + userRouterGroup.POST("/update_user_info_ex", u.UpdateUserInfoEx) + userRouterGroup.POST("/set_global_msg_recv_opt", u.SetGlobalRecvMessageOpt) + userRouterGroup.POST("/get_users_info", u.GetUsersPublicInfo) + userRouterGroup.POST("/get_all_users_uid", u.GetAllUsersID) + userRouterGroup.POST("/account_check", u.AccountCheck) + userRouterGroup.POST("/get_users", u.GetUsers) + userRouterGroup.POST("/get_users_online_status", u.GetUsersOnlineStatus) + userRouterGroup.POST("/get_users_online_token_detail", u.GetUsersOnlineTokenDetail) + userRouterGroup.POST("/subscribe_users_status", u.SubscriberStatus) + userRouterGroup.POST("/get_users_status", u.GetUserStatus) + userRouterGroup.POST("/get_subscribe_users_status", u.GetSubscribeUsersStatus) - userRouterGroup.POST("/process_user_command_add", ParseToken, u.ProcessUserCommandAdd) - userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete) - userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate) - userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet) - userRouterGroup.POST("/process_user_command_get_all", ParseToken, u.ProcessUserCommandGetAll) + userRouterGroup.POST("/process_user_command_add", u.ProcessUserCommandAdd) + userRouterGroup.POST("/process_user_command_delete", u.ProcessUserCommandDelete) + userRouterGroup.POST("/process_user_command_update", u.ProcessUserCommandUpdate) + userRouterGroup.POST("/process_user_command_get", u.ProcessUserCommandGet) + userRouterGroup.POST("/process_user_command_get_all", u.ProcessUserCommandGetAll) - userRouterGroup.POST("/add_notification_account", ParseToken, u.AddNotificationAccount) - userRouterGroup.POST("/update_notification_account", ParseToken, u.UpdateNotificationAccountInfo) - userRouterGroup.POST("/search_notification_account", ParseToken, u.SearchNotificationAccount) + userRouterGroup.POST("/add_notification_account", u.AddNotificationAccount) + userRouterGroup.POST("/update_notification_account", u.UpdateNotificationAccountInfo) + userRouterGroup.POST("/search_notification_account", u.SearchNotificationAccount) } // friend routing group - friendRouterGroup := r.Group("/friend", ParseToken) + friendRouterGroup := r.Group("/friend") { f := NewFriendApi(*friendRpc) friendRouterGroup.POST("/delete_friend", f.DeleteFriend) @@ -88,7 +88,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En friendRouterGroup.POST("/update_friends", f.UpdateFriends) } g := NewGroupApi(*groupRpc) - groupRouterGroup := r.Group("/group", ParseToken) + groupRouterGroup := r.Group("/group") { groupRouterGroup.POST("/create_group", g.CreateGroup) groupRouterGroup.POST("/set_group_info", g.SetGroupInfo) @@ -120,12 +120,12 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En { a := NewAuthApi(*authRpc) authRouterGroup.POST("/user_token", a.UserToken) - authRouterGroup.POST("/get_user_token", ParseToken, a.GetUserToken) + authRouterGroup.POST("/get_user_token", a.GetUserToken) authRouterGroup.POST("/parse_token", a.ParseToken) - authRouterGroup.POST("/force_logout", ParseToken, a.ForceLogout) + authRouterGroup.POST("/force_logout", a.ForceLogout) } // Third service - thirdGroup := r.Group("/third", ParseToken) + thirdGroup := r.Group("/third") { t := NewThirdApi(*thirdRpc) thirdGroup.GET("/prometheus", t.GetPrometheus) @@ -137,7 +137,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En logs.POST("/delete", t.DeleteLogs) logs.POST("/search", t.SearchLogs) - objectGroup := r.Group("/object", ParseToken) + objectGroup := r.Group("/object") objectGroup.POST("/part_limit", t.PartLimit) objectGroup.POST("/part_size", t.PartSize) @@ -150,7 +150,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En objectGroup.GET("/*name", t.ObjectRedirect) } // Message - msgGroup := r.Group("/msg", ParseToken) + msgGroup := r.Group("/msg") { msgGroup.POST("/newest_seq", m.GetSeq) msgGroup.POST("/search_msg", m.SearchMsg) @@ -174,7 +174,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En msgGroup.POST("/get_server_time", m.GetServerTime) } // Conversation - conversationGroup := r.Group("/conversation", ParseToken) + conversationGroup := r.Group("/conversation") { c := NewConversationApi(*conversationRpc) conversationGroup.POST("/get_sorted_conversation_list", c.GetSortedConversationList) @@ -185,7 +185,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs) } - statisticsGroup := r.Group("/statistics", ParseToken) + statisticsGroup := r.Group("/statistics") { statisticsGroup.POST("/user/register", u.UserRegisterCount) statisticsGroup.POST("/user/active", m.GetActiveUser) @@ -199,6 +199,13 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { return func(c *gin.Context) { switch c.Request.Method { case http.MethodPost: + for _, wApi := range Whitelist { + if strings.HasPrefix(c.Request.URL.Path, wApi) { + c.Next() + return + } + } + token := c.Request.Header.Get(constant.Token) if token == "" { log.ZWarn(c, "header get token error", servererrs.ErrArgs.WrapMsg("header must have token")) @@ -218,3 +225,10 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { } } } + +// Whitelist api not parse token +var Whitelist = []string{ + "/user/user_register", + "/auth/user_token", + "/auth/parse_token", +}