Add etcd as a service discovery mechanism

pull/2318/head
skiffer-git 6 months ago
parent 188f27b0f6
commit e58d8c40a4

@ -2,7 +2,6 @@ package push
import ( import (
"context" "context"
"fmt"
"github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
@ -82,8 +81,6 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M
// Online push message // Online push message
for _, conn := range conns { for _, conn := range conns {
fmt.Println(ctx, "get gateway conn detail ", "conn ", *conn)
conn := conn // loop var safe conn := conn // loop var safe
wg.Go(func() error { wg.Go(func() error {
msgClient := msggateway.NewMsgGatewayClient(conn) msgClient := msggateway.NewMsgGatewayClient(conn)

@ -97,10 +97,8 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context,
// GetConns returns gRPC client connections for a given service name // GetConns returns gRPC client connections for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName) fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
fmt.Printf("all conns ", serviceName, r.connMap[fullServiceKey])
return r.connMap[fullServiceKey], nil return r.connMap[fullServiceKey], nil
} }
@ -122,9 +120,7 @@ func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
// CloseConn closes a given gRPC client connection // CloseConn closes a given gRPC client connection
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil { conn.Close()
fmt.Printf("Failed to close connection: %v\n", err)
}
} }
// Register registers a new service endpoint with etcd // Register registers a new service endpoint with etcd
@ -136,7 +132,7 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
} }
r.endpointMgr = em r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time leaseResp, err := r.client.Grant(context.Background(), 30) //
if err != nil { if err != nil {
return err return err
} }
@ -158,14 +154,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(context.Background(), leaseID) ch, err := r.client.KeepAlive(context.Background(), leaseID)
if err != nil { if err != nil {
fmt.Printf("Failed to keep lease alive: %v\n", err)
return return
} }
for ka := range ch { for ka := range ch {
if ka != nil { if ka != nil {
} else { } else {
fmt.Printf("Lease keep-alive response channel closed\n")
return return
} }
} }
@ -175,10 +168,14 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() { func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix()) watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
for watchResp := range watchChan { for watchResp := range watchChan {
updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes
for _, event := range watchResp.Events { for _, event := range watchResp.Events {
prefix, _ := r.splitEndpoint(string(event.Kv.Key)) prefix, _ := r.splitEndpoint(string(event.Kv.Key))
fmt.Printf("Change detected for prefix: %s\n", prefix) if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated {
r.refreshConnMap(prefix) updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated
r.refreshConnMap(prefix)
}
} }
} }
} }
@ -191,7 +188,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
fullPrefix := fmt.Sprintf("%s/", prefix) fullPrefix := fmt.Sprintf("%s/", prefix)
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
if err != nil { if err != nil {
fmt.Printf("Failed to get endpoints: %v\n", err)
return return
} }
r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections
@ -199,7 +195,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
_, addr := r.splitEndpoint(string(kv.Key)) _, addr := r.splitEndpoint(string(kv.Key))
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...) conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
if err != nil { if err != nil {
fmt.Printf("Failed to dial new endpoint: %v\n", err)
continue continue
} }
r.connMap[prefix] = append(r.connMap[prefix], conn) r.connMap[prefix] = append(r.connMap[prefix], conn)
@ -244,11 +239,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
cfg := clientv3.Config{ cfg := clientv3.Config{
Endpoints: etcdServers, Endpoints: etcdServers,
} }
for _, opt := range options { for _, opt := range options {
opt(&cfg) opt(&cfg)
} }
client, err := clientv3.New(cfg) client, err := clientv3.New(cfg)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to connect to etcd") return errors.Wrap(err, "failed to connect to etcd")
@ -279,7 +272,6 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
return errors.Wrap(err, "failed to create lease in etcd") return errors.Wrap(err, "failed to create lease in etcd")
} }
} }
putOpts := []clientv3.OpOption{} putOpts := []clientv3.OpOption{}
if leaseResp != nil { if leaseResp != nil {
putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID)) putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))
@ -289,13 +281,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create the root node in etcd") return errors.Wrap(err, "failed to create the root node in etcd")
} }
fmt.Printf("Root node %s did not exist, but has been created.\n", etcdRoot)
} else { } else {
return fmt.Errorf("root node %s does not exist in etcd", etcdRoot) return fmt.Errorf("root node %s does not exist in etcd", etcdRoot)
} }
} else {
fmt.Printf("Etcd is running and the root node %s exists.\n", etcdRoot)
} }
return nil return nil
} }

Loading…
Cancel
Save