diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 6c47c1674..7d3cab77a 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -5,8 +5,8 @@ import ( "fmt" "io" "strings" - "sync" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/pkg/errors" "github.com/go-zookeeper/zk" @@ -17,7 +17,7 @@ import ( var ErrConnIsNil = errors.New("conn is nil") var ErrConnIsNilButLocalNotNil = errors.New("conn is nil, but local is not nil") -func (s *ZkClient) watch(wg *sync.WaitGroup) { +func (s *ZkClient) watch() { for { event := <-s.eventChan switch event.Type { @@ -27,12 +27,9 @@ func (s *ZkClient) watch(wg *sync.WaitGroup) { s.logger.Printf("zk event: %s", event.Path) l := strings.Split(event.Path, "/") if len(l) > 1 { + serviceName := l[len(l)-1] s.lock.Lock() - rpcName := l[len(l)-1] - s.flushResolver(rpcName) - if len(s.localConns[rpcName]) != 0 { - delete(s.localConns, rpcName) - } + s.flushResolverAndDeleteLocal(serviceName) s.lock.Unlock() } s.logger.Printf("zk event handle success: %s", event.Path) @@ -60,9 +57,10 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, } return nil, errors.Wrap(err, "get children error") } + log.ZDebug(context.Background(), "get conns from remote", "conn", data) conns = append(conns, resolver.Address{Addr: string(data), ServerName: serviceName}) } - _, _, _, err = s.conn.ChildrenW(s.getPath(serviceName)) + _, _, _, err = s.conn.ChildrenW(path) if err != nil { return nil, errors.Wrap(err, "children watch error") } @@ -75,7 +73,6 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { s.logger.Printf("get conns from client, serviceName: %s", serviceName) s.lock.Lock() - defer s.lock.Unlock() opts = append(s.options, opts...) conns := s.localConns[serviceName] if len(conns) == 0 { @@ -83,10 +80,12 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp s.logger.Printf("get conns from zk remote, serviceName: %s", serviceName) conns, err = s.GetConnsRemote(serviceName) if err != nil { + s.lock.Unlock() return nil, err } s.localConns[serviceName] = conns } + s.lock.Unlock() var ret []grpc.ClientConnInterface s.logger.Printf("get conns from zk success, serviceName: %s", serviceName) for _, conn := range conns { @@ -94,7 +93,7 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr)) } - ret = append(ret, newClientConnInterface(cc)) + ret = append(ret, cc) } s.logger.Printf("dial ctx success, serviceName: %s", serviceName) return ret, nil @@ -102,11 +101,7 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) - cc, err := grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) - if err != nil { - return nil, err - } - return newClientConnInterface(cc), nil + return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) } func (s *ZkClient) CloseConn(conn grpc.ClientConnInterface) { diff --git a/pkg/discoveryregistry/zookeeper/resolver.go b/pkg/discoveryregistry/zookeeper/resolver.go index 9430b0d4d..f41666e15 100644 --- a/pkg/discoveryregistry/zookeeper/resolver.go +++ b/pkg/discoveryregistry/zookeeper/resolver.go @@ -27,6 +27,7 @@ func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) { r.addrs = newConns if err := r.cc.UpdateState(resolver.State{Addresses: newConns}); err != nil { log.ZError(context.Background(), "UpdateState error", err, "conns", newConns) + return } log.ZDebug(context.Background(), "resolve now finished", "target", r.target, "conns", r.addrs) } diff --git a/pkg/discoveryregistry/zookeeper/zk.go b/pkg/discoveryregistry/zookeeper/zk.go index 06cf199d8..423d0b6c1 100644 --- a/pkg/discoveryregistry/zookeeper/zk.go +++ b/pkg/discoveryregistry/zookeeper/zk.go @@ -91,7 +91,6 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien timeout: timeout, localConns: make(map[string][]resolver.Address), resolvers: make(map[string]*Resolver), - lock: &sync.Mutex{}, } client.ticker = time.NewTicker(defaultFreq) for _, option := range options { @@ -114,9 +113,8 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien return nil, err } resolver.Register(client) - var wg sync.WaitGroup - go client.refresh(&wg) - go client.watch(&wg) + go client.refresh() + go client.watch() return client, nil } @@ -138,7 +136,7 @@ func (s *ZkClient) ensureAndCreate(node string) error { return nil } -func (s *ZkClient) refresh(wg *sync.WaitGroup) { +func (s *ZkClient) refresh() { for range s.ticker.C { s.logger.Printf("refresh local conns") s.lock.Lock() @@ -149,12 +147,17 @@ func (s *ZkClient) refresh(wg *sync.WaitGroup) { delete(s.localConns, rpcName) } s.lock.Unlock() + s.logger.Printf("refresh local conns success") } +} +func (s *ZkClient) flushResolverAndDeleteLocal(serviceName string) { + s.logger.Printf("start flush %s", serviceName) + s.flushResolver(serviceName) + delete(s.localConns, serviceName) } func (s *ZkClient) flushResolver(serviceName string) { - s.logger.Printf("start flush") r, ok := s.resolvers[serviceName] if ok { r.ResolveNow(resolver.ResolveNowOptions{}) @@ -197,8 +200,3 @@ func (s *ZkClient) AddOption(opts ...grpc.DialOption) { func (s *ZkClient) GetClientLocalConns() map[string][]resolver.Address { return s.localConns } - -type FakeLock struct{} - -func (s *FakeLock) Lock() {} -func (s *FakeLock) Unlock() {}