From be46c9f72362a101a53cdf29802d54b43a4dca45 Mon Sep 17 00:00:00 2001 From: dongming Date: Thu, 15 Dec 2022 23:15:02 +0800 Subject: [PATCH] l-43 --- controllers/app_controller.go | 11 +++++++++- .../controller-runtime/pkg/client/split.go | 2 ++ .../pkg/internal/controller/controller.go | 20 +++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/controllers/app_controller.go b/controllers/app_controller.go index ace0765..3761be9 100644 --- a/controllers/app_controller.go +++ b/controllers/app_controller.go @@ -50,8 +50,17 @@ type AppReconciler struct { func (r *AppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) - // TODO(user): your logic here + // 获取对象的惯例写法 + app := new(demov1.App) + // Get() 会调用实现了 Reader Interface 的对象。 + // 找到了实现的对象为 `delegatingReader` ,它只实现了 Reader 接口 + if err := r.Client.Get(ctx, req.NamespacedName, app); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // create() 会调用实现了 Writer Interface 的对象 + //r.Client.Create() return ctrl.Result{}, nil } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/split.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/split.go index 8717345..6107218 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/split.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/split.go @@ -120,6 +120,7 @@ func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) { return false, nil } +// 使用 cache 访问 // Get retrieves an obj for a given object key from the Kubernetes Cluster. func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { if isUncached, err := d.shouldBypassCache(obj); err != nil { @@ -130,6 +131,7 @@ func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object, o return d.CacheReader.Get(ctx, key, obj, opts...) } +// 使用 cache 访问 // List retrieves list of objects for a given namespace and list options. func (d *delegatingReader) List(ctx context.Context, list ObjectList, opts ...ListOption) error { if isUncached, err := d.shouldBypassCache(list); err != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index 1984988..2aed72f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -118,6 +118,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re panic(r) } }() + // 去调用我们的 reconcile。这里之后是 interface。但是Do这个字段中的 Reconcile 是我们之前传进来的。 return c.Do.Reconcile(ctx, req) } @@ -161,11 +162,13 @@ func (c *Controller) Start(ctx context.Context) error { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } + // 初始化监控相关的信息 c.initMetrics() // Set the internal context. c.ctx = ctx + // 创建了一个限速队列 c.Queue = c.MakeQueue() go func() { <-ctx.Done() @@ -179,6 +182,7 @@ func (c *Controller) Start(ctx context.Context) error { // TODO(pwittrock): Reconsider HandleCrash defer utilruntime.HandleCrash() + // 启动所有的事件源 // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intendeded // caches. @@ -193,6 +197,7 @@ func (c *Controller) Start(ctx context.Context) error { // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.LogConstructor(nil).Info("Starting Controller") + // 等待一次同步完成 for _, watch := range c.startWatches { syncingSource, ok := watch.src.(source.SyncingSource) if !ok { @@ -218,6 +223,7 @@ func (c *Controller) Start(ctx context.Context) error { } } + // 完成一次同步后,清空所有源 // All the watches have been started, we can reset the local slice. // // We should never hold watches more than necessary, each watch source can hold a backing cache, @@ -230,6 +236,7 @@ func (c *Controller) Start(ctx context.Context) error { for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() + // 不断的从队列中读取任务 // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { @@ -254,6 +261,7 @@ func (c *Controller) Start(ctx context.Context) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller) processNextWorkItem(ctx context.Context) bool { + // 获取事件 obj, shutdown := c.Queue.Get() if shutdown { // Stop working @@ -268,9 +276,11 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { // period. defer c.Queue.Done(obj) + // 加入一些监控可视化的信息 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) + // 去调用我们的 reconcile c.reconcileHandler(ctx, obj) return true } @@ -293,6 +303,7 @@ func (c *Controller) initMetrics() { } func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { + // 计时 // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { @@ -302,6 +313,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Make sure that the object is a valid request. req, ok := obj.(reconcile.Request) if !ok { + // 异常,在队列中删掉这个对象 // As the item in the workqueue is actually invalid, we call // Forget here else we'd go into a loop of attempting to // process a work item that is invalid. @@ -316,16 +328,22 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { log = log.WithValues("reconcileID", uuid.NewUUID()) ctx = logf.IntoContext(ctx, log) + // 去调用我们的 reconcile // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the // resource to be synced. result, err := c.Reconcile(ctx, req) + // 处理我们的 reconcile 返回值的地方 switch { case err != nil: + // 如果报错,则重新加入队列,并记录监控信息,并记录错误日志 + // 如果我们返回了err,同时设定了重新入队的时间,那么,这个时间不会生效。 c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() log.Error(err, "Reconciler error") case result.RequeueAfter > 0: + // 如果 rsults 中的 RequeueAfter,也就是设置等待一定使时间之后,再次入队。 + // 那么,会删掉从队列中删掉事件,等待设定的时间后再次入队。并更新监控信息 // The result.RequeueAfter request will be lost, if it is returned // along with a non-nil error. But this is intended as // We need to drive to stable reconcile loops before queuing due @@ -334,9 +352,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: + // 直接设置重新入队,会立刻重新入队,并更新监控信息。 c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: + // 除此之外,从队列中删除事件。并更新监控信息 // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. c.Queue.Forget(obj)