From 6080042e26473a2a818de647d177603772ff89c5 Mon Sep 17 00:00:00 2001 From: dongming Date: Tue, 13 Dec 2022 16:23:04 +0800 Subject: [PATCH] l-20 --- .../k8s.io/client-go/tools/cache/reflector.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go index d2d21ca..4e4be02 100644 --- a/vendor/k8s.io/client-go/tools/cache/reflector.go +++ b/vendor/k8s.io/client-go/tools/cache/reflector.go @@ -317,8 +317,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { }() // 执行 watch 操作 + + // 创建 retry 对象,来控制下面的循环行为 retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) for { + // 控制循环退出的逻辑,只有发送stop信号,才能退出 *是说仅仅这里的控制* // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: @@ -340,6 +343,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent start := r.clock.Now() + + // 执行外部传进来对象的 watch 操作。返回的是一个对象,这个对象可以被看作一个进程,他会一直watch着变化的事件,供后续可以不断从中读出事件 w, err := r.listerWatcher.Watch(options) if err != nil { // If this is "connection refused" error, it means that most likely apiserver is not responsive. @@ -348,12 +353,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // If that's the case begin exponentially backing off and resend watch request. // Do the same for "429" errors. if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { + // 如果时上面的两种错误之一,则会重新进入循环,但是要等待一定的时间 <-r.initConnBackoffManager.Backoff().C() continue } return err } + // 处理watch对象的函数,主要是不断的读出event err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) retry.After(err) if err != nil { @@ -536,34 +543,48 @@ loop: for { select { case <-stopCh: + // 如果是收到结束信号,返回的这个错误,在外面调用方,不会判断其他错误,直接退出 return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): + // 获取 watch 对象中的事件 + // 如果 ok 时 false, 则时应为 watch 被关闭,而不是读出了event if !ok { break loop } + + // 这里时说 event 的错误类型时 watch.Error,并不是获取event的时候出错 if event.Type == watch.Error { return apierrors.FromObject(event.Object) } + + // 期望的类型和 event的类型 是否相同,不同的时候会产生错误,并跳过后面的执行,开始下次循环 if expectedType != nil { if e, a := expectedType, reflect.TypeOf(event.Object); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) continue } } + + // 期望的 GVK 和 event的 GVK 是否相同,不同的时候会产生错误,并跳过后面的执行,开始下次循环 if expectedGVK != nil { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) continue } } + + // 备份 resourceVersion + // 创建一个提取器 meta, err := meta.Accessor(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) continue } resourceVersion := meta.GetResourceVersion() + + // 根据不同的事件类型做不同的处理 switch event.Type { case watch.Added: err := store.Add(event.Object) @@ -596,6 +617,8 @@ loop: } } + // 到这里 watch 操作就结束了 + watchDuration := clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)