pull/458/head
withchao 2 years ago
parent 4b0389e0f5
commit 512e6717fc

@ -27,13 +27,13 @@ func (s *ZkClient) watch(wg *sync.WaitGroup) {
s.logger.Printf("zk event: %s", event.Path)
l := strings.Split(event.Path, "/")
if len(l) > 1 {
s.lock.Lock()
//s.lock.Lock()
rpcName := l[len(l)-1]
s.flushResolver(rpcName)
if len(s.localConns[rpcName]) != 0 {
delete(s.localConns, rpcName)
}
s.lock.Unlock()
//s.lock.Unlock()
}
s.logger.Printf("zk event handle success: %s", event.Path)
case zk.EventNodeDataChanged:
@ -74,8 +74,8 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address,
func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
s.logger.Printf("get conns from client, serviceName: %s", serviceName)
s.lock.Lock()
defer s.lock.Unlock()
//s.lock.Lock()
//defer s.lock.Unlock()
opts = append(s.options, opts...)
conns := s.localConns[serviceName]
if len(conns) == 0 {

@ -9,8 +9,6 @@ import (
)
func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc.DialOption) error {
// s.lock.Lock()
// defer s.lock.Unlock()
if err := s.ensureName(rpcRegisterName); err != nil {
return err
}
@ -28,8 +26,8 @@ func (s *ZkClient) Register(rpcRegisterName, host string, port int, opts ...grpc
}
func (s *ZkClient) UnRegister() error {
s.lock.Lock()
defer s.lock.Unlock()
//s.lock.Lock()
//defer s.lock.Unlock()
err := s.conn.Delete(s.node, -1)
if err != nil {
return err

@ -39,8 +39,8 @@ func (s *ZkClient) Build(target resolver.Target, cc resolver.ClientConn, opts re
r.cc = cc
r.getConnsRemote = s.GetConnsRemote
r.ResolveNow(resolver.ResolveNowOptions{})
s.lock.Lock()
defer s.lock.Unlock()
//s.lock.Lock()
//defer s.lock.Unlock()
s.resolvers[strings.TrimLeft(target.URL.Path, "/")] = r
s.logger.Printf("build resolver finished: %+v, cc: %+v", target, cc)
return r, nil

@ -34,7 +34,7 @@ type ZkClient struct {
node string
ticker *time.Ticker
lock Lock
//lock sync.Locker
options []grpc.DialOption
resolvers map[string]*Resolver
@ -91,7 +91,7 @@ func NewClient(zkServers []string, zkRoot string, options ...ZkOption) (*ZkClien
timeout: timeout,
localConns: make(map[string][]resolver.Address),
resolvers: make(map[string]*Resolver),
lock: &FakeLock{},
//lock: &sync.Mutex{},
}
client.ticker = time.NewTicker(defaultFreq)
for _, option := range options {
@ -141,14 +141,14 @@ func (s *ZkClient) ensureAndCreate(node string) error {
func (s *ZkClient) refresh(wg *sync.WaitGroup) {
for range s.ticker.C {
s.logger.Printf("refresh local conns")
s.lock.Lock()
//s.lock.Lock()
for rpcName := range s.resolvers {
s.flushResolver(rpcName)
}
for rpcName := range s.localConns {
delete(s.localConns, rpcName)
}
s.lock.Unlock()
//s.lock.Unlock()
}
}
@ -198,12 +198,7 @@ func (s *ZkClient) GetClientLocalConns() map[string][]resolver.Address {
return s.localConns
}
type Lock interface {
Lock()
Unlock()
}
type FakeLock struct{}
func (s *FakeLock) Lock() {}
func (s *FakeLock) Unlock() {}
//type FakeLock struct{}
//
//func (s *FakeLock) Lock() {}
//func (s *FakeLock) Unlock() {}

Loading…
Cancel
Save