You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kubeSourceCodeNote/pkg/pkg-01-wait-定时(条件)轮询库.md

9.0 KiB

title date tags
Kubernetes源码-pkg-01-wait-条件定时器库 2021/11/21 22:59:53
Kubernetes
Golang
读源码

Kubernetes源码-pkg-01-wait-定时(条件)轮询库

前言

在前面的主要组件分析过程中有数次提及到的wait库让我记忆犹新这是一个被高频引用的库各个主要组件如scheduler、controller、kubelet等都常常使用wait库中的function轮询间隔(或条件)触发执行动作。整个wait库只有一个代码文件代码行数不过400余行本篇就来完整地分析一下这个库。

代码路径: vendor/k8s.io/apimachinery/pkg/util/wait/wait.go

分类

wait库内的各种function大体来说都是以轮询的形式根据时间间隔、条件判断来确定工具执行函数是否应被继续执行。按代码中呈现按触发形式再细化一下各function则可以分为这几类

条件类型 说明
Until类 用得最多的类型一般以一条chan struct{} 或context Done接收done信号作为终止轮询的依据
Backoff类 每间隔一定的时长执行一次回溯函数,一般情况下,间隔时长随着回溯次数递增而倍数级延长,但间隔时长也会有上限值
poll类 两条channel一条用作传递单次执行信号用来轮询一条用作传递done信号

Until类

Untile类型有两个具体实现分别是Until和UntilWithContext

Until函数

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:87

func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}

调用的是JitterUntil函数

func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
    // f()执行前先判断一次stopCh是否有信号执行完之后也还要执行一次说明见下方
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
      // 如果加入抖动因子随机数,则要把间隔周期延长至原周期的随机倍数
			jitteredPeriod = Jitter(period, jitterFactor)
		}
		
    // sliding = false则把f()的运行时间计入间隔周期内
		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()
		
    // sliding = true则f()执行完成后继续等待间隔周期再进入下一次循环
		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

    // select下的各case分支权重相等随机分配因此为了避免在stopCh有停止信号时却select到了t.C信号分支上从而导致进入下一个逻辑循环超额执行了一次f()函数的情况在代码段开头就要判断一次stopCh是否有信号
		select {
		case <-stopCh:
			return
		case <-t.C:
			sawTimeout = true
		}
	}
}
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
  // 在抖动因子范围内选择随机数,放大间隔周期的倍数
	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
	return wait
}

JitterUntil函数可谓是把条件考虑得很细致参数上有执行周期、抖动因子、窗口期(是否包含函数执行时间)另外在stopCh信号处理上也做到了预防超期执行JitterUntil函数已经足以应对各类以时间间隔维度的轮询场景了。

UntilWithContext

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:96

func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
	JitterUntilWithContext(ctx, f, period, 0.0, true)
}
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
	JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
}

可以看到最后也是调用的JitterUntil唯一的不同只是参数stopCh换成了包了一层的context.Done()

Backoff类

Backoff结构体

Backoff类函数都有一个类型为Backoff结构体的形参先来看看这个结构体的定义

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:207

type Backoff struct {
	// 初始设定的间隔
	Duration time.Duration
	// 间隔时间的倍数
	Factor float64
	// 抖动因子,抖动因子是在最后计算的
	Jitter float64
  // duration最大步进次数下面还有个专门的Step()方法依据此字段动态计算出每次轮询的duration
	Steps int
  // 最大duration上限(计算抖动因子之前)
	Cap time.Duration
}
func (b *Backoff) Step() time.Duration {
	if b.Steps < 1 {
    // 当Steps == 0时duration不再变化
		if b.Jitter > 0 {
			return Jitter(b.Duration, b.Jitter)
		}
		return b.Duration
	}
  // Steps每循环一次会递减
	b.Steps--

	duration := b.Duration

	// calculate the next step
	if b.Factor != 0 {
    // duration每次轮询随着倍数因子倍数级递增
		b.Duration = time.Duration(float64(b.Duration) * b.Factor)
		if b.Cap > 0 && b.Duration > b.Cap {
      // 但duration最大也不会超过Cap
			b.Duration = b.Cap
			b.Steps = 0
		}
	}
  // 在计算的最后一步使用抖动因子放大duration再返回最终的duration
	if b.Jitter > 0 {
		duration = Jitter(duration, b.Jitter)
	}
	return duration
}

ok下面来看看具体的Backoff类实现函数。

ExponentialBackoff函数

func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
	for backoff.Steps > 0 {
		if ok, err := condition(); err != nil || ok {
			return err
		}
		if backoff.Steps == 1 {
			break
		}
		time.Sleep(backoff.Step())
	}
	return ErrWaitTimeout
}

ExponentialBackoff函数的工作逻辑是

  • 在最外层限定了backoff函数的最多重复执行次数即等于Steps字段值

  • 过程中condition()条件函数执行异常或正常则直接返回否则按最大限定次数每次轮询等待时间参照Step()方法返回值进行

  • 到达上限次数后若条件函数仍未返回结果,则返回超时错误

显然此函数适用于失败重试的场景常用k8s的同学一定不会陌生我们常遇到pod多次失败重试的状态如:CrashLoopBackOff/ImagePullBackOff状态重试过程正是使用此函数封装的

poll类

WaitFunc结构体

type WaitFunc func(done <-chan struct{}) <-chan struct{}

这里定义的type WaitFunc下面都会用到接收的参数chan用作done信号传递返回的chan用作执行信号传递

Poll

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:286

func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
	return pollInternal(poller(interval, timeout), condition)
}

调用的pollInternal函数看命名就知道是间隔执行的意思

func pollInternal(wait WaitFunc, condition ConditionFunc) error {
	done := make(chan struct{})
	defer close(done)
	return WaitFor(wait, condition, done)
}

-->

func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
	stopCh := make(chan struct{})
	defer close(stopCh)
  // type WaitFunc函数返回的c是传递执行信号的chan
	c := wait(stopCh)
	for {
		select {
      // c每取出一次数据则执行一次条件函数fn()
		case _, open := <-c:
			ok, err := fn()
			if err != nil {
				return err
			}
      // fn()条件满足则轮询结束返回nil
			if ok {
				return nil
			}
			if !open {
				return ErrWaitTimeout
			}
		case <-done:
			return ErrWaitTimeout
		}
	}
}

WaitFor函数可实现按条件结果来决定对执行函数的执行次数、结束时机的控制实现更高的可控性但与之对应的是执行信号、结束信号的发送全部需要在type waitFunc内部实现。即上面的Poll()函数中的poller(interval, timeout)waitFunc的实现,来看看:

func poller(interval, timeout time.Duration) WaitFunc {
	return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
    // 执行信号chan
		ch := make(chan struct{})

		go func() {
			defer close(ch)
      // new一个ticker
			tick := time.NewTicker(interval)
			defer tick.Stop()
      
      // 默认无超时时间设定
			var after <-chan time.Time
			if timeout != 0 {
				// 当超时时间>0时设定超时判定计时器
				timer := time.NewTimer(timeout)
				after = timer.C
				defer timer.Stop()
			}

			for {
				select {
				case <-tick.C:
					// 达到interval tick时间后往ch插入一个执行信号
					select {
					case ch <- struct{}{}:
					default:
					}
				case <-after:
					return
				case <-done:
					return
				}
			}
		}()

		return ch
	})
}

从pollInternal的实现上来看看起来与Until差别不大但在WaitFunc的实现层面除了像poller()函数这样以定时器为条件外也可以取其他更加灵活的条件作为判定往执行chan发送信号的依据。