You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/discoveryregister/etcd/etcd.go

114 lines
3.2 KiB

package etcd
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"log"
"time"
)
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
schema string
}
func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
r, err := resolver.NewBuilder(client)
if err != nil {
return nil, err
}
return &SvcDiscoveryRegistryImpl{
client: client,
resolver: r,
schema: schema,
}, nil
}
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
if err != nil {
return nil, err
}
return []*grpc.ClientConn{conn}, nil
}
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
}
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey)
}
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
log.Printf("Failed to close connection: %v", err)
}
}
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
r.serviceKey = fmt.Sprintf("%s/%s-%d", serviceName, host, port)
em, err := endpoints.NewManager(r.client, serviceName)
if err != nil {
return err
}
r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 30)
if err != nil {
return err
}
r.leaseID = leaseResp.ID
endpoint := endpoints.Endpoint{Addr: fmt.Sprintf("%s:%d", host, port)}
err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID))
if err != nil {
return err
}
_, kaErr := r.client.KeepAlive(context.Background(), r.leaseID)
return kaErr
}
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
}
return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
}
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()
}
}