master
dongming 2 years ago
parent f34bc6bc13
commit 6080042e26

@ -317,8 +317,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}() }()
// 执行 watch 操作 // 执行 watch 操作
// 创建 retry 对象,来控制下面的循环行为
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for { for {
// 控制循环退出的逻辑只有发送stop信号才能退出 *是说仅仅这里的控制*
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select { select {
case <-stopCh: case <-stopCh:
@ -340,6 +343,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now() start := r.clock.Now()
// 执行外部传进来对象的 watch 操作。返回的是一个对象这个对象可以被看作一个进程他会一直watch着变化的事件供后续可以不断从中读出事件
w, err := r.listerWatcher.Watch(options) w, err := r.listerWatcher.Watch(options)
if err != nil { if err != nil {
// If this is "connection refused" error, it means that most likely apiserver is not responsive. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
@ -348,12 +353,14 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// If that's the case begin exponentially backing off and resend watch request. // If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors. // Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
// 如果时上面的两种错误之一,则会重新进入循环,但是要等待一定的时间
<-r.initConnBackoffManager.Backoff().C() <-r.initConnBackoffManager.Backoff().C()
continue continue
} }
return err return err
} }
// 处理watch对象的函数主要是不断的读出event
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
retry.After(err) retry.After(err)
if err != nil { if err != nil {
@ -536,34 +543,48 @@ loop:
for { for {
select { select {
case <-stopCh: case <-stopCh:
// 如果是收到结束信号,返回的这个错误,在外面调用方,不会判断其他错误,直接退出
return errorStopRequested return errorStopRequested
case err := <-errc: case err := <-errc:
return err return err
case event, ok := <-w.ResultChan(): case event, ok := <-w.ResultChan():
// 获取 watch 对象中的事件
// 如果 ok 时 false 则时应为 watch 被关闭而不是读出了event
if !ok { if !ok {
break loop break loop
} }
// 这里时说 event 的错误类型时 watch.Error并不是获取event的时候出错
if event.Type == watch.Error { if event.Type == watch.Error {
return apierrors.FromObject(event.Object) return apierrors.FromObject(event.Object)
} }
// 期望的类型和 event的类型 是否相同,不同的时候会产生错误,并跳过后面的执行,开始下次循环
if expectedType != nil { if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a { if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a)) utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue continue
} }
} }
// 期望的 GVK 和 event的 GVK 是否相同,不同的时候会产生错误,并跳过后面的执行,开始下次循环
if expectedGVK != nil { if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a)) utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue continue
} }
} }
// 备份 resourceVersion
// 创建一个提取器
meta, err := meta.Accessor(event.Object) meta, err := meta.Accessor(event.Object)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue continue
} }
resourceVersion := meta.GetResourceVersion() resourceVersion := meta.GetResourceVersion()
// 根据不同的事件类型做不同的处理
switch event.Type { switch event.Type {
case watch.Added: case watch.Added:
err := store.Add(event.Object) err := store.Add(event.Object)
@ -596,6 +617,8 @@ loop:
} }
} }
// 到这里 watch 操作就结束了
watchDuration := clock.Since(start) watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 { if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name) return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)

Loading…
Cancel
Save