master
dongming 2 years ago
parent d12832f8a6
commit 6a8ef2703c

@ -221,18 +221,28 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
}
func (blder *Builder) doWatch() error {
// 创建一个类型(中间类型)
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
// 创建源头的类型
src := &source.Kind{Type: typeForSrc}
// 消息的回调函数
hdler := &handler.EnqueueRequestForObject{}
// 处理全局的回调,如果外部没有手动添加,这里不会有实质性作用。也就是说我们可以自己添加回调函数来有更多的处理。
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
// 调用 Watch 函数,蓝监控事件。比较底层的方法。在比较老的版本中很常用。目前不推荐使用。
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// 调用 Watch 函数蓝监控Input源中的事件。
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
@ -251,6 +261,7 @@ func (blder *Builder) doWatch() error {
}
}
// 调用 Watch 函数蓝监控Owns源中的事件。
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
@ -280,9 +291,12 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string {
}
func (blder *Builder) doController(r reconcile.Reconciler) error {
// 初始化 全局 的选项,主要是 并发度的控制,同步超时时间
globalOpts := blder.mgr.GetControllerOptions()
// 初始化 controller 的选项,主要是来自外部的设置
ctrlOptions := blder.ctrlOptions
// 将我们定义的 reconcile 函数,复制给 controller 的 reconcile
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
}
@ -294,6 +308,7 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
return err
}
// 设置并发度
// Setup concurrency.
if ctrlOptions.MaxConcurrentReconciles == 0 {
groupKind := gvk.GroupKind().String()
@ -303,13 +318,16 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
}
}
// 设置同步的超时时间
// Setup cache sync timeout.
if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}
// 创建 controller 名字
controllerName := blder.getControllerName(gvk)
// 设置日志
// Setup the logger.
if ctrlOptions.LogConstructor == nil {
log := blder.mgr.GetLogger().WithValues(

@ -122,14 +122,17 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
}
}
// 设置并发的默认值
if options.MaxConcurrentReconciles <= 0 {
options.MaxConcurrentReconciles = 1
}
// 设置同步超时的默认值
if options.CacheSyncTimeout == 0 {
options.CacheSyncTimeout = 2 * time.Minute
}
// 创建限速队列
if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}

@ -126,6 +126,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
c.mu.Lock()
defer c.mu.Unlock()
// 注入 cache 到 参数中的对象中
// Inject Cache into arguments
if err := c.SetFields(src); err != nil {
return err

@ -107,6 +107,7 @@ var _ SyncingSource = &Kind{}
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// 检查状态,看是否满足后续的需要
// Type should have been specified by the user.
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
@ -131,6 +132,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
// 从 cache 中获取 informer。
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
@ -155,7 +157,10 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}
// 添加回调方法
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
// 等待同步完成如果某一个类型的cache同步失败则整个失败
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")

Loading…
Cancel
Save