You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kubeSourceCodeNote/controller/Kubernetes源码学习-Controller-P...

957 lines
32 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# P5-StatefulSet Controller
## 前言
在前面的几篇文章中先对deployment controller进行了初步分析:
[Controller-P3-Deployment Controller](https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes源码学习-Controller-P3-Controller分类与Deployment%20Controller.md)
严格来讲deployment的管理pod的逻辑是基于replicaSet来实现的因此接下来结合replicaSet controller进行了深入:
[Controller-P3-ReplicaSet Controller](https://github.com/yinwenqin/kubeSourceCodeNote/blob/master/controller/Kubernetes源码学习-Controller-P4-ReplicaSet%20Controller.md)
那么在本篇来看看另一个最常用的承载在pod之上的管理单位的控制器实现: **StatefulSet Controller**
## StatefulSet 的基本特性
在看代码之前先回顾一下sts的基本运行特性代入地阅读代码会比较顺畅
**创建**
sts是有序的pod副本有序串行地新建pod名称为{sts_name}-{0..N}从小序号的pod(名称为{sts_name}-0)创建一直到第n个副本的pod(名称为{sts_name}-n)
**更新**
sts的更新策略有2种:
- `RollingUpdateStatefulSetStrategyType`默认的滚动更新策略此策略下更新时pod根据序号反顺序更新从最大序号的pod开始删除重建更新至序号最小的pod。更新过程中始终保持pod数量等于指定副本数即每删除一个pod才会再创建一个。同时可以指定一个**partition**参数指定这个参数后只有序号大于等于partition的pod才会被更新序号小于partition参数的pod不会被更新例如有5个副本partition设置为2那么在更新sts时0和1号pod不会更新2 3 4号pod则会更新重建此时继续将partition缩减为0则0 1号pod也会更新重建。默认partition为0即所有的pod都会更新。这个参数一般不会使用但可用在发布时动态更新递减partition的值来实现滚动灰度发布。
- `OnDeleteStatefulSetStrategyType`, 此策略下controller不会对pod做任何操作由手动删除pod来触发新pod的创建
**删除**
删除sts时可以指定级联模式的参数`--cascade=true`默认为true意思是删除sts会同时删除它所管理的pod。设置为false时删除sts不会影响pod的运行且sts重建后依然能与此前的pod关联起来(这种方式可能会产生孤儿pod)。
**关联关系**
先来看看sts和pod的关联方式:
```bash
# sts
[root@008019 ~]# kubectl get sts deptest11dev
NAME READY AGE
deptest11dev 2/2 99d
# pod
[root@008019 ~]# kubectl get pods | grep deptest11dev
deptest11dev-0 1/1 Running 1 99d
deptest11dev-1 1/1 Running 0 3d17h
# edit pod
# 可以查看到pod的ownerReferences字段与sts关联
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: StatefulSet
name: deptest11dev
uid: 28ecf735-2ab4-11ea-afa8-1866daf0f324
# 可以查看到pod的labels标签新增了一个controller-revision-hash标签与controllerRevision关联
labels:
app: deptest11dev
controller-revision-hash: deptest11dev-587f8bd845
statefulset.kubernetes.io/pod-name: deptest11dev-1
```
再来看看sts和ControllerRevision关联方式:
```bash
[root@008019 ~]# kubectl get sts deptest11dev
NAME READY AGE
deptest11dev 2/2 99d
[root@008019 ~]# kubectl get ControllerRevisions | grep deptest11dev
deptest11dev-587f8bd845 statefulset.apps/deptest11dev 1 99d
[root@008019 ~]# kubectl get ControllerRevisions deptest11dev-587f8bd845
NAME CONTROLLER REVISION AGE
deptest11dev-587f8bd845 statefulset.apps/deptest11dev 1 99d
# ControllerRevisions资源中的ownerReferences字段,可以看出sts与其通过这个字段关联
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: StatefulSet
name: deptest11dev
uid: 28ecf735-2ab4-11ea-afa8-1866daf0f324
# sts status字段,可以看出sts通过status下的currentRevision、updateRevision字段与ControllerRevision关联
status:
collisionCount: 0
currentReplicas: 2
currentRevision: deptest11dev-587f8bd845
observedGeneration: 3
readyReplicas: 2
replicas: 2
updateRevision: deptest11dev-587f8bd845
updatedReplicas: 2
# 对sts.spec字段里的内容更新后引起pod重建sts开始滚动更新此时sts的status字段内容如下:
status:
collisionCount: 0
currentReplicas: 1
currentRevision: deptest11dev-587f8bd845
observedGeneration: 4
readyReplicas: 2
replicas: 2
# 这时可以发现updateRevision字段更新为了新的revision即updateRevision是最近一次更新的Revision
updateRevision: deptest11dev-7487498978
# 修改sts进行缩容/扩容 时的status字段
status:
collisionCount: 0
currentReplicas: 3
currentRevision: deptest11dev-7487498978
observedGeneration: 5
readyReplicas: 3
replicas: 3
# revision不会更新
updateRevision: deptest11dev-7487498978
updatedReplicas: 3
```
**记住这几者之间双向地关联方式,下面会提到。**
## StatefulSet Controller
### 初始化
`cmd/kube-controller-manager/app/apps.go:59`
```go
func startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {
return nil, false, nil
}
go statefulset.NewStatefulSetController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Apps().V1().StatefulSets(),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(1, ctx.Stop)
return nil, true, nil
}
```
先来看看`NewStatefulSetController`做了什么:
==> `pkg/controller/statefulset/stateful_set.go:81`
```go
func NewStatefulSetController(
// 1.StatefulSetController关注四种类型的资源: Pod/Sts/PVC/ControllerRevision
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
revInformer appsinformers.ControllerRevisionInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
// 2.NewDefaultStatefulSetControl方法需要关注
ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(
NewRealStatefulPodControl(
kubeClient,
setInformer.Lister(),
podInformer.Lister(),
pvcInformer.Lister(),
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
revListerSynced: revInformer.Informer().HasSynced,
}
// 当sts管理的pod curd时对应的处理方法(按入workqueue/更新pod/删除pod)
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// lookup the statefulset and enqueue
AddFunc: ssc.addPod,
// lookup current and old statefulset if labels changed
UpdateFunc: ssc.updatePod,
// lookup statefulset accounting for deletion tombstones
DeleteFunc: ssc.deletePod,
})
ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced
// 当sts curd时对应的方法(sts压入workqueue)
setInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: ssc.enqueueStatefulSet,
UpdateFunc: func(old, cur interface{}) {
oldPS := old.(*apps.StatefulSet)
curPS := cur.(*apps.StatefulSet)
if oldPS.Status.Replicas != curPS.Status.Replicas {
klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
}
ssc.enqueueStatefulSet(cur)
},
DeleteFunc: ssc.enqueueStatefulSet,
},
statefulSetResyncPeriod,
)
ssc.setLister = setInformer.Lister()
ssc.setListerSynced = setInformer.Informer().HasSynced
// TODO: Watch volumes
// 返回ssc(StatefulSetController)
return ssc
}
```
先看注释1可以发现StatefulSetController关注四种类型的资源: Pod/Sts/PVC/ControllerRevision其中的**ControllerRevision**不太熟悉,先找出来看下它的结构,逐级跳转:
`cmd/kube-controller-manager/app/apps.go:63`
==> `vendor/k8s.io/client-go/informers/apps/v1/interface.go:28`
==>`vendor/k8s.io/client-go/informers/apps/v1/controllerrevision.go:38`
==> `vendor/k8s.io/client-go/listers/apps/v1/controllerrevision.go:29`
==> `vendor/k8s.io/api/apps/v1/types.go:800`
```go
type ControllerRevision struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Data is the serialized representation of the state.
Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`
// Revision indicates the revision of the state represented by Data.
Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}
```
阅读这个结构体上方的注释可以得知ControllerRevision提供给DaemonSet和StatefulSet用作更新和回滚ControllerRevision存放的是数据的快照ControllerRevision生成之后内容是不可修改的由调用端来负责序列化写入和反序列化读取。其中Revision(int64)字段相当于ControllerRevision的版本id号Data字段则存放序列化后的数据。
**画外音不难猜测StatefulSet的更新以及回滚(也是一种特殊的更新)操作是基于对新旧ControllerRevision的对比来进行的**
在来看下注释2NewDefaultStatefulSetControl方法:
`pkg/controller/statefulset/stateful_set.go:95`
==> `pkg/controller/statefulset/stateful_set_control.go:54`
```go
func NewDefaultStatefulSetControl(
podControl StatefulPodControlInterface,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface,
recorder record.EventRecorder) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}
```
**NewDefaultStatefulSetControl**返回的**defaultStatefulSetControl**结构体对象是sts管理控制逻辑的语义实现**defaultStatefulSetControl**结构体里面包含了sts控制过程中的各种接口
1. 管理sts对应的pod/pvc(podControl)的方法接口,有(**CreateStatefulPod/UpdateStatefulPod/DeleteStatefulPod**)这几个方法,通过**NewRealStatefulPodControl**函数返回的**realStatefulPodControl**结构体对象来实现
2. 管理sts status状态的更新(statusUpdater)的方法接口,有**UpdateStatefulSetStatus**这一个方法,通过**NewRealStatefulSetStatusUpdater**返回的**realStatefulSetStatusUpdater**结构体对象来实现。
3. 管理ControllerRevision版本(controllerHistory) 的方法接口,有(**ListControllerRevisions/CreateControllerRevision/DeleteControllerRevision/UpdateControllerRevision/AdoptControllerRevision/ReleaseControllerRevision**)这几个方法,通过***history.NewHistory***返回的**realHistory**结构体对象来实现。
现在接着往下去看看ssc(StatefulSetController) 运行的Run函数。
### 工作过程
`*StatefulSetController.Run()`函数:
```go
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ssc.queue.ShutDown()
klog.Infof("Starting stateful set controller")
defer klog.Infof("Shutting down statefulset controller")
if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(ssc.worker, time.Second, stopCh)
}
<-stopCh
}
```
wait.Until定时器前面已经讲过不再复述重点在于ssc.worker函数代码里有多次跳跃
`pkg/controller/statefulset/stateful_set.go:159`
==>`pkg/controller/statefulset/stateful_set.go:410`
==> `pkg/controller/statefulset/stateful_set.go:399`
==>`pkg/controller/statefulset/stateful_set.go:415`
```go
// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
}()
// key的样例: default/teststs做个切割拿到namespace和sts name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 获取到sts对象
set, err := ssc.setLister.StatefulSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("StatefulSet has been deleted %v", key)
return nil
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
return err
}
// labelSelector
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
// This is a non-transient error, so don't retry.
return nil
}
// 孤儿Revisions修正托管
if err := ssc.adoptOrphanRevisions(set); err != nil {
return err
}
// 获取到sts管理的pod
pods, err := ssc.getPodsForStatefulSet(set, selector)
if err != nil {
return err
}
// syncStatefulSet 执行sts sync
return ssc.syncStatefulSet(set, pods)
}
```
来分步看下
#### 孤儿Revisions修正托管
上面指出sts和revision两者之间显示地双向指定字段来关联对方明白这一点那么这个函数就好理解了。
出现孤儿ControllerRevisions的原因很有可能是sts在此期间进行了反复的更新更新时间差之中产生了脏数据.
`pkg/controller/statefulset/stateful_set.go:316`
```go
// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
// 通过sts指定的revision相关字段找到对应的revisions
revisions, err := ssc.control.ListRevisions(set)
if err != nil {
return err
}
hasOrphans := false
for i := range revisions {
// 通过revision指定的controller来源来找sts。如果指定绑定的sts为空那么说明此ControllerRevisions是孤儿状态(无托管),需要回收
if metav1.GetControllerOf(revisions[i]) == nil {
hasOrphans = true
break
}
}
// 出现孤儿ControllerRevisions的原因很有可能是sts在此期间进行了反复的更新因此重新获取一次最新的sts
if hasOrphans {
fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
if err != nil {
return err
}
// sts(old) 若与fresh sts uid不同则说明期间sts可能经历了删除重建本次逻辑的流程打破抛错返回
if fresh.UID != set.UID {
return fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
}
// 为这些controller sts指定为空的revision若label匹配则加上ownerReferences sts指定若label不匹配则gc
return ssc.control.AdoptOrphanRevisions(set, revisions)
}
return nil
}
```
#### 获取到sts管理的pod
`pkg/controller/statefulset/stateful_set.go:285`
```go
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
// List all pods to include the pods that don't match the selector anymore but
// has a ControllerRef pointing to this StatefulSet.
pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
// filter函数的作用是判断指定的pod和sts是否有所属关系展开代码可以看到判断的方式很简单对pod的名称做re字符串切割最后一个"-"之前的字符串是parent之后的数字是序号索引判断parent与sts name是否一致一致则为truepod 属于 sts不一致则为false
filter := func(pod *v1.Pod) bool {
// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
return isMemberOf(set, pod)
}
// 如同revision一样若存在孤儿pod也需要对孤儿pod进行收养与sts label匹配则加上关联label不匹配则解除关联。
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != set.UID {
return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc)
// 执行筛选
return cm.ClaimPods(pods, filter)
}
```
**ClaimPods**
`pkg/controller/controller_ref_manager.go:171`
```go
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error
match := func(obj metav1.Object) bool {
pod := obj.(*v1.Pod)
// 先根据标签匹配pod仅当标签匹配通过后再匹配下一步sts调用则是按照上面说的取 pod name 字符串切割后与sts name对比
if !m.Selector.Matches(labels.Set(pod.Labels)) {
return false
}
for _, filter := range filters {
if !filter(pod) {
return false
}
}
return true
}
adopt := func(obj metav1.Object) error {
// 收养pod(添加关联关系)即为pod.metadata patch ownerReferences字段。
return m.AdoptPod(obj.(*v1.Pod))
}
release := func(obj metav1.Object) error {
// 释放pod关联关系即为pod.metadata delete ownerReferences字段。
return m.ReleasePod(obj.(*v1.Pod))
}
for _, pod := range pods {
// 判断单个pod是否匹配收养/释放孤儿pod的函数ClaimObject
ok, err := m.ClaimObject(pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, pod)
}
}
return claimed, utilerrors.NewAggregate(errlist)
}
```
####
####ClaimObject
`pkg/controller/controller_ref_manager.go:66`
```go
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
// 1 获取到pod.metadata中的ownerReferences字段
controllerRef := metav1.GetControllerOf(obj)
// 1-1 如果pod存在ownerReferences,则直接进入判断是否match
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
}
// 1-2 匹配则返回true
if match(obj) {
return true, nil
}
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
// 1-3 不匹配则pod释放关联字段,返回false
if err := release(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
// Successfully released.
return false, nil
}
// 2 孤儿pod则要根据情况判断是否收养/释放
// 2-1 已删除的sts或match规则不匹配返回false
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
}
if obj.GetDeletionTimestamp() != nil {
// Ignore if the object is being deleted
return false, nil
}
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else claimed it first, or there was a transient error.
// The controller should requeue and try again if it's still orphaned.
return false, err
}
// 收养成功返回true
return true, nil
}
```
到这里所有应当被sts管理的pod(包括孤儿pod)就过滤完毕了开始执行真正的sts sync。
#### syncStatefulSet
在找到了所有管理的pod后就要开始sts 的sync进行更新sts及更新pod的操作了回到这里:
`pkg/controller/statefulset/stateful_set.go:451`
==> `pkg/controller/statefulset/stateful_set.go:458`
==> `pkg/controller/statefulset/stateful_set_control.go:75`
```go
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
// 取出sts所有的revision并排序
revisions, err := ssc.ListRevisions(set)
if err != nil {
return err
}
history.SortControllerRevisions(revisions)
// 获得当前revision以及更新后最新的revision
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
return err
}
// 核心方法对pod进行操作
status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
if err != nil {
return err
}
// 操作完成后记录修改sts.status
err = ssc.updateStatefulSetStatus(set, status)
if err != nil {
return err
}
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
set.Namespace,
set.Name,
status.Replicas,
status.ReadyReplicas,
status.CurrentReplicas,
status.UpdatedReplicas)
klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
set.Namespace,
set.Name,
status.CurrentRevision,
status.UpdateRevision)
// 对set的revision history进行维护
return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}
```
这里面最核心的函数是`updateStatefulSetStatus`,接着往下
#### updateStatefulSet
这一个函数内容很多200多行代码需要说明的地方会在下面代码中注释。
```go
func (ssc *defaultStatefulSetControl) updateStatefulSet(
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
// 获取到当前sts currentSet然后获取到需更新到的sts updateSet。要实现的更新效果是
// 1.滚动更新时在未指定partition时使当前sts的管理的pod缩减为0updateSet的ready pod数 = spec.replicas
// 2.滚动更新时在未指定partition后大于等于partition的pod全部归于updateSet小于partition值的pod还是归属于原currentSet
// 3.OnDelete更新时do nothing
currentSet, err := ApplyRevision(set, currentRevision)
if err != nil {
return nil, err
}
updateSet, err := ApplyRevision(set, updateRevision)
if err != nil {
return nil, err
}
// set the generation, and revisions in the returned status
// 重新计算sts的status
status := apps.StatefulSetStatus{}
status.ObservedGeneration = set.Generation
status.CurrentRevision = currentRevision.Name
status.UpdateRevision = updateRevision.Name
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
replicaCount := int(*set.Spec.Replicas)
// replicas是合法副本将满足 0 <= pod序号 < sts.spec.replicas的pod,放到这个slice里来。这里面的pod都是要保证ready的
replicas := make([]*v1.Pod, replicaCount)
// condemned是非法副本将满足 pod序号 >= sts.spec.replicas的pod,放到这个slice里来,这些pod是要删除掉的(可能是被缩容掉的)
condemned := make([]*v1.Pod, 0, len(pods))
unhealthy := 0
firstUnhealthyOrdinal := math.MaxInt32
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for i := range pods {
status.Replicas++
// status.ReadyReplicas计数
if isRunningAndReady(pods[i]) {
status.ReadyReplicas++
}
if isCreated(pods[i]) && !isTerminating(pods[i]) {
// 通过pod的controller-revision-hash label判断pod属于currentSet还是UpdatedSet分别计数
if getPodRevision(pods[i]) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(pods[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
}
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
// 将满足 0 <= pod序号 < sts.spec.replicas的pod,放到replicas这个slice里来
replicas[ord] = pods[i]
} else if ord >= replicaCount {
// 将满足 pod序号 >= sts.spec.replicas的pod,放到condemned这个slice里来,这些pod是要删除掉的。
condemned = append(condemned, pods[i])
}
}
// replicas slice之中如果有索引位置为空则需要填充相应的pod。
// 根据currentSet.replicas/UpdatedSet.replicas/partition这三个值来判断pod是基于current revision还是基于update revision创建
for ord := 0; ord < replicaCount; ord++ {
if replicas[ord] == nil {
replicas[ord] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name, ord)
}
}
// 对需要删除的非法pod按照序号从大到小的顺序排序
sort.Sort(ascendingOrdinal(condemned))
// 如果有不健康的pod也需要删除但还是遵循串行的原则优先删除非法pod中序号最大的再到合法副本中的序号最小的。
for i := range replicas {
if !isHealthy(replicas[i]) {
unhealthy++
if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = replicas[i]
}
}
}
for i := range condemned {
if !isHealthy(condemned[i]) {
unhealthy++
if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal {
firstUnhealthyOrdinal = ord
firstUnhealthyPod = condemned[i]
}
}
}
if unhealthy > 0 {
klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
set.Namespace,
set.Name,
unhealthy,
firstUnhealthyPod.Name)
}
// If the StatefulSet is being deleted, don't do anything other than updating
// status.
if set.DeletionTimestamp != nil {
return &status, nil
}
monotonic := !allowsBurst(set)
// 根据pod的序号对它们依次进行检查并操作。
for i := range replicas {
// 错误状态的pod删除重建
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas--
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas--
}
status.Replicas--
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
i)
}
// pod没有被创建(可能是上面刚填充的)就创建pod
if !isCreated(replicas[i]) {
if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
return &status, err
}
status.Replicas++
if getPodRevision(replicas[i]) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(replicas[i]) == updateRevision.Name {
status.UpdatedReplicas++
}
// 如果不允许burst直接返回
if monotonic {
return &status, nil
}
// pod created, no more work possible for this round
continue
}
// 如果不允许burst对于终结中的pod不采取任何逻辑等待它终结完毕后下一轮再操作。
if isTerminating(replicas[i]) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// 如果是正在创建中的pod(还未达到ready状态),同样不采取任何操作,因为需要保证创建操作依次有序
if !isRunningAndReady(replicas[i]) && monotonic {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
set.Namespace,
set.Name,
replicas[i].Name)
return &status, nil
}
// 如果此pod与sts已经匹配(ready),且存储满足sts、pod的要求那么这个pod就是合格的podcontinue
if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
continue
}
// 确保pod与sts的标签关联以及为pod准备好它需要的pvc
replica := replicas[i].DeepCopy()
if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
return &status, err
}
}
// 上面的合法副本得以保证之后下面要开始按pod序号从大到小的顺序删除非法pod了
for target := len(condemned) - 1; target >= 0; target-- {
// 终结中的pod不再处理直接返回等待下一轮检查
if isTerminating(condemned[target]) {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
set.Namespace,
set.Name,
condemned[target].Name)
// block if we are in monotonic mode
if monotonic {
return &status, nil
}
continue
}
// 如果此非法pod不是ready状态且不允许burst且它不是优先级第一的非健康pod不做任何操作。换而言之即使是删除非健康的pod也要按照序号从大到小的顺序串行执行。
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
set.Namespace,
set.Name,
firstUnhealthyPod.Name)
return &status, nil
}
// 开始删除此pod更新status
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
set.Namespace,
set.Name,
condemned[target].Name)
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
return &status, err
}
if getPodRevision(condemned[target]) == currentRevision.Name {
status.CurrentReplicas--
}
if getPodRevision(condemned[target]) == updateRevision.Name {
status.UpdatedReplicas--
}
if monotonic {
return &status, nil
}
}
// OnDelete更新模式下不自动删除pod需要手动删除pod来触发更新
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
return &status, nil
}
// 经过上面那么多条件的过滤和准备现在要开始对replicas里的合法pod进行检查了
updateMin := 0
if set.Spec.UpdateStrategy.RollingUpdate != nil {
updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
}
// 按pod的序号倒序检查
for target := len(replicas) - 1; target >= updateMin; target-- {
// 如果pod的revision不符合updateRevision那么删除重建此pod
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
status.CurrentReplicas--
return &status, err
}
// 合法pod更新过程中还未到达ready状态的pod等待它
if !isHealthy(replicas[target]) {
klog.V(4).Infof(
"StatefulSet %s/%s is waiting for Pod %s to update",
set.Namespace,
set.Name,
replicas[target].Name)
return &status, nil
}
}
return &status, nil
}
```
**updateStatefulSet函数总结**
1. 每个循环的周期中最多操作一个pod
2. 根据sts.spec.replicas对比现有pod的序号对pod进行划分一部分划为合法(保留/重建),一部分划为非法(删除)
3. 对pods进行划分一部分划入current(old) set阵营另一部分划入update(new) set阵营
4. 更新过程中无论是删减、还是新建都保持pod数量固定有序地递增、递减
5. 最终保证所有的pod都归属于update revision
## 总结
statefulset 在设计上与 deployment 有许多不同的地方,例如:
- deployment通过rs管理podsts通过controllerRevision管理pod
- deployment curd是无序的sts强保证有序curd
- sts需要检查存储的匹配
在了解sts管理操作pod方式的基础上来看代码会有许多的帮助。