From 6a8ef2703c069e46e80ecfc67e6fcc07ed96ab47 Mon Sep 17 00:00:00 2001 From: dongming Date: Thu, 15 Dec 2022 21:58:30 +0800 Subject: [PATCH] l-40 --- .../pkg/builder/controller.go | 18 ++++++++++++++++++ .../pkg/controller/controller.go | 3 +++ .../pkg/internal/controller/controller.go | 1 + .../controller-runtime/pkg/source/source.go | 5 +++++ 4 files changed, 27 insertions(+) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/builder/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/builder/controller.go index ccc06b4..1c782ea 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/builder/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/builder/controller.go @@ -221,18 +221,28 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. } func (blder *Builder) doWatch() error { + // 创建一个类型(中间类型) // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } + + // 创建源头的类型 src := &source.Kind{Type: typeForSrc} + // 消息的回调函数 hdler := &handler.EnqueueRequestForObject{} + + // 处理全局的回调,如果外部没有手动添加,这里不会有实质性作用。也就是说我们可以自己添加回调函数来有更多的处理。 allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) + + // 调用 Watch 函数,蓝监控事件。比较底层的方法。在比较老的版本中很常用。目前不推荐使用。 if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } + + // 调用 Watch 函数,蓝监控Input源中的事件。 // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) @@ -251,6 +261,7 @@ func (blder *Builder) doWatch() error { } } + // 调用 Watch 函数,蓝监控Owns源中的事件。 // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) @@ -280,9 +291,12 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string { } func (blder *Builder) doController(r reconcile.Reconciler) error { + // 初始化 全局 的选项,主要是 并发度的控制,同步超时时间 globalOpts := blder.mgr.GetControllerOptions() + // 初始化 controller 的选项,主要是来自外部的设置 ctrlOptions := blder.ctrlOptions + // 将我们定义的 reconcile 函数,复制给 controller 的 reconcile if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r } @@ -294,6 +308,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { return err } + // 设置并发度 // Setup concurrency. if ctrlOptions.MaxConcurrentReconciles == 0 { groupKind := gvk.GroupKind().String() @@ -303,13 +318,16 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { } } + // 设置同步的超时时间 // Setup cache sync timeout. if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout } + // 创建 controller 名字 controllerName := blder.getControllerName(gvk) + // 设置日志 // Setup the logger. if ctrlOptions.LogConstructor == nil { log := blder.mgr.GetLogger().WithValues( diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go index 8e3d859..382d4fa 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go @@ -122,14 +122,17 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller } } + // 设置并发的默认值 if options.MaxConcurrentReconciles <= 0 { options.MaxConcurrentReconciles = 1 } + // 设置同步超时的默认值 if options.CacheSyncTimeout == 0 { options.CacheSyncTimeout = 2 * time.Minute } + // 创建限速队列 if options.RateLimiter == nil { options.RateLimiter = workqueue.DefaultControllerRateLimiter() } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index 3732eea..1984988 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -126,6 +126,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() + // 注入 cache 到 参数中的对象中 // Inject Cache into arguments if err := c.SetFields(src); err != nil { return err diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go b/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go index 241c582..fd40c9d 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go @@ -107,6 +107,7 @@ var _ SyncingSource = &Kind{} // to enqueue reconcile.Requests. func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { + // 检查状态,看是否满足后续的需要 // Type should have been specified by the user. if ks.Type == nil { return fmt.Errorf("must specify Kind.Type") @@ -131,6 +132,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w // an error or the specified context is cancelled or expired. if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + // 从 cache 中获取 informer。 i, lastErr = ks.cache.GetInformer(ctx, ks.Type) if lastErr != nil { kindMatchErr := &meta.NoKindMatchError{} @@ -155,7 +157,10 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } + // 添加回调方法 i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + + // 等待同步完成,如果某一个类型的cache同步失败,则整个失败 if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here ks.started <- errors.New("cache did not sync")