From cafa12c6e4590844938ee6c8eda518f1ca519707 Mon Sep 17 00:00:00 2001 From: saeipi Date: Sun, 24 Apr 2022 19:36:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99=E7=9A=84?= =?UTF-8?q?=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/grpc-etcdv3/getcdv3/watcher.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/grpc-etcdv3/getcdv3/watcher.go b/pkg/grpc-etcdv3/getcdv3/watcher.go index 7abac7c2a..ef1859cc5 100644 --- a/pkg/grpc-etcdv3/getcdv3/watcher.go +++ b/pkg/grpc-etcdv3/getcdv3/watcher.go @@ -35,15 +35,13 @@ func NewWatcher() (w *Watcher) { catalog = cfg.Config.Etcd.EtcdSchema + ":///" + cfg.Config.RpcRegisterName.OpenImOnlineMessageRelayName config = clientv3.Config{ - Endpoints: cfg.Config.Etcd.EtcdAddr, // 集群地址 - DialTimeout: time.Duration(5000) * time.Millisecond, // 连接超时 + Endpoints: cfg.Config.Etcd.EtcdAddr, + DialTimeout: time.Duration(5000) * time.Millisecond, } - // 1、建立连接 if client, err = clientv3.New(config); err != nil { panic(err.Error()) return } - // 2、得到KV和观察者 kv = clientv3.NewKV(client) watcher = clientv3.NewWatcher(client) @@ -60,7 +58,6 @@ func NewWatcher() (w *Watcher) { return } -// 监听变化 func (w *Watcher) Run() (err error) { var ( resp *clientv3.GetResponse @@ -73,7 +70,6 @@ func (w *Watcher) Run() (err error) { value string ) - // 1、get目录下的所有键值对,并且获知当前集群的revision if resp, err = w.kv.Get(context.TODO(), w.catalog, clientv3.WithPrefix()); err != nil { return } @@ -84,17 +80,13 @@ func (w *Watcher) Run() (err error) { } w.updateServices() - // 2、从该revision向后监听变化事件 go func() { - // 从GET时刻的后续版本开始监听变化 watchStartRevision = resp.Header.Revision + 1 - // 监听目录的后续变化 watchChan = w.watcher.Watch(context.TODO(), w.catalog, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix()) - // 处理监听事件 for watchResp = range watchChan { for _, watchEvent = range watchResp.Events { switch watchEvent.Type { - case mvccpb.PUT: // 任务保存事件 + case mvccpb.PUT: w.rwLock.Lock() key = string(watchEvent.Kv.Key) @@ -103,7 +95,7 @@ func (w *Watcher) Run() (err error) { w.updateServices() w.rwLock.Unlock() - case mvccpb.DELETE: // 任务被删除了 + case mvccpb.DELETE: w.rwLock.Lock() key = string(watchEvent.Kv.Key)