controller p2/p3

add-license-1
yinwenqin 4 years ago
parent 83f4331eec
commit 1fa26993c1

BIN
.DS_Store vendored

Binary file not shown.

@ -0,0 +1,243 @@
# P2-Controller与informer
## 前言
Controller作为k8s的资源控制组件必定要实时地监控对比资源的目标状态和当前状态这其中会与apiserver产生大量的交互。在k8s中k8s各个组件都会与apiServer交互因此k8s在项目中封装了一个client-go公用模块路径位于项目`vendor/k8s.io/client-go`非常多的组件向ApiServer的curd操作都在client-go包中封装client-go是k8s项目的核心包之一。它与controller的工作流程密不可分在读controller源码理解其工作流程前必须首先对informer有一定得了解因此本篇专门对informer机制进行简单地介绍说明为后面的文章铺垫。
## ApiServer的连接方式
**1.短连接**
获取kubernetes某种资源的方式有多种常见的如kubectl、调用apiserver restful api的接口restful api接口详情查看官方手册。同时也可通过kubectl命令来查看操作对应的api例如
```
kubectl get pod POD_NAME -v=9
```
输出信息中会包含此操作对应的api url
但这种全量型的操作方式在大集群规模下开销还是比较大的因此k8s还提供长连接的watch接口。
**2.长连接**
watch接口是对list接口的一种改进在调用watch接口后首先会一次性返回list的数据同时会保持会话连接后续的接口对象的curd都会产生事件由apiserver将变更数据推送给调用端调用端接收数据后再更新初始接收到的list的全量数据以及其他操作。
**3.client-go**
watch依然比较麻烦毕竟list获取的数据以及后续watch到的数据需要调用端在内部处理和更新缓存。索性官方提供了一个client-go客户端工具包封装里面提供多种apiserver相关操作做到开箱即用k8s各组件也都使用了它。
## client-go工作模式
在client-go中informer是对watch操作的一次再封装 Informer是一个带有本地缓存、索引功能的client端(list/watch)在绝大多数场景下客户端与apiserver的交互都是读多写少的因此做好本地缓存(Store)和索引(Index)可以大幅减少开销提升性能。同时informer可以注册相应的EventHandler事件触发器的在执行资源更新后触发其他连锁操作。
#### 工作流程图
![](http://mycloudn.kokoerp.com/20191218161137.png)
#### 流程图组件解释
图中有上下分层上层逻辑由client-go内部封装下层的逻辑由controller内部完成对照上图分层说明
**上层**
- Reflector反射器调用 Kubernetes 的 List/Watch API实现对 apiserver 指定类型对象的监控(List/Watch)将获取的数据反序列化成对象实例存入delta 缓存队列中;
- DeltaIFIFO Queue一个增量fifo缓存队列接收来自 Reflector 传递的反序列化对象;
- Informer是这个流程中的关键重要的桥梁主要有两个工作1.从DeltaIFIFO Queue中获取对象更新LocalStore的cache数据 2.触发后续的eventHandler生成工作队列对象加入WorkQueue供后面的controller读取进行实际的控制处理工作。值得注意的是每一种资源都对应一个informer多种资源对应多个informer但每个informer都会与apiserver建立起一个watch长连接通常controller都会使用SharedInformerFactory这个单例工厂模式来使所有的informer的创建全部都经过这个单例工厂从而保证每种资源对应的informer都是唯一且可复用的以降低开销。
- LocalStoreinformer 的 cache缓存这里缓存的是从 apiserver 中获取得到的对象(其中有一部分可能还在DeltaFIFO 中没来得及放入缓存里来)此时client再查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力LocalStore 只会被 Lister 的 List/Get 读操作的方法访问;
**下层**
- ResourceEventHandler由controller注册由informer来触发当resource object符合过滤规则时触发ResourceEventHandler将其丢入WorkQueue内。
- WorkQueueinformer更新本地的缓存后会根据注册的相关EventHandler生成事件放入WorkQueue内Controller 接收 WorkQueue 中的事件然后相应执行controller的业务逻辑一般来说就是保证资源对象的目标状态与实际状态达成一致的逻辑。
如果你想自定义controller关于infomer的实践建议参考这里
[如何用 client-go 拓展 Kubernetes 的 API](https://mp.weixin.qq.com/s?__biz=MzU1OTAzNzc5MQ==&mid=2247484052&idx=1&sn=cec9f4a1ee0d21c5b2c51bd147b8af59&chksm=fc1c2ea4cb6ba7b283eef5ac4a45985437c648361831bc3e6dd5f38053be1968b3389386e415&scene=21#wechat_redirect)
## APIVersion的说明
在每一个资源的申明yaml文件中有一个必须存在的一个键`apiVersion`这代表了apiserver对相应资源的rest操作所绑定的group和version常见的值例如"apps/v1",斜杠前一位代表的是api的group斜杠后的值则代表的是version。每一种资源的rest操作都必须绑定正确的group和version。
在informer的代码中因为与apiserver直接交互因此api的group和version相关的结构体会反复出现且结构体命名非常容易混淆。因此这里列出进行说明对阅读informer的源码有一定额外的帮助。
- **APIGroup** : api所属组别的信息其中包含的groupVersion字段即我们每个yaml文件中apiVersion所指定的
- **APIResource**所有使用到的resouce资源类型包含pod/deployment/svc等等
**APIGroup结构体**
```go
type APIGroup struct {
TypeMeta `json:",inline"`
Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
Versions []GroupVersionForDiscovery `json:"versions" protobuf:"bytes,2,rep,name=versions"`
PreferredVersion GroupVersionForDiscovery `json:"preferredVersion,omitempty" protobuf:"bytes,3,opt,name=preferredVersion"`
ServerAddressByClientCIDRs []ServerAddressByClientCIDR `json:"serverAddressByClientCIDRs,omitempty" protobuf:"bytes,4,rep,name=serverAddressByClientCIDRs"`
}
```
**APIGroup实例**
通过如下方式快速访问的apiserver的rest api
```shell
# 节点上运行kubectl proxy代理,免去认证步骤,这不太安全,只建议测试使用,使用完毕后及时关闭
~# kubectl proxy --port=8001 &
# 查看所有的APIGroupgroups 数组内的每一个成员都是一个APIGroup实例
~#curl 127.0.0.1:8001/apis/
{
"kind": "APIGroupList",
"apiVersion": "v1",
"groups": [
{
"name": "apiregistration.k8s.io",
"versions": [
{
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
},
{
"groupVersion": "apiregistration.k8s.io/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "apiregistration.k8s.io/v1",
"version": "v1"
}
},
{
"name": "extensions",
"versions": [
{
"groupVersion": "extensions/v1beta1",
"version": "v1beta1"
}
],
"preferredVersion": {
"groupVersion": "extensions/v1beta1",
"version": "v1beta1"
}
},
...
}
```
**APIResource结构体**
```go
type APIResourceList struct {
TypeMeta `json:",inline"`
GroupVersion string `json:"groupVersion" protobuf:"bytes,1,opt,name=groupVersion"`
APIResources []APIResource `json:"resources" protobuf:"bytes,2,rep,name=resources"`
}
```
**APIResource实例**
Resource分为多个groupVersion
```shell
~# curl 127.0.0.1:8001/api/v1 #默认资源
~# curl 127.0.0.1:8001/apis/apps/v1/ # 扩展资源
```
具体参考官方api说明
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/
下面是一个APIResourceList实例
```shell
~# curl 127.0.0.1:8001/api/v1
{
"kind": "APIResourceList",
"groupVersion": "v1",
"resources": [
{
"name": "pods",
"singularName": "",
"namespaced": true,
"kind": "Pod",
"verbs": [
"create",
"delete",
"deletecollection",
"get",
"list",
"patch",
"update",
"watch"
],
"shortNames": [
"po"
],
"categories": [
"all"
]
},
{
"name": "configmaps",
"singularName": "",
"namespaced": true,
"kind": "ConfigMap",
"verbs": [
"create",
"delete",
"deletecollection",
"get",
"list",
"patch",
"update",
"watch"
],
"shortNames": [
"cm"
]
},
{
"name": "endpoints",
"singularName": "",
"namespaced": true,
"kind": "Endpoints",
"verbs": [
"create",
"delete",
"deletecollection",
"get",
"list",
"patch",
"update",
"watch"
],
"shortNames": [
"ep"
]
},
...
]
}
```
## 总结
本篇先简单介绍一下informer和apiGroup的相关信息以及和controller组件结合的工作流程为后面的几篇(deploymentCrontroller、replicaSet controller 、statefulSetController等)分析作铺垫后面的文章中会反复提到本篇上方的informer与controller结合工作的流程描述和图解。

@ -0,0 +1,763 @@
# P3-Controller分类与Deployment Controller
## 前言
Controller部分的第一篇文章中我们从cobra启动命令入口开始进入到了多实例leader选举部分的代码对leader选举流程做了详细地分析
接着在第二篇中文字和图解简单描述了controller是如何结合client-go模块中的informer工作的为本篇及后面的几篇作铺垫
那么本篇,就接着第一篇往下,继续代码分析。
## Controller的分类
**启动**
承接篇一在cobra入口之下controller的启动入口在这里
`cmd/kube-controller-manager/app/controllermanager.go:191`
```go
run := func(ctx context.Context) {}
```
==> `cmd/kube-controller-manager/app/controllermanager.go:217`,重点是这里的**NewControllerInitializers**函数。
```go
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
```
==> `cmd/kube-controller-manager/app/controllermanager.go:343`
<img src="http://mycloudn.kokoerp.com/20200127122642.png" style="zoom:80%;" />
可以看到controller会对不同的资源分别初始化相应的controller包含我们常见的deployment、statefulset、endpoint、pvc等等资源controller种类有多达30余个。因此在controller整个章节中不会对它们逐一分析只会抽取几个常见有代表性地进行深入本篇就来看看deployment controller吧。
## Deployment Controller
### 初始化
`cmd/kube-controller-manager/app/controllermanager.go:354`
```go
controllers["deployment"] = startDeploymentController
```
==> `cmd/kube-controller-manager/app/apps.go:82`
```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
dc, err := deployment.NewDeploymentController(
// deployment主要关注这3个资源: Deployment/ReplicaSet/Poddeployment通过replicaSet来管理Pod
// 这3个函数会返回相应资源的informer
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
// deployment controller 运行函数
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}
```
dc.Run()函数第一个参数是worker的数量默认值是5个在这里定义的`pkg/controller/apis/config/v1alpha1/defaults.go:48`第二个参数是空结构体让go协程接收异常停止的信号。
==> `pkg/controller/deployment/deployment_controller.go:148`
```go
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
klog.Infof("Starting deployment controller")
defer klog.Infof("Shutting down deployment controller")
// 判断各个informer的缓存是否已经同步完毕的函数
if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}
// 启动多个worker开始工作
for i := 0; i < workers; i++ {
go wait.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
}
```
controller.WaitForCacheSync函数是用来检测各个informer是否本地缓存已经同步完毕的函数返回值是bool类型。前面第二章讲到过informer为了加速和减轻apiserver的负担设计了local storage缓存因此这里做了一步缓存是否已同步的检测。
默认是5个worker每个worker调用wait.Until()方法每间隔1s循环执行dc.worker函数运行deployment controller的工作逻辑。wait.Until()这个循环调用的计时器函数还是挺有意思的,展开看下。
#### wait.Until循环计时器函数
`pkg/controller/deployment/deployment_controller.go:160`
==> `vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:88`
==>`vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:130`
```go
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer
var sawTimeout bool
for {
select {
case <-stopCh:
return
default:
}
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}
// sliding这个布尔值的意思是是否将执行函数f()的执行时间计入执行间隔时间内如果为否则在f执行前就开始计时如果为是则在f执行后再开始计时。
if !sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
func() {
defer runtime.HandleCrash()
f()
}()
if sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
// select 下各个分支的权重是公平的因此stop信号的处理在循环开始之前和循环开始之后都分别判断了一次
select {
case <-stopCh:
return
// Timer.C是Timer结构体内部的一个channel计时器在到达指定的时间后会往此channel发送一个事件channel同时也可以被接收来触发其他的逻辑。这里的逻辑是判断如果f()执行超过计时器的超时时间那么加一个超时的标记sawTimeout。
case <-t.C:
sawTimeout = true
}
}
}
```
resetOrReuseTimer函数
```go
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
if t == nil {
return time.NewTimer(d)
}
if !t.Stop() && !sawTimeout {
<-t.C
}
t.Reset(d)
return t
}
```
概括一下这个函数是对timer模块的一个再封装重复利用timer计时器来每秒执行一次dc.worker().
#### dc.worker函数
`pkg/controller/deployment/deployment_controller.go:460`
==> `pkg/controller/deployment/deployment_controller.go:464`
```go
func (dc *DeploymentController) processNextWorkItem() bool {
// 从队列头部取出对象
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)
// 处理对象
err := dc.syncHandler(key.(string))
dc.handleErr(err, key)
return true
}
```
Deployment controller 的worker函数就是不断地调用processNextWorkItem函数processNextWorkItem函数是从work queue中获取待处理的对象(第二篇中informer图解中的第7-第8步),如果存在,那么执行相应后续的增删改查逻辑,如果不存在,那么就退出。
其中dc.queue.Get()接口方法的实现在这里:
`vendor/k8s.io/client-go/util/workqueue/queue.go:140`
```go
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
// 取出队列的队首
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
// 对象加入正在处理中map
q.processing.insert(item)
// dirty map去除对象(dirty map中保存的是等待处理的对象)
q.dirty.delete(item)
return item, false
}
```
其中的`dc.syncHandler()`方法在这里:
pkg/controller/deployment/deployment_controller.go:135
```go
dc.syncHandler = dc.syncDeployment
```
==> pkg/controller/deployment/deployment_controller.go:560
所有的增删改(滚动更新)查操作,全部都在这个函数内部处理。
```go
func (dc *DeploymentController) syncDeployment(key string) error {
startTime := time.Now()
klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
defer func() {
klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).Infof("Deployment %v has been deleted", key)
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
everything := metav1.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
// deployment必须包含selector标签
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
}
return nil
}
// 获取deployment所控制的replicaSet
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return err
}
// 获取所有的podmap结构按replicaSet分组key是rs。
// 检查deployment在重建的过程中是否还存在旧版本(未更新)的pod
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}
// 检查deployment是否为pause暂停状态pause状态则调用sync方法同步deployment
if err = dc.checkPausedConditions(d); err != nil {
return err
}
if d.Spec.Paused {
return dc.sync(d, rsList)
}
// 判断本次deployment事件是否是一个回滚事件
// 一旦底层的rs更新到了一个新的版本就无法自动执行回滚了因此直到下一次队列中再次出现此deployment且不为rollback状态时才能无虞地触发更新rs。所以这里再进行一次判断如果deployment带有回滚标记那么先执行rs的回滚。
if getRollbackTo(d) != nil {
return dc.rollback(d, rsList)
}
// 判断本次deployment事件是否是一个scale事件是则调用sync方法同步deployment
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d, rsList)
}
// 更新deployment视Deployment.Spec.Strategy指定的更新策略类型来执行相应的更新操作
// 1.如果是rolloutRecreate类型则一次性杀死pod再重建
// 2.如果是rolloutRolling类型则滚动更新pod
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```
### 暂停和扩(缩)容(/删除)
dc.sync方法这里出现了两次分别在pause状态和scaling状态调用比较关键分析一下sync方法的内容。
`pkg/controller/deployment/sync.go:48`
```go
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// 展开查看代码可以知道这里的newRS指的是找到模板hash值与当前的d Deployment 模板hash值相同的rsoldRSs则是所有的历史版本的rs
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
if err != nil {
return err
}
// 对比最新的rs和之前的rs如果需要scale缩扩容则执行scale方法
if err := dc.scale(d, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}
// pause状态且不处于回滚状态的deployment进行清理(根据指定的保存历史版本数上限,清理超出限制的历史版本)
if d.Spec.Paused && getRollbackTo(d) == nil {
if err := dc.cleanupDeployment(oldRSs, d); err != nil {
return err
}
}
allRSs := append(oldRSs, newRS)
// 同步deployment状态
return dc.syncDeploymentStatus(allRSs, newRS, d)
}
```
来看看dc.scale()方法:
`pkg/controller/deployment/sync.go:289`
```go
func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
// FindActiveOrLatest方法返回值如果此时只有一个活跃的rs那么就返回这个rs如果不止那么就找出revision最新的rs返回
if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
// 如果rs已经和deployment指定的副本数一致直接return
return nil
}
_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
return err
}
// 如果最新的rs的已经收敛到了deployment的期望状态则旧rs需要被完全scale down缩容删除掉。
if deploymentutil.IsSaturated(deployment, newRS) {
for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
return err
}
}
return nil
}
// 在滚动更新的过程中需要控制旧rs与新rs所控制的模板pod的数量的总和多出的pod数量不能超过MaxSurge数因此是滚动更新的过程中旧rs和新rs控制得pod数量必然是一个此消彼长的过程
if deploymentutil.IsRollingUpdate(deployment) {
allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
allowedSize := int32(0)
if *(deployment.Spec.Replicas) > 0 {
allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
}
// 可以增加或删除的pod数量结果正数则代表可以继续新增pod结果为负数则代表需要删除pod了
deploymentReplicasToAdd := allowedSize - allRSsReplicas
var scalingOperation string
switch {
case deploymentReplicasToAdd > 0:
// 如果是扩容那么把所有的rs按时间从最新到最旧的顺序排序
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
scalingOperation = "up"
case deploymentReplicasToAdd < 0:
// 如果是缩容那么把所有的rs按时间从最旧到最新的顺序排序
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
scalingOperation = "down"
}
// 遍历每一个rs, 用map保存此rs应该达到的pod的数量(等于当前数量+需scale数量
deploymentReplicasAdded := int32(0)
nameToSize := make(map[string]int32)
for i := range allRSs {
rs := allRSs[i]
if deploymentReplicasToAdd != 0 {
// 计算当前rs需scale的数量
proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
// 总pod数量等于当前数量+scale数量
nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
deploymentReplicasAdded += proportion
} else {
nameToSize[rs.Name] = *(rs.Spec.Replicas)
}
}
// Update all replica sets
for i := range allRSs {
rs := allRSs[i]
// Add/remove any leftovers to the largest replica set.
// 如果还有各rs加起来都未消化完的pod则交给上面排序后的第一个rs(最新或最旧的rs)。
if i == 0 && deploymentReplicasToAdd != 0 {
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
if nameToSize[rs.Name] < 0 {
nameToSize[rs.Name] = 0
}
}
// 把这个rs scale到它应该达到的数量
if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
// Return as soon as we fail, the deployment is requeued
return err
}
}
}
return nil
}
```
#### syncDeploymentStatus函数
在完成rs的scale和pause状态的逻辑处理后deployment的状态也需要与最新的rs同步因此这个函数就是用来同步deployment的状态的。
```go
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
newStatus := calculateStatus(allRSs, newRS, d)
if reflect.DeepEqual(d.Status, newStatus) {
return nil
}
newDeployment := d
newDeployment.Status = newStatus
_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
return err
}
```
这个函数主要用来更新deployment的status字段的内容例如版本、副本数、可用副本数、更新副本数等等。
整个扩容的过程涉及所有rs的操作可能很容易混淆但其实只要记住在99%的情况下deployment只有一个活跃状态的rs即newRS大部分操作都是针对这个newRS做的那么上面的过程就容易理解很多了。
### 滚动更新
Deployment更新策略分为滚动更新和一次性更新更新方式其实都是类似只是一个是分批式一个是全量式这里看下滚动更新的代码。
deployment 的spec字段内的内容一旦发生变化就会触发rs的更新生成新版本的rs并且基于新rs进行副本扩容旧版本的rs则会缩容。
`pkg/controller/deployment/deployment_controller.go:644`
==> `pkg/controller/deployment/rolling.go:31`
```go
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// 获取新的rs如果没有新的rs则创建newRS
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// 对比判断newRS是否需要扩容(新rs管理的pod是否已达到目标数量)
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// 扩容完毕更新deployment的status
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// 对比判断oldRS是否需要缩容(旧rs管理的pod是否已经全部终结)
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// 缩容完毕更新deployment的status
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// deployment 进入complete状态根据revision历史版本数限制清除旧的rs
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(oldRSs, d); err != nil {
return err
}
}
// 更新deployment的status
return dc.syncRolloutStatus(allRSs, newRS, d)
}
```
reconcileNewReplicaSet函数
这个函数返回bool值即是否应该扩容newRS的bool值
```go
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
// deployment replicas 和newRS replicas相等则说明new rs已经无需扩容
return false, nil
}
if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
// newRS replicas > deployment replicas,则说明newRS需要缩容返回值scaled此时值应当是false
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
return scaled, err
}
// newRS replicas < deployment replicas,使NewRSNewReplicasnewRSpod
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
if err != nil {
return false, err
}
// 返回值scaled此时值应当是true
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
return scaled, err
}
```
NewRSNewReplicas函数
计算newRS此时应该有的副本数量的函数
```go
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
switch deployment.Spec.Strategy.Type {
// 滚动更新时
case apps.RollingUpdateDeploymentStrategyType:
// Check if we can scale up.
maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
if err != nil {
return 0, err
}
// 当前的副本数(当前值) = 所有版本的rs管理的pod数量的总和
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
// 最多允许同时存在的副本数(最大值) = 指定副本数 + maxSurge的副本数(整数或者比例计算)
maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
// 如果当前值比最大值还大那么说明不能再扩容了直接返回最新的newRS.Spec.Replicas
if currentPodCount >= maxTotalPods {
return *(newRS.Spec.Replicas), nil
}
// 否则,可扩容值 = 最大值 - 当前值
scaleUpCount := maxTotalPods - currentPodCount
// 但每一个版本的rs管理的副本数量不能超过deployment所指定的副本数量只有新旧版本的rs加起来的副本数可以突破到maxSurge的上限。因此这里的可扩容值要取这两个值之间的最小值。
scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
// 此时newRS应有的副本数 = 当前值 + 可扩容值
return *(newRS.Spec.Replicas) + scaleUpCount, nil
case apps.RecreateDeploymentStrategyType:
// 非滚动更新时newRS的应用副本数 = deployment.Spec.Replicas,无弹性
return *(deployment.Spec.Replicas), nil
default:
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
}
}
```
reconcileOldReplicaSets函数
这个函数返回bool值即是否应该缩容oldRSs的bool值
```go
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
// 已经缩容完毕,直接返回
return false, nil
}
// 当前所有的pod的数量(当前值)
allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
// deployment 指定的最大不可用的副本数(最大不可用值)
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
// increase unavailability.
// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
//
// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
// step(that will increase unavailability).
//
// Concrete example:
//
// * 10 replicas
// * 2 maxUnavailable (absolute number, not percent)
// * 3 maxSurge (absolute number, not percent)
//
// case 1:
// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
// * The new replica set pods crashloop and never become available.
// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
// * The user notices the crashloop and does kubectl rollout undo to rollback.
// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
//
// case 2:
// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new replica set to be scaled up by 5.
// Available指的是就绪探针结果为true的副本若默认未指定就绪探针则pod running之后自动视就绪为true
// 最小可用副本数(至少可用数)
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
// newRs不可用数
newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
// 最大可缩容数 = 总数 - 最小可用数 - newRS不可用数(为了保证最小可用数因此此时newRS的不可用副本不能参与这个计算)
maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
if maxScaledDown <= 0 {
return false, nil
}
// oldRS里不健康的副本无论如何都是需要清除的
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
if err != nil {
return false, nil
}
klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount)
// 还要对比最大可缩容数和deployment指定的最大同时不可用副本数这两者之间的最小值才是可缩容数量
allRSs = append(oldRSs, newRS)
scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
if err != nil {
return false, nil
}
klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)
// oldRS里不健康的副本无论如何都是需要清除的
totalScaledDown := cleanupCount + scaledDownCount
// 判断缩容数是否大于0
return totalScaledDown > 0, nil
}
```
中间的英文注释里的举例说明非常详细,可以看一下注释。
#### syncRolloutStatus函数
这个函数主要用于更新deployment的status字段和其中的condition字段。
```go
func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
newStatus := calculateStatus(allRSs, newRS, d)
if !util.HasProgressDeadline(d) {
util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
}
currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
/**
判断deployment是否为complete状态条件有多个
1. newRS.replicas = newRS.Status.UpdatedReplicas 说明newRS的副本更新已全部完成
2. newRS.status.condition.reason = miniumReplicasAvailable
**/
isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
// 未达到complete状态的deployment才进行下面的检查
if util.HasProgressDeadline(d) && !isCompleteDeployment {
switch {
case util.DeploymentComplete(d, &newStatus):
// Update the deployment conditions with a message for the new replica set that
// was successfully deployed. If the condition already exists, we ignore this update.
msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)
case util.DeploymentProgressing(d, &newStatus):
// If there is any progress made, continue by not checking if the deployment failed. This
// behavior emulates the rolling updater progressDeadline check.
msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
// Update the current Progressing condition or add a new one if it doesn't exist.
// If a Progressing condition with status=true already exists, we should update
// everything but lastTransitionTime. SetDeploymentCondition already does that but
// it also is not updating conditions when the reason of the new condition is the
// same as the old. The Progressing condition is a special case because we want to
// update with the same reason and change just lastUpdateTime iff we notice any
// progress. That's why we handle it here.
if currentCond != nil {
if currentCond.Status == v1.ConditionTrue {
condition.LastTransitionTime = currentCond.LastTransitionTime
}
util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
}
util.SetDeploymentCondition(&newStatus, *condition)
case util.DeploymentTimedOut(d, &newStatus):
// Update the deployment with a timeout condition. If the condition already exists,
// we ignore this update.
msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
if newRS != nil {
msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
}
condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)
}
}
```
DeploymentCondition在这里面反复出现便于理解参照一个正常状态的deployment condition样例:
<img src="http://mycloudn.kokoerp.com/20200130125357.png" style="zoom:50%;" />
#### 总结
滚动更新过程中主要是通过调用`reconcileNewReplicaSet`函数对 newRS 扩容,调用 `reconcileOldReplicaSets`函数 对 oldRSs缩容按照 `maxSurge``maxUnavailable` 的约束计时器间隔1s反复执行、收敛、修正最终达到期望状态完成更新。
## 总结
Deployment的回滚、扩(缩)容、暂停、更新等操作主要是通过修改rs来完成的。其中rs的版本控制、replicas数量控制是其最核心也是难以理解的地方但是只要记住99%的时间里deployment对应的活跃的rs只有一个只有更新时才会出现2个rs极少数情况下(短时间重复更新)才会出现2个以上的rs对于上面源码的理解就会容易许多。
另外从上面我们也可以发现deployment的更新实际基本不涉及对pod的直接操作因此本章后续的章节会分析一下replicaSet controller是怎么和pod进行管理交互的。

@ -1,20 +1,9 @@
---
title: "Kubernetes源码学习-Controller-总览篇"
date: 2019/12/05 19:15:49
tags:
- Kubernetes
- Golang
- 读源码
## Controller源码分段阅读导航
---
##Controller源码分段阅读导航
- [多实例leader选举]()
- [Informer工作流程]()
- [Deployment Controller]()
- [StafulSet Controller]()
- [多实例leader选举]([https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0-Controller-P1-%E5%A4%9A%E5%AE%9E%E4%BE%8Bleader%E9%80%89%E4%B8%BE.md](https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes源码学习-Controller-P1-多实例leader选举.md))
- [Kubernetes源码学习-Controller-P2-Controller与informer](https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes源码学习-Controller-P2-Controller与informer.md)
- [Kubernetes源码学习-Controller-P3-Controller分类与Deployment Controller](https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes源码学习-Controller-P3-Controller分类与Deployment Controller.md)
- [StafulSet Controller]
- 待补充
## 概述
@ -33,7 +22,7 @@ https://github.com/kubernetes/community/blob/master/contributors/design-proposal
**Controller是做什么用的**
Controller通过watch apiServer循环地观察监控着某些特定的资源对象获取它们当前的状态对它们进行对比、修正、收敛来使这些对象的状态不断靠近、直至达成在它们的声明语义中所期望的目标状态这即是controller的作用。
Controller通过watch apiServer循环地观察监控着某些特定的资源对象获取它们当前的状态对它们进行对比、修正、收敛来使这些对象的状态不断靠近、直至达成在它们的声明语义中所期望的目标状态这即是controller的作用。再通俗点来说就是使资源对象的status当前状态达到spec的期望状态。
**伪代码**

Loading…
Cancel
Save