Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package controllers
import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
myAppsv1 "mashibing.com/pkg/mashibing-deployment/api/v1"
// WaitRequeue is the delay Reconcile passes as RequeueAfter when its final
// status check reports that the managed resources are not all ready yet.
var WaitRequeue = 10 * time.Second
// MsbDeploymentReconciler reconciles a MsbDeployment object
type MsbDeploymentReconciler struct {
// Scheme is handed to controllerutil.SetControllerReference when the
// reconciler marks a MsbDeployment as the owner of an object it builds.
Scheme *runtime.Scheme
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// It loads the MsbDeployment named by req (a missing object ends the
// reconcile without error) and then handles, in order, the Deployment,
// Service and Ingress that share its namespaced name: a missing object is
// created, an existing one is updated, and each outcome is recorded through
// updateStatus. An expose mode that is neither ingress nor nodeport ends the
// reconcile with ErrorNotSupportMode. In nodeport mode a missing Ingress ends
// the reconcile successfully and an existing Ingress is deleted. When the
// final status check reports unfinished resources, the request is requeued
// after WaitRequeue.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *MsbDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Status update strategy:
// On creation:
//   mark the status as creating.
// On update:
//   decide from the observed state whether the status needs updating.
// On deletion:
//   only when handling the ingress and the mode is nodeport.
logger := log.FromContext(ctx, "MsbDployment", req.NamespacedName)
logger.Info("Reconcile is started.")
// 1. Fetch the MsbDeployment resource.
md := new(myAppsv1.MsbDeployment)
if err := r.Client.Get(ctx, req.NamespacedName, md); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
// Work on a deep copy so the cached object is not polluted.
mdCopy := md.DeepCopy()
// ======= Handle the Deployment ======
// 2. Fetch the Deployment.
deploy := new(appsv1.Deployment)
if err := r.Client.Get(ctx, req.NamespacedName, deploy); err != nil {
if errors.IsNotFound(err) {
// 2.1 It does not exist.
// 2.1.1 Create the Deployment.
// Test errCreate rather than the outer err: err is the non-nil NotFound
// error in this branch, so testing it returned early even when the
// create succeeded and skipped the status update below.
if errCreate := r.createDeployment(ctx, mdCopy); errCreate != nil {
return ctrl.Result{}, errCreate
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
} else {
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf("Deployment %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
return ctrl.Result{}, err
} else {
// 2.2 It exists.
// 2.2.1 Update the Deployment.
if err := r.updateDeployment(ctx, mdCopy, deploy); err != nil {
return ctrl.Result{}, err
if deploy.Status.AvailableReplicas == mdCopy.Spec.Replicas {
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentOKFmt, req.Name),
myAppsv1.ConditionReasonDeploymentReady); errStatus != nil {
return ctrl.Result{}, errStatus
} else {
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
// ======= Handle the Service =========
// 3. Fetch the Service.
svc := new(corev1.Service)
if err := r.Client.Get(ctx, req.NamespacedName, svc); err != nil {
if errors.IsNotFound(err) {
// 3.1 It does not exist.
// 3.1.1 Mode is ingress.
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// Create a plain Service.
if err := r.createService(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 3.1.2 Mode is nodeport.
// Create a NodePort-mode Service.
if err := r.createNPService(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
} else {
return ctrl.Result{}, myAppsv1.ErrorNotSupportMode
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageServiceNotFmt, req.Name),
myAppsv1.ConditionReasonServiceNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
} else {
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf("Service %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditionReasonServiceNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
return ctrl.Result{}, err
} else {
// 3.2 It exists.
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 3.2.1 Mode is ingress.
// Update the plain Service.
if err := r.updateService(ctx, mdCopy, svc); err != nil {
return ctrl.Result{}, err
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 3.2.2 Mode is nodeport.
// Update the NodePort-mode Service.
if err := r.updateNPService(ctx, mdCopy, svc); err != nil {
return ctrl.Result{}, err
} else {
return ctrl.Result{}, myAppsv1.ErrorNotSupportMode
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageServiceOKFmt, req.Name),
myAppsv1.ConditionReasonServiceReady); errStatus != nil {
return ctrl.Result{}, errStatus
// ======= Handle the Ingress =========
// 4. Fetch the Ingress.
ig := new(networkv1.Ingress)
if err := r.Client.Get(ctx, req.NamespacedName, ig); err != nil {
if errors.IsNotFound(err) {
// 4.1 It does not exist.
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.1.1 Mode is ingress.
// Create the Ingress.
if err := r.createIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageIngressNotFmt, req.Name),
myAppsv1.ConditionReasonIngressNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.1.2 Mode is nodeport.
// Nothing to do; exit.
return ctrl.Result{}, nil
} else {
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf("Ingress %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditionReasonIngressNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
return ctrl.Result{}, err
} else {
// 4.2 It exists.
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.2.1 Mode is ingress.
// Update the Ingress.
if err := r.updateIngress(ctx, mdCopy, ig); err != nil {
return ctrl.Result{}, err
if _, errStatus := r.updateStatus(ctx,
fmt.Sprintf(myAppsv1.ConditionMessageIngressOKFmt, req.Name),
myAppsv1.ConditionReasonIngressReady); errStatus != nil {
return ctrl.Result{}, errStatus
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.2.2 Mode is nodeport.
// Delete the Ingress and drop its condition from the in-memory status.
if err := r.deleteIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
r.deleteStatus(mdCopy, myAppsv1.ConditionTypeIngress)
// Finally, check whether the overall status is complete; requeue if not.
if sus, errStatus := r.updateStatus(ctx,
""); errStatus != nil {
return ctrl.Result{}, errStatus
} else if !sus {
logger.Info("Reconcile is ended.")
return ctrl.Result{RequeueAfter: WaitRequeue}, nil
logger.Info("Reconcile is ended.")
return ctrl.Result{}, nil
// SetupWithManager sets up the controller with the Manager.
//
// NOTE(review): the builder chain below ends with a trailing '.', so the
// remainder of the chain is not visible in this view — confirm the watched
// primary type and the terminating call against the full file.
func (r *MsbDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Owns(&appsv1.Deployment{}). // watch owned Deployments; a change triggers the reconciler
Owns(&corev1.Service{}). // watch owned Services; a change triggers the reconciler
Owns(&networkv1.Ingress{}). // watch owned Ingresses; a change triggers the reconciler
func (r *MsbDeploymentReconciler) createDeployment(ctx context.Context, md *myAppsv1.MsbDeployment) error {
deploy, err := NewDeployment(md)
if err != nil {
return err
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, deploy, r.Scheme); err != nil {
return err
return r.Client.Create(ctx, deploy)
func (r *MsbDeploymentReconciler) updateDeployment(ctx context.Context, md *myAppsv1.MsbDeployment, dp *appsv1.Deployment) error {
deploy, err := NewDeployment(md)
if err != nil {
return err
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, deploy, r.Scheme); err != nil {
return err
// 预更新deployment,得到更新后的数据
if err := r.Update(ctx, deploy, client.DryRunAll); err != nil {
return err
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(dp.Spec, deploy.Spec) {
return nil
return r.Client.Update(ctx, deploy)
func (r *MsbDeploymentReconciler) createService(ctx context.Context, md *myAppsv1.MsbDeployment) error {
svc, err := NewService(md)
if err != nil {
return err
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
return r.Client.Create(ctx, svc)
func (r *MsbDeploymentReconciler) createNPService(ctx context.Context, md *myAppsv1.MsbDeployment) error {
svc, err := NewServiceNP(md)
if err != nil {
return err
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
return r.Client.Create(ctx, svc)
func (r *MsbDeploymentReconciler) updateService(ctx context.Context, md *myAppsv1.MsbDeployment, service *corev1.Service) error {
svc, err := NewService(md)
if err != nil {
return err
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
// 预更新service,得到更新后的数据
if err := r.Update(ctx, svc, client.DryRunAll); err != nil {
return err
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(service.Spec, svc.Spec) {
return nil
return r.Client.Update(ctx, svc)
func (r *MsbDeploymentReconciler) updateNPService(ctx context.Context, md *myAppsv1.MsbDeployment, service *corev1.Service) error {
svc, err := NewServiceNP(md)
if err != nil {
return err
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
// 预更新service,得到更新后的数据
if err := r.Update(ctx, svc, client.DryRunAll); err != nil {
return err
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(service.Spec, svc.Spec) {
return nil
return r.Client.Update(ctx, svc)
func (r *MsbDeploymentReconciler) createIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig, err := NewIngress(md)
if err != nil {
return err
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, ig, r.Scheme); err != nil {
return err
return r.Client.Create(ctx, ig)
func (r *MsbDeploymentReconciler) updateIngress(ctx context.Context, md *myAppsv1.MsbDeployment, ingress *networkv1.Ingress) error {
ig, err := NewIngress(md)
if err != nil {
return err
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, ig, r.Scheme); err != nil {
return err
// 预更新ingress,得到更新后的数据
if err := r.Update(ctx, ingress, client.DryRunAll); err != nil {
return err
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(ingress.Spec, ig.Spec) {
return nil
return r.Client.Update(ctx, ig)
func (r *MsbDeploymentReconciler) deleteIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig, err := NewIngress(md)
if err != nil {
return err
return r.Client.Delete(ctx, ig)
// 处理status
// return:
// bool: 资源是否完成,时候需要等待。如果是 true,表示资源已经完成没不需要再次reconcile
// 如果是 false,表示资源还未完成,需要重新入队
// error:执行 update 的状态
func (r *MsbDeploymentReconciler) updateStatus(ctx context.Context, md *myAppsv1.MsbDeployment, conditionType, message, status, reason string) (bool, error) {
if conditionType != "" {
// 1. 获取 status
//status := md.Status
// 2. 获取 conditions 字段
//conditions := status.Conditions
// 3. 根据当前的需求,获取指定的 condition
var condition *myAppsv1.Condition
for i := range md.Status.Conditions {
// 4. 是否获取到
if md.Status.Conditions[i].Type == conditionType {
// 4.1 获取到了
condition = &md.Status.Conditions[i]
if condition != nil {
// 4.1.1 获取当前线上的 conditon 状态,与存储的condition进行比较,如果相同,跳过。不同,替换
if condition.Status != status ||
condition.Message != message ||
condition.Reason != reason {
condition.Status = status
condition.Message = message
condition.Reason = reason
} else {
// 4.2 没获取到,创建这个conditon,更新到conditons中
md.Status.Conditions = append(md.Status.Conditions,
createCondition(conditionType, message, status, reason))
// 5. 继续处理其他的conditions
m, re, p, sus := isSuccess(md.Status.Conditions)
if sus {
// 6.1 如果所有conditions的状态都为成功,则更新总的 status 为成功。
md.Status.Message = myAppsv1.StatusMessageSuccess
md.Status.Reason = myAppsv1.StatusReasonSuccess
md.Status.Phase = myAppsv1.StatusPhaseComplete
} else {
// 6.2 遍历所有的conditons 状态,如果有任意一个condition不是完成的状态,则将这个状态更新到总的 status 中。更待一定时间再次入队。
md.Status.Message = m
md.Status.Reason = re
md.Status.Phase = p
// 7. 执行更新
return sus, r.Client.Status().Update(ctx, md)
func isSuccess(conditions []myAppsv1.Condition) (message, reason, phase string, sus bool) {
if len(conditions) == 0 {
return "", "", "", false
for i := range conditions {
if conditions[i].Status == myAppsv1.ConditonStatusFalse {
return conditions[i].Message, conditions[i].Reason, conditions[i].Type, false
return "", "", "", true
func createCondition(conditionType, message, status, reason string) myAppsv1.Condition {
return myAppsv1.Condition{
Type: conditionType,
Message: message,
Status: status,
Reason: reason,
LastTransitionTime: metav1.NewTime(time.Now()),
// 需要是幂等的,可以多次执行,不管是否存在。如果存在就删除,不存在就什么也不做
// 只是删除对应的Condition不做更多的操作
func (r *MsbDeploymentReconciler) deleteStatus(md *myAppsv1.MsbDeployment, conditionType string) {
// 1. 遍历conditions
for i := range md.Status.Conditions {
// 2. 找到要删除的对象
if md.Status.Conditions[i].Type == conditionType {
// 3. 执行删除
md.Status.Conditions = deleteCondition(md.Status.Conditions, i)
// a := struct {
// len int
// cap int
// [cap]int [1,2,3,4]
// }
// b = a
// b
// a
// a = append(a, "b")
// conditions = deleteCondition(conditions, 2)
// a = [1,2,3,4,5]
// f(a)
// f(b []int) {
// b = n[0:4]
// }
func deleteCondition(conditions []myAppsv1.Condition, i int) []myAppsv1.Condition {
// 前提:切片中的元素顺序不敏感
// 1. 要删除的元素的索引值不能大于切片长度
if i >= len(conditions) {
return []myAppsv1.Condition{}
// 2. 如果切片长度为1,且索引值为0,直接清空
if len(conditions) == 1 && i == 0 {
return conditions[:0]
// 3. 如果长度-1等于索引值,删除最后一个元素
if len(conditions)-1 == i {
return conditions[:len(conditions)-1]
// 4. 交换索引位置的元素和最后一个元素,删除最后一个元素
conditions[i], conditions[len(conditions)-1] = conditions[len(conditions)-1], conditions[i]
return conditions[:len(conditions)-1]