master
dongming 2 years ago
parent a56cbd485f
commit bb2eb40d9c

@ -45,6 +45,7 @@ import (
const defaultExpectedTypeName = "<unspecified>" const defaultExpectedTypeName = "<unspecified>"
// Reflector 的数据结构
// Reflector watches a specified resource and causes all changes to be reflected in the given store. // Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct { type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible. // name identifies this reflector. By default it will be a file:line if possible.
@ -54,19 +55,20 @@ type Reflector struct {
// will be the stringification of expectedGVK if provided, and the // will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display // stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison. // only, and should not be used for parsing or comparison.
expectedTypeName string expectedTypeName string // 期望的,关注的,订阅的类型名
// An example object of the type we expect to place in the store. // An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is // Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and // `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right. // `"kind"` must also be right.
expectedType reflect.Type expectedType reflect.Type // 满足 “类型” 的类型
// The GVK of the object we expect to place in the store if unstructured. // The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind expectedGVK *schema.GroupVersionKind // 用GVK来标注“类型”
// The destination to sync up with the watch source // The destination to sync up with the watch source
store Store store Store // 存储list&watch结果的地方
// listerWatcher is used to perform lists and watches. // listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher listerWatcher ListerWatcher // 提供 list & Watch 方法的对象
// List & Watch 行为的一些参数
// backoff manages backoff of ListWatch // backoff manages backoff of ListWatch
backoffManager wait.BackoffManager backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch. // initConnBackoffManager manages backoff the initial connection with the Watch call of ListAndWatch.
@ -74,14 +76,17 @@ type Reflector struct {
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time.Duration MaxInternalErrorRetryDuration time.Duration
resyncPeriod time.Duration // 设置 Resync 操作的参数
resyncPeriod time.Duration // 自动resync的间隔时间
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool ShouldResync func() bool // 主动执行resync这个方法被调用后返回“true” resync 就会被执行
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock // 测试来使用的
// paginatedResult defines whether pagination should be forced for list calls. // paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call. // It is set based on the result of the initial list call.
paginatedResult bool paginatedResult bool // 是否使用分页
// 乐观锁的实现
// lastSyncResourceVersion is the resource version token last // lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store // observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store // it is thread safe, but not synchronized with the underlying store
@ -98,7 +103,7 @@ type Reflector struct {
// NOTE: It should be used carefully as paginated lists are always served directly from // NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and // etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems. // scalability problems.
WatchListPageSize int64 WatchListPageSize int64 // 分页的页大小
// Called whenever the ListAndWatch drops the connection with an error. // Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler watchErrorHandler WatchErrorHandler
} }
@ -147,6 +152,8 @@ var (
minWatchTimeout = 5 * time.Minute minWatchTimeout = 5 * time.Minute
) )
// 创建 Reflector 的方法
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace // The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
@ -169,13 +176,21 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
} }
// 创建 Reflector 的最终地方
// NewNamedReflector same as NewReflector, but with a specified name for logging // NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// 创建始终,可以让测试的时候调整
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
// 创建 Reflector 数据
r := &Reflector{ r := &Reflector{
name: name, name: name,
listerWatcher: lw, // 这两个对象是 reflector 与外部交互的重要对象
store: store, listerWatcher: lw, // 外部传进来的 提供 list & watch 方法的对象
store: store, // 外部传进来的 提供存储的对象
// 同步时,重试等操作测方法及参数
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
@ -185,21 +200,29 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
clock: realClock, clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
} }
// 设置关注类型的方法
r.setExpectedType(expectedType) r.setExpectedType(expectedType)
return r return r
} }
// 设置关注类型的方法实现。这里所说的类型都为 GVK 名字
func (r *Reflector) setExpectedType(expectedType interface{}) { func (r *Reflector) setExpectedType(expectedType interface{}) {
// 获取类型名称
r.expectedType = reflect.TypeOf(expectedType) r.expectedType = reflect.TypeOf(expectedType)
if r.expectedType == nil { if r.expectedType == nil {
// 如果是空类型或者其他原因设置失败,导致该值为空,则使用默认的类型名
r.expectedTypeName = defaultExpectedTypeName r.expectedTypeName = defaultExpectedTypeName
return return
} }
// 将类型转换为容易识别的字符串。
r.expectedTypeName = r.expectedType.String() r.expectedTypeName = r.expectedType.String()
// 处理动态类型的相关操作
if obj, ok := expectedType.(*unstructured.Unstructured); ok { if obj, ok := expectedType.(*unstructured.Unstructured); ok {
// Use gvk to check that watch event objects are of the desired type. // Use gvk to check that watch event objects are of the desired type.
// 动态的获取类型
gvk := obj.GroupVersionKind() gvk := obj.GroupVersionKind()
if gvk.Empty() { if gvk.Empty() {
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name) klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)

Loading…
Cancel
Save