Add etcd as a service discovery mechanism

pull/2318/head
skiffer-git 6 months ago
parent e58d8c40a4
commit f63c124f75

@ -14,7 +14,7 @@ require (
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.65 github.com/openimsdk/protocol v0.0.65
github.com/openimsdk/tools v0.0.49-alpha.9 github.com/openimsdk/tools v0.0.49-alpha.10
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
@ -40,7 +40,6 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/viper v1.18.2 github.com/spf13/viper v1.18.2
github.com/stathat/consistent v1.0.0 github.com/stathat/consistent v1.0.0
go.etcd.io/etcd/client/v3 v3.5.13
go.uber.org/automaxprocs v1.5.3 go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.6.0 golang.org/x/sync v0.6.0
) )
@ -142,6 +141,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
go.opencensus.io v0.24.0 // indirect go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect

@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.9 h1:yoa3GS6t0d1mRv/S86niFBGDgSjy2EWWwBI5NAH1Kgk= github.com/openimsdk/tools v0.0.49-alpha.10 h1:vcfMUGBClD3TsjsTd/Wb0R2WcpjCo8hupMqreb0dXFA=
github.com/openimsdk/tools v0.0.49-alpha.9/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/openimsdk/tools v0.0.49-alpha.10/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -16,9 +16,9 @@ package discoveryregister
import ( import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
getcd "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
getcd "github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/discovery/zookeeper"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"time" "time"

@ -0,0 +1,15 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd"

@ -1,289 +0,0 @@
package etcd
import (
"context"
"fmt"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"strings"
"sync"
"time"
)
// ZkOption defines a function type for modifying clientv3.Config
type ZkOption func(*clientv3.Config)
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
rpcRegisterTarget string
rootDirectory string
mu sync.RWMutex
connMap map[string][]*grpc.ClientConn
}
// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
// Increase keep-alive queue capacity and message size
PermitWithoutStream: true,
MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB
}
// Apply provided options to the config
for _, opt := range options {
opt(&cfg)
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
r, err := resolver.NewBuilder(client)
if err != nil {
return nil, err
}
s := &SvcDiscoveryRegistryImpl{
client: client,
resolver: r,
rootDirectory: rootDirectory,
connMap: make(map[string][]*grpc.ClientConn),
}
go s.watchServiceChanges()
return s, nil
}
// WithDialTimeout sets a custom dial timeout for the etcd client
func WithDialTimeout(timeout time.Duration) ZkOption {
return func(cfg *clientv3.Config) {
cfg.DialTimeout = timeout
}
}
// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client
func WithMaxCallSendMsgSize(size int) ZkOption {
return func(cfg *clientv3.Config) {
cfg.MaxCallSendMsgSize = size
}
}
// WithUsernameAndPassword sets a username and password for the etcd client
func WithUsernameAndPassword(username, password string) ZkOption {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
}
}
// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
// GetConns returns gRPC client connections for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
r.mu.RLock()
defer r.mu.RUnlock()
return r.connMap[fullServiceKey], nil
}
// GetConn returns a single gRPC client connection for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
target := fmt.Sprintf("etcd:///%s/%s", r.rootDirectory, serviceName)
return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
}
// GetSelfConnTarget returns the connection target for the current service
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
return r.rpcRegisterTarget
}
// AddOption appends gRPC dial options to the existing options
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
// CloseConn closes a given gRPC client connection
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// Register registers a new service endpoint with etcd
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
r.serviceKey = fmt.Sprintf("%s/%s/%s:%d", r.rootDirectory, serviceName, host, port)
em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
if err != nil {
return err
}
r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 30) //
if err != nil {
return err
}
r.leaseID = leaseResp.ID
r.rpcRegisterTarget = fmt.Sprintf("%s:%d", host, port)
endpoint := endpoints.Endpoint{Addr: r.rpcRegisterTarget}
err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID))
if err != nil {
return err
}
go r.keepAliveLease(r.leaseID)
return nil
}
// keepAliveLease maintains the lease alive by sending keep-alive requests
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(context.Background(), leaseID)
if err != nil {
return
}
for ka := range ch {
if ka != nil {
} else {
return
}
}
}
// watchServiceChanges watches for changes in the service directory
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
for watchResp := range watchChan {
updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes
for _, event := range watchResp.Events {
prefix, _ := r.splitEndpoint(string(event.Kv.Key))
if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated {
updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated
r.refreshConnMap(prefix)
}
}
}
}
// refreshConnMap fetches the latest endpoints and updates the local map
func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
r.mu.Lock()
defer r.mu.Unlock()
fullPrefix := fmt.Sprintf("%s/", prefix)
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
if err != nil {
return
}
r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections
for _, kv := range resp.Kvs {
_, addr := r.splitEndpoint(string(kv.Key))
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
if err != nil {
continue
}
r.connMap[prefix] = append(r.connMap[prefix], conn)
}
}
// splitEndpoint splits the endpoint string into prefix and address
func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) {
lastSlashIndex := strings.LastIndex(input, "/")
if lastSlashIndex != -1 {
part1 := input[:lastSlashIndex]
part2 := input[lastSlashIndex+1:]
return part1, part2
}
return input, ""
}
// UnRegister removes the service endpoint from etcd
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
}
err := r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
if err != nil {
return err
}
return nil
}
// Close closes the etcd client connection
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()
}
r.mu.Lock()
defer r.mu.Unlock()
}
// Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease
func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error {
cfg := clientv3.Config{
Endpoints: etcdServers,
}
for _, opt := range options {
opt(&cfg)
}
client, err := clientv3.New(cfg)
if err != nil {
return errors.Wrap(err, "failed to connect to etcd")
}
defer client.Close()
var opCtx context.Context
var cancel context.CancelFunc
if cfg.DialTimeout != 0 {
opCtx, cancel = context.WithTimeout(ctx, cfg.DialTimeout)
} else {
opCtx, cancel = context.WithTimeout(ctx, 10*time.Second)
}
defer cancel()
resp, err := client.Get(opCtx, etcdRoot)
if err != nil {
return errors.Wrap(err, "failed to get the root node from etcd")
}
if len(resp.Kvs) == 0 {
if createIfNotExist {
var leaseTTL int64 = 10
var leaseResp *clientv3.LeaseGrantResponse
if leaseTTL > 0 {
leaseResp, err = client.Grant(opCtx, leaseTTL)
if err != nil {
return errors.Wrap(err, "failed to create lease in etcd")
}
}
putOpts := []clientv3.OpOption{}
if leaseResp != nil {
putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))
}
_, err := client.Put(opCtx, etcdRoot, "", putOpts...)
if err != nil {
return errors.Wrap(err, "failed to create the root node in etcd")
}
} else {
return fmt.Errorf("root node %s does not exist in etcd", etcdRoot)
}
}
return nil
}
Loading…
Cancel
Save