From bb2eb40d9c6611affc56003c52de291f5aa4d4e0 Mon Sep 17 00:00:00 2001 From: dongming Date: Mon, 12 Dec 2022 21:10:59 +0800 Subject: [PATCH] l-12 --- .../k8s.io/client-go/tools/cache/reflector.go | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/vendor/k8s.io/client-go/tools/cache/reflector.go b/vendor/k8s.io/client-go/tools/cache/reflector.go index 9cd476b..d421747 100644 --- a/vendor/k8s.io/client-go/tools/cache/reflector.go +++ b/vendor/k8s.io/client-go/tools/cache/reflector.go @@ -45,6 +45,7 @@ import ( const defaultExpectedTypeName = "" +// Reflector 的数据结构 // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default it will be a file:line if possible. @@ -54,19 +55,20 @@ type Reflector struct { // will be the stringification of expectedGVK if provided, and the // stringification of expectedType otherwise. It is for display // only, and should not be used for parsing or comparison. - expectedTypeName string + expectedTypeName string // 期望的,关注的,订阅的类型名 // An example object of the type we expect to place in the store. // Only the type needs to be right, except that when that is // `unstructured.Unstructured` the object's `"apiVersion"` and // `"kind"` must also be right. - expectedType reflect.Type + expectedType reflect.Type // 满足 “类型” 的类型 // The GVK of the object we expect to place in the store if unstructured. - expectedGVK *schema.GroupVersionKind + expectedGVK *schema.GroupVersionKind // 用GVK来标注“类型” // The destination to sync up with the watch source - store Store + store Store // 存储list&watch结果的地方 // listerWatcher is used to perform lists and watches. - listerWatcher ListerWatcher + listerWatcher ListerWatcher // 提供 list & Watch 方法的对象 + // List & Watch 行为的一些参数 // backoff manages backoff of ListWatch backoffManager wait.BackoffManager // initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch. @@ -74,14 +76,17 @@ type Reflector struct { // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. MaxInternalErrorRetryDuration time.Duration - resyncPeriod time.Duration + // 设置 Resync 操作的参数 + resyncPeriod time.Duration // 自动resync的间隔时间 // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked - ShouldResync func() bool + ShouldResync func() bool // 主动执行resync,这个方法被调用后,返回“true” resync 就会被执行 // clock allows tests to manipulate time - clock clock.Clock + clock clock.Clock // 测试来使用的 // paginatedResult defines whether pagination should be forced for list calls. // It is set based on the result of the initial list call. - paginatedResult bool + paginatedResult bool // 是否使用分页 + + // 乐观锁的实现 // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store @@ -98,7 +103,7 @@ type Reflector struct { // NOTE: It should be used carefully as paginated lists are always served directly from // etcd, which is significantly less efficient and may lead to serious performance and // scalability problems. - WatchListPageSize int64 + WatchListPageSize int64 // 分页的页大小 // Called whenever the ListAndWatch drops the connection with an error. watchErrorHandler WatchErrorHandler } @@ -147,6 +152,8 @@ var ( minWatchTimeout = 5 * time.Minute ) +// 创建 Reflector 的方法 + // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // The indexer is configured to key on namespace func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { @@ -169,13 +176,21 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) } + +// 创建 Reflector 的最终地方 // NewNamedReflector same as NewReflector, but with a specified name for logging func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { + // 创建始终,可以让测试的时候调整 realClock := &clock.RealClock{} + + // 创建 Reflector 数据 r := &Reflector{ name: name, - listerWatcher: lw, - store: store, + // 这两个对象是 reflector 与外部交互的重要对象 + listerWatcher: lw, // 外部传进来的 提供 list & watch 方法的对象 + store: store, // 外部传进来的 提供存储的对象 + + // 同步时,重试等操作测方法及参数 // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. @@ -185,21 +200,29 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, clock: realClock, watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), } + + // 设置关注类型的方法 r.setExpectedType(expectedType) return r } +// 设置关注类型的方法实现。这里所说的类型都为 GVK 名字 func (r *Reflector) setExpectedType(expectedType interface{}) { + // 获取类型名称 r.expectedType = reflect.TypeOf(expectedType) if r.expectedType == nil { + // 如果是空类型或者其他原因设置失败,导致该值为空,则使用默认的类型名 r.expectedTypeName = defaultExpectedTypeName return } + // 将类型转换为容易识别的字符串。 r.expectedTypeName = r.expectedType.String() + // 处理动态类型的相关操作 if obj, ok := expectedType.(*unstructured.Unstructured); ok { // Use gvk to check that watch event objects are of the desired type. + // 动态的获取类型 gvk := obj.GroupVersionKind() if gvk.Empty() { klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)