# P3 - Controller Categories and the Deployment Controller
## Preface
In the first article of the Controller series we started from the cobra command entry point and walked into the multi-instance leader election code, analyzing the leader election flow in detail:
[Controller P1: Multi-instance leader election](https://wqyin.cn/gitbooks/kubeSourceCodeNote/controller/Kubernetes源码学习-Controller-P1-多实例leader选举.html)
The second article then used text and diagrams to briefly describe how a controller works together with the informer machinery in the client-go module, as groundwork for this article and the ones that follow:
[Controller P2: Controller and informer](https://wqyin.cn/gitbooks/kubeSourceCodeNote/controller/Kubernetes源码学习-Controller-P2-Controller与informer.html)
This article picks up where the first one left off and continues into the code.
## Controller Categories
**Startup**
Continuing from part one, below the cobra entry point the controllers are started here:
`cmd/kube-controller-manager/app/controllermanager.go:191`
```go
run := func(ctx context.Context) {}
```
==> `cmd/kube-controller-manager/app/controllermanager.go:217`; the key piece here is the **NewControllerInitializers** function.
```go
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
	klog.Fatalf("error starting controllers: %v", err)
}
```
==> `cmd/kube-controller-manager/app/controllermanager.go:343`
<img src="http://mycloudn.wqyin.cn/20200127122642.png" style="zoom:80%;" />
As you can see, a dedicated controller is initialized for each kind of resource, covering the ones we use all the time: deployment, statefulset, endpoint, PVC and so on; there are more than 30 kinds of controllers in total. The Controller chapters therefore will not analyze them one by one; we only pick a few common, representative ones to dig into. This article looks at the deployment controller.
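For reference, the registration inside NewControllerInitializers is just a map from controller name to its init function. Abridged, it looks roughly like this (the exact list varies between Kubernetes versions):
```go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	// ... 30+ registrations in total
	return controllers
}
```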
## Deployment Controller
### Initialization
`cmd/kube-controller-manager/app/controllermanager.go:354`
```go
controllers["deployment"] = startDeploymentController
```
==> `cmd/kube-controller-manager/app/apps.go:82`
```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		// The deployment controller cares about 3 resources: Deployment/ReplicaSet/Pod (a deployment manages Pods through ReplicaSets).
		// These 3 calls return the informers for the corresponding resources.
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	// start the deployment controller's run loop
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
```
The first parameter of dc.Run() is the number of workers; the default is 5, defined in `pkg/controller/apis/config/v1alpha1/defaults.go:48`. The second parameter is a channel of empty structs through which the goroutines receive the stop signal.
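For reference, the defaulting logic is essentially the following (a sketch; the enclosing function and package names differ slightly between Kubernetes versions, but the field and the value 5 are real):
```go
if obj.ConcurrentDeploymentSyncs == 0 {
	obj.ConcurrentDeploymentSyncs = 5
}
```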
==> `pkg/controller/deployment/deployment_controller.go:148`
```go
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	// wait until every informer's cache has finished syncing
	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	// start several workers
	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
```
The controller.WaitForCacheSync function checks whether each informer's local cache has finished syncing and returns a bool. As discussed in part two, informers keep a local cache (local storage) to speed things up and reduce load on the apiserver, so a cache-sync check is done here first.
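controller.WaitForCacheSync is essentially a thin wrapper around client-go's cache.WaitForCacheSync that only adds logging; roughly (paraphrased, details may vary by version):
```go
// pkg/controller/controller_utils.go (roughly)
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s controller", controllerName)

	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s controller", controllerName)
	return true
}
```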
By default there are 5 workers. Each worker calls wait.Until() to run dc.worker every second in a loop; that is where the deployment controller does its actual work. The wait.Until() looping timer is quite interesting in its own right, so let's unfold it.
#### The wait.Until looping timer function
`pkg/controller/deployment/deployment_controller.go:160`
==> `vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:88`
==>`vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:130`
```go
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	var t *time.Timer
	var sawTimeout bool

	for {
		select {
		case <-stopCh:
			return
		default:
		}

		jitteredPeriod := period
		if jitterFactor > 0.0 {
			jitteredPeriod = Jitter(period, jitterFactor)
		}

		// sliding controls whether the running time of f() counts toward the interval:
		// if false, the timer is started before f() runs; if true, it is started after f() returns.
		if !sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
		}

		// The branches of a select are weighted equally, so the stop signal is checked
		// both before and after each iteration.
		select {
		case <-stopCh:
			return
		// Timer.C is a channel inside the Timer struct; when the timer fires, it sends an event on
		// this channel, and receiving from it drives the next iteration. Receiving here means the
		// period has elapsed, so sawTimeout is set to record that t.C has already been drained
		// and resetOrReuseTimer does not need to drain it again.
		case <-t.C:
			sawTimeout = true
		}
	}
}
```
The resetOrReuseTimer function:
```go
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
	if t == nil {
		return time.NewTimer(d)
	}
	if !t.Stop() && !sawTimeout {
		<-t.C
	}
	t.Reset(d)
	return t
}
```
To summarize: this function is a small wrapper around the timer package that reuses a single Timer to run dc.worker() once per second.
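As a minimal usage sketch (the periodic task here is hypothetical), wait.Until is handy anywhere you need a "run f every period until told to stop" loop:
```go
import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func runPeriodicWork() {
	stopCh := make(chan struct{})
	go wait.Until(func() {
		fmt.Println("doing one round of periodic work") // hypothetical work
	}, time.Second, stopCh)
	// ... later, close(stopCh) terminates the loop
}
```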
#### The dc.worker function
`pkg/controller/deployment/deployment_controller.go:460`
==> `pkg/controller/deployment/deployment_controller.go:464`
```go
func (dc *DeploymentController) processNextWorkItem() bool {
	// take an object from the head of the queue
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	// process the object
	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
```
The deployment controller's worker function simply calls processNextWorkItem over and over again. processNextWorkItem takes the next pending object from the work queue (steps 7-8 of the informer diagram in part two); if it gets one, it runs the corresponding create/update/delete handling logic; if the queue has been shut down, it returns false and the worker exits.
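For reference, dc.worker itself is essentially just a tight loop over processNextWorkItem:
```go
// worker keeps dequeuing and processing items until the queue is shut down.
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}
```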
The implementation of the dc.queue.Get() interface method is here:
`vendor/k8s.io/client-go/util/workqueue/queue.go:140`
```go
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	// pop the item at the head of the queue
	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)

	// add the item to the "processing" set
	q.processing.insert(item)
	// remove it from the dirty set (the dirty set holds items that are waiting to be processed)
	q.dirty.delete(item)

	return item, false
}
```
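To see why both the processing set and the dirty set exist, it helps to also look at the queue's Add method (roughly, from the same file): an item that is re-added while it is still being processed only lands in the dirty set and is put back on the queue when Done is called, so the same key is never processed concurrently.
```go
// vendor/k8s.io/client-go/util/workqueue/queue.go (roughly)
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		// already waiting to be processed: deduplicate
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		// currently being processed: Done() will re-queue it later
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}
```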
The `dc.syncHandler()` method is assigned here:
`pkg/controller/deployment/deployment_controller.go:135`
```go
dc.syncHandler = dc.syncDeployment
```
==> `pkg/controller/deployment/deployment_controller.go:560`
All of the create, delete, update (rolling update), and read operations are handled inside this function.
```go
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		// a deployment must carry a non-empty selector
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
		}
		return nil
	}

	// get the ReplicaSets controlled by this deployment
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}

	// Get all pods as a map grouped by ReplicaSet (the key is the RS).
	// Used to check whether old-version (not yet updated) pods still exist while the deployment is being recreated.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// check whether the deployment is paused; a paused deployment is handled by the sync method
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}
	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// Check whether this deployment event is a rollback.
	// Once the underlying RS has already moved to a new revision, the rollback cannot run automatically;
	// it can only be triggered safely the next time this deployment appears in the queue without being in
	// a rollback state. So the check is done again here: if the deployment carries a rollback marker,
	// roll the RS back first.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}

	// check whether this deployment event is a scaling event; if so, handle it with the sync method
	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	// Update the deployment according to the strategy type in Deployment.Spec.Strategy:
	// 1. Recreate: kill all pods at once, then recreate them
	// 2. RollingUpdate: update the pods gradually
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```
### Pause and scaling (up/down/delete)
The dc.sync method shows up twice above, called both for the paused state and for scaling events. It is important, so let's analyze what sync does.
`pkg/controller/deployment/sync.go:48`
```go
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Expanding this call shows that newRS is the RS whose pod-template hash matches the current deployment
	// template, while oldRSs are all the historical RSes.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}

	// compare the newest RS with the old ones; if scaling is needed, do it in the scale method
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// A paused deployment that is not in a rollback gets cleaned up here
	// (old revisions beyond the configured revision-history limit are deleted).
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	// sync the deployment status
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}
```
Now let's look at the dc.scale() method:
`pkg/controller/deployment/sync.go:289`
```go
func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// FindActiveOrLatest: if there is exactly one active RS, return it; otherwise return the RS with the newest revision.
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			// the RS already matches the replica count specified by the deployment; nothing to do
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}

	// If the newest RS has already converged to the deployment's desired state, the old RSes must be
	// scaled down to zero (i.e. removed).
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// During a rolling update, the total number of pods controlled by the old and new RSes is bounded:
	// the surplus may not exceed maxSurge. So while the update proceeds, the pod counts of the old RSes
	// and the new RS necessarily trade off against each other.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// Number of pods that can be added or must be removed: a positive result means more pods may be
		// created, a negative result means pods have to be deleted.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			// scaling up: sort all RSes from newest to oldest
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"
		case deploymentReplicasToAdd < 0:
			// scaling down: sort all RSes from oldest to newest
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

		// Walk every RS and record in a map the pod count it should reach (current count + its share of the scale).
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		for i := range allRSs {
			rs := allRSs[i]

			if deploymentReplicasToAdd != 0 {
				// compute this RS's share of the scale
				proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
				// target pod count = current count + share
				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Update all replica sets
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			// Any replicas not yet distributed among the RSes go to the first RS after sorting (the newest or oldest one).
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// scale this RS to its target size
			if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}
	return nil
}
```
#### The syncDeploymentStatus function
After the RS scaling and pause handling is done, the deployment's status also has to be brought in line with the latest RS; that is what this function is for.
```go
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
	return err
}
```
This function mainly updates the deployment's status field: the revision, replica count, available replicas, updated replicas, and so on.
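For reference, these are the fields that calculateStatus fills in (abridged from the apps/v1 DeploymentStatus type, json tags omitted):
```go
// k8s.io/api/apps/v1 (abridged)
type DeploymentStatus struct {
	ObservedGeneration  int64
	Replicas            int32
	UpdatedReplicas     int32
	ReadyReplicas       int32
	AvailableReplicas   int32
	UnavailableReplicas int32
	Conditions          []DeploymentCondition
	CollisionCount      *int32
}
```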
The whole scaling flow touches every RS, which is easy to get lost in. But remember that in 99% of cases a deployment has only one active RS, the newRS, and most of the operations target that newRS; with that in mind the flow above becomes much easier to follow.
### Rolling update
A Deployment's update strategy is either a rolling update or a recreate (all-at-once) update. The two work similarly; one proceeds in batches, the other replaces everything in one go. Here we look at the rolling-update code.
As soon as anything inside the deployment's spec changes, a new revision of the RS is generated; replicas are then scaled up on the new RS while the old RS is scaled down.
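As a quick reminder of what the controller reads here, the update strategy lives under spec.strategy; declared with the apps/v1 Go types it looks like this (the values are illustrative):
```go
import (
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func exampleRollingStrategy() appsv1.DeploymentStrategy {
	maxSurge := intstr.FromString("25%") // extra pods allowed above spec.replicas
	maxUnavailable := intstr.FromInt(1)  // pods allowed to be unavailable during the update
	return appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
		},
	}
}
```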
`pkg/controller/deployment/deployment_controller.go:644`
==> `pkg/controller/deployment/rolling.go:31`
```go
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// get the new RS; if there is none yet, create it
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// check whether newRS needs to be scaled up (has it reached the target pod count yet?)
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// scaled up; update the deployment status
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// check whether the old RSes need to be scaled down (have all of their pods terminated yet?)
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// scaled down; update the deployment status
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// the deployment has reached the complete state; clean up old RSes according to the revision-history limit
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// update the deployment status
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
```
The reconcileNewReplicaSet function:
This function returns a bool indicating whether newRS should be scaled up.
```go
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// newRS replicas == deployment replicas: no scale-up needed
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// newRS replicas > deployment replicas: newRS has to be scaled down to the deployment's replica count
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	// newRS replicas < deployment replicas: use NewRSNewReplicas to compute how many pods newRS should own right now
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	// scaled should be true here, since a scale-up happened
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
	return scaled, err
}
```
The NewRSNewReplicas function:
It computes how many replicas newRS should have at this point.
```go
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
	switch deployment.Spec.Strategy.Type {
	// rolling update
	case apps.RollingUpdateDeploymentStrategyType:
		// Check if we can scale up.
		maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
		if err != nil {
			return 0, err
		}
		// current count = sum of the pods managed by every revision of the RS
		currentPodCount := GetReplicaCountForReplicaSets(allRSs)
		// maximum number of pods allowed at the same time = desired replicas + maxSurge (absolute number or percentage)
		maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
		// if the current count already reaches the maximum, no further scale-up; return newRS.Spec.Replicas as-is
		if currentPodCount >= maxTotalPods {
			return *(newRS.Spec.Replicas), nil
		}
		// otherwise, head-room = maximum - current count
		scaleUpCount := maxTotalPods - currentPodCount
		// A single RS may never manage more pods than the deployment specifies; only the old and new RSes combined
		// may exceed that, up to maxSurge. So take the smaller of the two values as the scale-up count.
		scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
		// what newRS should have now = its current replicas + the scale-up count
		return *(newRS.Spec.Replicas) + scaleUpCount, nil
	case apps.RecreateDeploymentStrategyType:
		// not a rolling update: newRS should have exactly deployment.Spec.Replicas, no surge
		return *(deployment.Spec.Replicas), nil
	default:
		return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
	}
}
```
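A quick numeric example of the rolling-update branch: with spec.replicas = 10 and maxSurge = 25%, maxSurge rounds up to 3, so maxTotalPods = 13. If the old RS currently has 8 pods and the new RS has 3 (currentPodCount = 11), then scaleUpCount = 13 - 11 = 2, which is then capped at min(2, 10 - 3) = 2, so newRS is scaled to 3 + 2 = 5.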
The reconcileOldReplicaSets function:
This function returns a bool indicating whether the oldRSs should be scaled down.
```go
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// already fully scaled down; nothing to do
		return false, nil
	}

	// total number of pods right now (current value)
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
	// the maximum number of unavailable replicas the deployment allows (maxUnavailable)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
	//  increase unavailability.
	// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
	// step(that will increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and does kubectl rollout undo to rollback.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	// allow the new replica set to be scaled up by 5.
	//
	// "Available" means the readiness probe reports true; if no readiness probe is defined, a running pod is considered ready automatically.
	// minimum number of replicas that must stay available
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	// number of unavailable pods in newRS
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	// maximum scale-down = total - minAvailable - newRS unavailable count
	// (to guarantee minAvailable, the unavailable newRS pods must not be counted as available capacity)
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Unhealthy replicas in the old RSes have to be removed in any case, otherwise they would block the rollout
	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)

	// The scale-down is then bounded again by maxUnavailable: the smaller of the two values is the number
	// of healthy old replicas that can actually be removed.
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)

	// total scaled down = unhealthy replicas cleaned up + healthy old replicas scaled down
	totalScaledDown := cleanupCount + scaledDownCount
	// report whether anything was scaled down
	return totalScaledDown > 0, nil
}
```
The English comments in the middle walk through concrete examples in great detail and are well worth reading.
#### The syncRolloutStatus function
This function mainly updates the deployment's status field and the conditions inside it.
```go
func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if !util.HasProgressDeadline(d) {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
	}

	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	/*
		The deployment is considered complete when both of the following hold:
		1. newStatus.Replicas == newStatus.UpdatedReplicas, i.e. every replica has been updated to the new revision
		2. the current Progressing condition's reason is NewRSAvailableReason (minimum replicas available)
	*/
	isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason

	// only a deployment that has not yet reached the complete state needs the checks below
	if util.HasProgressDeadline(d) && !isCompleteDeployment {
		switch {
		case util.DeploymentComplete(d, &newStatus):
			// Update the deployment conditions with a message for the new replica set that
			// was successfully deployed. If the condition already exists, we ignore this update.
			msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentProgressing(d, &newStatus):
			// If there is any progress made, continue by not checking if the deployment failed. This
			// behavior emulates the rolling updater progressDeadline check.
			msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
			// Update the current Progressing condition or add a new one if it doesn't exist.
			// If a Progressing condition with status=true already exists, we should update
			// everything but lastTransitionTime. SetDeploymentCondition already does that but
			// it also is not updating conditions when the reason of the new condition is the
			// same as the old. The Progressing condition is a special case because we want to
			// update with the same reason and change just lastUpdateTime iff we notice any
			// progress. That's why we handle it here.
			if currentCond != nil {
				if currentCond.Status == v1.ConditionTrue {
					condition.LastTransitionTime = currentCond.LastTransitionTime
				}
				util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
			}
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentTimedOut(d, &newStatus):
			// Update the deployment with a timeout condition. If the condition already exists,
			// we ignore this update.
			msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)
		}
	}

	// ... (remainder omitted here: the function goes on to write newStatus back to the deployment's status)
}
```
DeploymentCondition comes up again and again here; to make it easier to picture, here is a sample condition set from a deployment in a normal state:
<img src="http://mycloudn.wqyin.cn/20200130125357.png" style="zoom:50%;" />
#### Summary
During a rolling update, `reconcileNewReplicaSet` scales the newRS up and `reconcileOldReplicaSets` scales the oldRSs down, within the constraints of `maxSurge` and `maxUnavailable`. The timer re-runs this every second, repeatedly converging and correcting until the desired state is reached and the update is complete.
## Summary
A Deployment's rollback, scaling, pause, and update operations are carried out mainly by manipulating ReplicaSets. RS revision control and replica-count control are the core of it and the hardest part to follow, but just remember that 99% of the time a deployment has exactly one active RS; only during an update are there two, and only in rare cases (repeated updates within a short time) more than two. With that in mind, the source code above becomes much easier to understand.
Also, as the step-by-step breakdown shows, a deployment update barely touches pods directly, so a later article in this chapter will look at how the replicaSet controller manages and interacts with pods.