You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

638 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"
"time"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
myAppsv1 "mashibing.com/pkg/mashibing-deployment/api/v1"
)
var WaitRequeue = 10 * time.Second
// MsbDeploymentReconciler reconciles a MsbDeployment object
type MsbDeploymentReconciler struct {
client.Client
DynamicClient dynamic.Interface // 用来访问 issuer和certificate资源
Scheme *runtime.Scheme
}
// 创建GVR, 共动态客户端使用
var (
// issuer
issuerGVR = schema.GroupVersionResource{
Group: "cert-manager.io",
Version: "v1",
Resource: "issuers",
}
// certificate
certGVR = schema.GroupVersionResource{
Group: "cert-manager.io",
Version: "v1",
Resource: "certificates",
}
)
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses,verbs=get;list;watch;create;update;patch;delete
// 创建 issuer 和 certificate 资源需要的权限
//+kubebuilder:rbac:groups=cert-manager.io,resources=issuers,verbs=get;list;watch;create;update;patch
//+kubebuilder:rbac:groups=cert-manager.io,resources=certificates,verbs=get;list;watch;create;update;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the MsbDeployment object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *MsbDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 状态更新策略
// 创建的时候
// 更新为创建
// 更新的时候
// 根据获取的状态来判断时候更新status
// 删除的时候
// 只有在操作 ingress 的时候,并且 mode 为 nodeport 的时候
logger := log.FromContext(ctx, "MsbDployment", req.NamespacedName)
logger.Info("Reconcile is started.")
// 1. 获取资源对象
md := new(myAppsv1.MsbDeployment)
if err := r.Client.Get(ctx, req.NamespacedName, md); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 防止污染缓存
mdCopy := md.DeepCopy()
// ======= 处理 deployment ======
// 2. 获取deployment资源对象
deploy := new(appsv1.Deployment)
if err := r.Client.Get(ctx, req.NamespacedName, deploy); err != nil {
if errors.IsNotFound(err) {
// 2.1 不存在对象
// 2.1.1 创建 deployment
if errCreate := r.createDeployment(ctx, mdCopy); err != nil {
return ctrl.Result{}, errCreate
}
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
} else {
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf("Deployment %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
return ctrl.Result{}, err
}
} else {
// 2.2 存在对象
// 2.2.1 更新 deployment
if err := r.updateDeployment(ctx, mdCopy, deploy); err != nil {
return ctrl.Result{}, err
}
if deploy.Status.AvailableReplicas == mdCopy.Spec.Replicas {
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonDeploymentReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
} else {
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
}
}
// ======= 处理 service =========
// 3. 获取 service 资源对象
svc := new(corev1.Service)
if err := r.Client.Get(ctx, req.NamespacedName, svc); err != nil {
if errors.IsNotFound(err) {
// 3.1 不存在
// 3.1.1 mode 为 ingress
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 3.1.1.1 创建普通service
if err := r.createService(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 3.1.2 mode 为 nodeport
// 3.1.2.1 创建 nodeport 模式的 service
if err := r.createNPService(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
} else {
return ctrl.Result{}, myAppsv1.ErrorNotSupportMode
}
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeService,
fmt.Sprintf(myAppsv1.ConditionMessageServiceNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonServiceNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
} else {
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeService,
fmt.Sprintf("Service %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonServiceNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
return ctrl.Result{}, err
}
} else {
// 3.2 存在
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 3.2.1 mode 为 ingress
// 3.2.1.1 更新普通的 service
if err := r.updateService(ctx, mdCopy, svc); err != nil {
return ctrl.Result{}, err
}
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 3.2.2 mode 为 nodeport
// 3.2.2.1 更新nodeport模式的service
if err := r.updateNPService(ctx, mdCopy, svc); err != nil {
return ctrl.Result{}, err
}
} else {
return ctrl.Result{}, myAppsv1.ErrorNotSupportMode
}
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeService,
fmt.Sprintf(myAppsv1.ConditionMessageServiceOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonServiceReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
}
// ======= 处理 ingress =========
// 4 获取 ingress 资源
ig := new(networkv1.Ingress)
if err := r.Client.Get(ctx, req.NamespacedName, ig); err != nil {
if errors.IsNotFound(err) {
// 4.1 不存在
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.1.1 mode 为 ingress
// 4.1.1.1 创建 ingress
if err := r.createIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf(myAppsv1.ConditionMessageIngressNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonIngressNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
if mdCopy.Spec.Expose.Tls {
// 创建 issuers
if err := r.createIssuer(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
// 创建 certificates
if err := r.createCert(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
}
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.1.2 mode 为 nodeport
// 4.1.2.1 退出
return ctrl.Result{}, nil
}
} else {
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf("Ingress %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonIngressNotReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
return ctrl.Result{}, err
}
} else {
// 4.2 存在
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.2.1 mode 为 ingress
// 4.2.1.1更新 ingress
if err := r.updateIngress(ctx, mdCopy, ig); err != nil {
return ctrl.Result{}, err
}
if _, errStatus := r.updateStatus(ctx,
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf(myAppsv1.ConditionMessageIngressOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonIngressReady); errStatus != nil {
return ctrl.Result{}, errStatus
}
if mdCopy.Spec.Expose.Tls {
// 创建 issuers
if err := r.createIssuer(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
// 创建 certificates
if err := r.createCert(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
}
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.2.2 mode 为 nodeport
// 4.2.2.1 删除 ingress
if err := r.deleteIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
r.deleteStatus(mdCopy, myAppsv1.ConditionTypeIngress)
}
}
// 最后检查状态时候最终完成
if sus, errStatus := r.updateStatus(ctx,
mdCopy,
"",
"",
"",
""); errStatus != nil {
return ctrl.Result{}, errStatus
} else if !sus {
logger.Info("Reconcile is ended.")
return ctrl.Result{RequeueAfter: WaitRequeue}, nil
}
logger.Info("Reconcile is ended.")
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *MsbDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&myAppsv1.MsbDeployment{}).
Owns(&appsv1.Deployment{}). // 监控 deployment 类型,变更就触发 reconciler
Owns(&corev1.Service{}). // 监控 service 类型,变更就触发 reconciler
Owns(&networkv1.Ingress{}). // 监控 ingress 类型,变更就触发 reconciler
Complete(r)
}
func (r *MsbDeploymentReconciler) createDeployment(ctx context.Context, md *myAppsv1.MsbDeployment) error {
deploy, err := NewDeployment(md)
if err != nil {
return err
}
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, deploy, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, deploy)
}
func (r *MsbDeploymentReconciler) updateDeployment(ctx context.Context, md *myAppsv1.MsbDeployment, dp *appsv1.Deployment) error {
deploy, err := NewDeployment(md)
if err != nil {
return err
}
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, deploy, r.Scheme); err != nil {
return err
}
// 预更新deployment得到更新后的数据
if err := r.Update(ctx, deploy, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(dp.Spec, deploy.Spec) {
return nil
}
return r.Client.Update(ctx, deploy)
}
func (r *MsbDeploymentReconciler) createService(ctx context.Context, md *myAppsv1.MsbDeployment) error {
svc, err := NewService(md)
if err != nil {
return err
}
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, svc)
}
func (r *MsbDeploymentReconciler) createNPService(ctx context.Context, md *myAppsv1.MsbDeployment) error {
svc, err := NewServiceNP(md)
if err != nil {
return err
}
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, svc)
}
func (r *MsbDeploymentReconciler) updateService(ctx context.Context, md *myAppsv1.MsbDeployment, service *corev1.Service) error {
svc, err := NewService(md)
if err != nil {
return err
}
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
}
// 预更新service得到更新后的数据
if err := r.Update(ctx, svc, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(service.Spec, svc.Spec) {
return nil
}
return r.Client.Update(ctx, svc)
}
func (r *MsbDeploymentReconciler) updateNPService(ctx context.Context, md *myAppsv1.MsbDeployment, service *corev1.Service) error {
svc, err := NewServiceNP(md)
if err != nil {
return err
}
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, svc, r.Scheme); err != nil {
return err
}
// 预更新service得到更新后的数据
if err := r.Update(ctx, svc, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(service.Spec, svc.Spec) {
return nil
}
return r.Client.Update(ctx, svc)
}
func (r *MsbDeploymentReconciler) createIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig, err := NewIngress(md)
if err != nil {
return err
}
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, ig, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, ig)
}
func (r *MsbDeploymentReconciler) updateIngress(ctx context.Context, md *myAppsv1.MsbDeployment, ingress *networkv1.Ingress) error {
ig, err := NewIngress(md)
if err != nil {
return err
}
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, ig, r.Scheme); err != nil {
return err
}
// 预更新ingress得到更新后的数据
if err := r.Update(ctx, ingress, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(ingress.Spec, ig.Spec) {
return nil
}
return r.Client.Update(ctx, ig)
}
func (r *MsbDeploymentReconciler) deleteIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig, err := NewIngress(md)
if err != nil {
return err
}
return r.Client.Delete(ctx, ig)
}
// 处理status
// return:
//
// bool: 资源是否完成,时候需要等待。如果是 true表示资源已经完成没不需要再次reconcile
// 如果是 false表示资源还未完成需要重新入队
// error执行 update 的状态
func (r *MsbDeploymentReconciler) updateStatus(ctx context.Context, md *myAppsv1.MsbDeployment, conditionType, message, status, reason string) (bool, error) {
if conditionType != "" {
// 1. 获取 status
//status := md.Status
// 2. 获取 conditions 字段
//conditions := status.Conditions
// 3. 根据当前的需求,获取指定的 condition
var condition *myAppsv1.Condition
for i := range md.Status.Conditions {
// 4. 是否获取到
if md.Status.Conditions[i].Type == conditionType {
// 4.1 获取到了
condition = &md.Status.Conditions[i]
}
}
if condition != nil {
// 4.1.1 获取当前线上的 conditon 状态与存储的condition进行比较如果相同跳过。不同替换
if condition.Status != status ||
condition.Message != message ||
condition.Reason != reason {
condition.Status = status
condition.Message = message
condition.Reason = reason
}
} else {
// 4.2 没获取到创建这个conditon更新到conditons中
md.Status.Conditions = append(md.Status.Conditions,
createCondition(conditionType, message, status, reason))
}
}
// 5. 继续处理其他的conditions
m, re, p, sus := isSuccess(md.Status.Conditions)
if sus {
// 6.1 如果所有conditions的状态都为成功则更新总的 status 为成功。
md.Status.Message = myAppsv1.StatusMessageSuccess
md.Status.Reason = myAppsv1.StatusReasonSuccess
md.Status.Phase = myAppsv1.StatusPhaseComplete
} else {
// 6.2 遍历所有的conditons 状态如果有任意一个condition不是完成的状态则将这个状态更新到总的 status 中。更待一定时间再次入队。
md.Status.Message = m
md.Status.Reason = re
md.Status.Phase = p
}
// 7. 执行更新
return sus, r.Client.Status().Update(ctx, md)
}
func isSuccess(conditions []myAppsv1.Condition) (message, reason, phase string, sus bool) {
if len(conditions) == 0 {
return "", "", "", false
}
for i := range conditions {
if conditions[i].Status == myAppsv1.ConditonStatusFalse {
return conditions[i].Message, conditions[i].Reason, conditions[i].Type, false
}
}
return "", "", "", true
}
func createCondition(conditionType, message, status, reason string) myAppsv1.Condition {
return myAppsv1.Condition{
Type: conditionType,
Message: message,
Status: status,
Reason: reason,
LastTransitionTime: metav1.NewTime(time.Now()),
}
}
// 需要是幂等的,可以多次执行,不管是否存在。如果存在就删除,不存在就什么也不做
// 只是删除对应的Condition不做更多的操作
func (r *MsbDeploymentReconciler) deleteStatus(md *myAppsv1.MsbDeployment, conditionType string) {
// 1. 遍历conditions
for i := range md.Status.Conditions {
// 2. 找到要删除的对象
if md.Status.Conditions[i].Type == conditionType {
// 3. 执行删除
md.Status.Conditions = deleteCondition(md.Status.Conditions, i)
}
}
}
func (r *MsbDeploymentReconciler) createIssuer(ctx context.Context, mdCopy *myAppsv1.MsbDeployment) error {
return nil
}
func (r *MsbDeploymentReconciler) createCert(ctx context.Context, mdCopy *myAppsv1.MsbDeployment) error {
return nil
}
// a := struct {
// len int
// cap int
// [cap]int [1,2,3,4]
// }
//
// b = a
//
// b
//
// a
// a = append(a, "b")
// conditions = deleteCondition(conditions, 2)
// a = [1,2,3,4,5]
// f(a)
//
// f(b []int) {
// b = n[0:4]
// }
func deleteCondition(conditions []myAppsv1.Condition, i int) []myAppsv1.Condition {
// 前提:切片中的元素顺序不敏感
// 1. 要删除的元素的索引值不能大于切片长度
if i >= len(conditions) {
return []myAppsv1.Condition{}
}
// 2. 如果切片长度为1且索引值为0直接清空
if len(conditions) == 1 && i == 0 {
return conditions[:0]
}
// 3. 如果长度-1等于索引值删除最后一个元素
if len(conditions)-1 == i {
return conditions[:len(conditions)-1]
}
// 4. 交换索引位置的元素和最后一个元素,删除最后一个元素
conditions[i], conditions[len(conditions)-1] = conditions[len(conditions)-1], conditions[i]
return conditions[:len(conditions)-1]
}