master
dongming 2 years ago
parent 41f911d3c7
commit be46c9f723

@ -50,8 +50,17 @@ type AppReconciler struct {
func (r *AppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { func (r *AppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx) _ = log.FromContext(ctx)
// TODO(user): your logic here // 获取对象的惯例写法
app := new(demov1.App)
// Get 会调用实现了 Reader Interface 的对象。
// 找到了实现的对象为 `delegatingReader` ,它只实现了 Reader 接口
if err := r.Client.Get(ctx, req.NamespacedName, app); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// create() 会调用实现了 Writer Interface 的对象
//r.Client.Create()
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }

@ -120,6 +120,7 @@ func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) {
return false, nil return false, nil
} }
// 使用 cache 访问
// Get retrieves an obj for a given object key from the Kubernetes Cluster. // Get retrieves an obj for a given object key from the Kubernetes Cluster.
func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error {
if isUncached, err := d.shouldBypassCache(obj); err != nil { if isUncached, err := d.shouldBypassCache(obj); err != nil {
@ -130,6 +131,7 @@ func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object, o
return d.CacheReader.Get(ctx, key, obj, opts...) return d.CacheReader.Get(ctx, key, obj, opts...)
} }
// 使用 cache 访问
// List retrieves list of objects for a given namespace and list options. // List retrieves list of objects for a given namespace and list options.
func (d *delegatingReader) List(ctx context.Context, list ObjectList, opts ...ListOption) error { func (d *delegatingReader) List(ctx context.Context, list ObjectList, opts ...ListOption) error {
if isUncached, err := d.shouldBypassCache(list); err != nil { if isUncached, err := d.shouldBypassCache(list); err != nil {

@ -118,6 +118,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
panic(r) panic(r)
} }
}() }()
// 去调用我们的 reconcile。这里之后是 interface。但是Do这个字段中的 Reconcile 是我们之前传进来的。
return c.Do.Reconcile(ctx, req) return c.Do.Reconcile(ctx, req)
} }
@ -161,11 +162,13 @@ func (c *Controller) Start(ctx context.Context) error {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
} }
// 初始化监控相关的信息
c.initMetrics() c.initMetrics()
// Set the internal context. // Set the internal context.
c.ctx = ctx c.ctx = ctx
// 创建了一个限速队列
c.Queue = c.MakeQueue() c.Queue = c.MakeQueue()
go func() { go func() {
<-ctx.Done() <-ctx.Done()
@ -179,6 +182,7 @@ func (c *Controller) Start(ctx context.Context) error {
// TODO(pwittrock): Reconsider HandleCrash // TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// 启动所有的事件源
// NB(directxman12): launch the sources *before* trying to wait for the // NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intendeded // caches to sync so that they have a chance to register their intendeded
// caches. // caches.
@ -193,6 +197,7 @@ func (c *Controller) Start(ctx context.Context) error {
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.LogConstructor(nil).Info("Starting Controller") c.LogConstructor(nil).Info("Starting Controller")
// 等待一次同步完成
for _, watch := range c.startWatches { for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource) syncingSource, ok := watch.src.(source.SyncingSource)
if !ok { if !ok {
@ -218,6 +223,7 @@ func (c *Controller) Start(ctx context.Context) error {
} }
} }
// 完成一次同步后,清空所有源
// All the watches have been started, we can reset the local slice. // All the watches have been started, we can reset the local slice.
// //
// We should never hold watches more than necessary, each watch source can hold a backing cache, // We should never hold watches more than necessary, each watch source can hold a backing cache,
@ -230,6 +236,7 @@ func (c *Controller) Start(ctx context.Context) error {
for i := 0; i < c.MaxConcurrentReconciles; i++ { for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
// 不断的从队列中读取任务
// Run a worker thread that just dequeues items, processes them, and marks them done. // Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object. // It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) { for c.processNextWorkItem(ctx) {
@ -254,6 +261,7 @@ func (c *Controller) Start(ctx context.Context) error {
// processNextWorkItem will read a single work item off the workqueue and // processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler. // attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool { func (c *Controller) processNextWorkItem(ctx context.Context) bool {
// 获取事件
obj, shutdown := c.Queue.Get() obj, shutdown := c.Queue.Get()
if shutdown { if shutdown {
// Stop working // Stop working
@ -268,9 +276,11 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
// period. // period.
defer c.Queue.Done(obj) defer c.Queue.Done(obj)
// 加入一些监控可视化的信息
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
// 去调用我们的 reconcile
c.reconcileHandler(ctx, obj) c.reconcileHandler(ctx, obj)
return true return true
} }
@ -293,6 +303,7 @@ func (c *Controller) initMetrics() {
} }
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// 计时
// Update metrics after processing each item // Update metrics after processing each item
reconcileStartTS := time.Now() reconcileStartTS := time.Now()
defer func() { defer func() {
@ -302,6 +313,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Make sure that the object is a valid request. // Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request) req, ok := obj.(reconcile.Request)
if !ok { if !ok {
// 异常,在队列中删掉这个对象
// As the item in the workqueue is actually invalid, we call // As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to // Forget here else we'd go into a loop of attempting to
// process a work item that is invalid. // process a work item that is invalid.
@ -316,16 +328,22 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
log = log.WithValues("reconcileID", uuid.NewUUID()) log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log) ctx = logf.IntoContext(ctx, log)
// 去调用我们的 reconcile
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the // RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced. // resource to be synced.
result, err := c.Reconcile(ctx, req) result, err := c.Reconcile(ctx, req)
// 处理我们的 reconcile 返回值的地方
switch { switch {
case err != nil: case err != nil:
// 如果报错,则重新加入队列,并记录监控信息,并记录错误日志
// 如果我们返回了err同时设定了重新入队的时间那么这个时间不会生效。
c.Queue.AddRateLimited(req) c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error") log.Error(err, "Reconciler error")
case result.RequeueAfter > 0: case result.RequeueAfter > 0:
// 如果 rsults 中的 RequeueAfter也就是设置等待一定使时间之后再次入队。
// 那么,会删掉从队列中删掉事件,等待设定的时间后再次入队。并更新监控信息
// The result.RequeueAfter request will be lost, if it is returned // The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as // along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due // We need to drive to stable reconcile loops before queuing due
@ -334,9 +352,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
c.Queue.AddAfter(req, result.RequeueAfter) c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue: case result.Requeue:
// 直接设置重新入队,会立刻重新入队,并更新监控信息。
c.Queue.AddRateLimited(req) c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default: default:
// 除此之外,从队列中删除事件。并更新监控信息
// Finally, if no error occurs we Forget this item so it does not // Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens. // get queued again until another change happens.
c.Queue.Forget(obj) c.Queue.Forget(obj)

Loading…
Cancel
Save