From f34bc6bc1386e712747d44fbd2bf3e6768fdcb73 Mon Sep 17 00:00:00 2001 From: dongming Date: Tue, 13 Dec 2022 15:19:34 +0800 Subject: [PATCH] l-18 --- .../k8s.io/client-go/tools/cache/reflector.go | 35 ++++++++++++++++--- vendor/k8s.io/client-go/tools/pager/pager.go | 23 ++++++++++-- 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go index 558fed6..d2d21ca 100644 --- a/vendor/k8s.io/client-go/tools/cache/reflector.go +++ b/vendor/k8s.io/client-go/tools/cache/reflector.go @@ -244,7 +244,7 @@ var internalPackages = []string{"client-go/tools/cache/"} func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { - if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch + if err := r.ListAndWatch(stopCh); err != nil {// 调用了 list & watch, r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) @@ -280,14 +280,17 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) + // 执行list操作 err := r.list(stopCh) if err != nil { return err } + // 执行resync操作 resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) + // 启动一个协程来运行 resync go func() { resyncCh, cleanup := r.resyncChan() defer func() { @@ -313,6 +316,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } }() + // 执行 watch 操作 retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors @@ -380,6 +384,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // the resource version can be used for further progress notification (aka. watch). func (r *Reflector) list(stopCh <-chan struct{}) error { var resourceVersion string + + // 创建 option,主要是确定 resourceVersion options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name}) @@ -389,25 +395,34 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) + // 启动 goroutine 来执行 list 并接收 go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() + // pager 是一个拥有执行了外部list方法结果的对象 // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // list request will return the full response. - pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + pager := pager.New( + pager.SimplePageFunc( + func(opts metav1.ListOptions) (runtime.Object, error) { + // 这里执行了外面传进来的对象的 list 方法 return r.listerWatcher.List(opts) })) switch { case r.WatchListPageSize != 0: + // 设置了分页值,就会让这个值在执行分页的时候生效 pager.PageSize = r.WatchListPageSize case r.paginatedResult: + // 如果已经使用用分页的结果,就不做任何操作 // We got a paginated result initially. Assume this resource and server honor // paging requests (i.e. watch cache is probably disabled) and leave the default // pager size set. case options.ResourceVersion != "" && options.ResourceVersion != "0": + // 如果满足这里,说明获取到数据,并且没有分页信息 + // 如果 PageSize 为 0, 后面会从 cache 中获取数据 // User didn't explicitly request pagination. // // With ResourceVersion != "", we have a possibility to list from watch cache, @@ -436,6 +451,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { } close(listCh) }() + + // 控制 goroutine 的运行 select { case <-stopCh: return nil @@ -463,12 +480,15 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { r.paginatedResult = true } + // 表示list执行成功 r.setIsLastSyncResourceVersionUnavailable(false) // list was successful - listMetaInterface, err := meta.ListAccessor(list) + + // 处理resourceVersion + listMetaInterface, err := meta.ListAccessor(list) // 创建提取器 if err != nil { return fmt.Errorf("unable to understand list result %#v: %v", list, err) } - resourceVersion = listMetaInterface.GetResourceVersion() + resourceVersion = listMetaInterface.GetResourceVersion() // 获取当前数据的 resourceVersion。相当于备份 resourceversion initTrace.Step("Resource version extracted") items, err := meta.ExtractList(list) if err != nil { @@ -479,7 +499,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { return fmt.Errorf("unable to sync list result: %v", err) } initTrace.Step("SyncWith done") - r.setLastSyncResourceVersion(resourceVersion) + r.setLastSyncResourceVersion(resourceVersion) // 还原 resourceVersion initTrace.Step("Resource version updated") return nil } @@ -598,6 +618,7 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { r.lastSyncResourceVersion = v } +// 确认 resourceVersion 的方法 // relistResourceVersion determines the resource version the reflector should list or relist from. // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted @@ -608,16 +629,20 @@ func (r *Reflector) relistResourceVersion() string { defer r.lastSyncResourceVersionMutex.RUnlock() if r.isLastSyncResourceVersionUnavailable { + // 如果最近一次的sync的 resourceVersion 不可用,我们就返回一个空的 resourceVersion // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector // to the latest available ResourceVersion, using a consistent read from etcd. return "" } if r.lastSyncResourceVersion == "" { + // 如果最近一次的sync的 resourceVersion 为字符串 空 ,则返回字符串 “0” // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to // be served from the watch cache if it is enabled. return "0" } + + // 都不满足直接返回最后一次同步的 resourceVersion return r.lastSyncResourceVersion } diff --git a/vendor/k8s.io/client-go/tools/pager/pager.go b/vendor/k8s.io/client-go/tools/pager/pager.go index 805859e..6b8b277 100644 --- a/vendor/k8s.io/client-go/tools/pager/pager.go +++ b/vendor/k8s.io/client-go/tools/pager/pager.go @@ -74,12 +74,13 @@ func New(fn ListPageFunc) *ListPager { // server to reduce the impact on the server. If the chunk attempt fails, it will load // the full list instead. The Limit field on options, if unset, will default to the page size. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) { + // 设置 page size if options.Limit == 0 { options.Limit = p.PageSize } requestedResourceVersion := options.ResourceVersion requestedResourceVersionMatch := options.ResourceVersionMatch - var list *metainternalversion.List + var list *metainternalversion.List // 用来存储获取的结果 paginatedResult := false for { @@ -89,8 +90,10 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti default: } + // 真正执行外部传入的,含有list对象的 list 方法 obj, err := p.PageFn(ctx, options) if err != nil { + // 错误处理 // Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and // the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from // failing when the resource versions is established by the first page request falls out of the compaction @@ -104,14 +107,18 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti options.Continue = "" options.ResourceVersion = requestedResourceVersion options.ResourceVersionMatch = requestedResourceVersionMatch - result, err := p.PageFn(ctx, options) + result, err := p.PageFn(ctx, options) // 失败了,最后还会执行一次 + // 执行后,不管成功与否,都直接返回 return result, paginatedResult, err } + // 创建一个提取器 + // 根据获取的对象类型,来生成提取器 m, err := meta.ListAccessor(obj) if err != nil { return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err) } + // 判断结果状态,如果没有更多或者已经为空,则退出 // exit early and return the object we got if we haven't processed any pages if len(m.GetContinue()) == 0 && list == nil { return obj, paginatedResult, nil @@ -119,10 +126,14 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti // initialize the list and fill its contents if list == nil { + // 进到这里,表示是第一次执行 + // 使用创建的提取器初始化 list 对象 list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)} list.ResourceVersion = m.GetResourceVersion() list.SelfLink = m.GetSelfLink() } + + // 处理每一个获取的对象,无论是一个列表,还是封装在一个列表对象中的列表 if err := meta.EachListItem(obj, func(obj runtime.Object) error { list.Items = append(list.Items, obj) return nil @@ -130,18 +141,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti return nil, paginatedResult, err } + // 如果获取完成,就返回 // if we have no more items, return the list if len(m.GetContinue()) == 0 { return list, paginatedResult, nil } + // 开始下一次循环的初始化设置 // set the next loop up - options.Continue = m.GetContinue() + options.Continue = m.GetContinue() // 获取了象一个对象 + + // 一些变量的清理工作 // Clear the ResourceVersion(Match) on the subsequent List calls to avoid the // `specifying resource version is not allowed when using continue` error. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143. options.ResourceVersion = "" options.ResourceVersionMatch = "" + + // 如果走到这里,自然的,分页就产生了。 // At this point, result is already paginated. paginatedResult = true }