
/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"
"time"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
myAppsv1 "mashibing.com/pkg/mashibing-deployment/api/v1"
)
// WaitRequeue is the delay before a MsbDeployment that is not yet ready is
// reconciled again.
var WaitRequeue = time.Second * 10
// MsbDeploymentReconciler reconciles a MsbDeployment object.
//
// It embeds the controller-runtime client used for typed resources and also
// carries a dynamic client and the scheme used when setting owner references.
type MsbDeploymentReconciler struct {
client.Client
DynamicClient dynamic.Interface // used to access issuer and certificate resources
Scheme *runtime.Scheme
}
// GroupVersionResources used by the dynamic client.
var (
// issuerGVR addresses the "issuers" resource of cert-manager.io/v1.
issuerGVR = schema.GroupVersionResource{
Group: "cert-manager.io",
Version: "v1",
Resource: "issuers",
}
// certGVR addresses the "certificates" resource of cert-manager.io/v1.
certGVR = schema.GroupVersionResource{
Group: "cert-manager.io",
Version: "v1",
Resource: "certificates",
}
)
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps.mashibing.com,resources=msbdeployments/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses,verbs=get;list;watch;create;update;patch;delete
// Permissions required to create the issuer and certificate resources.
//+kubebuilder:rbac:groups=cert-manager.io,resources=issuers,verbs=get;list;watch;create;update;patch
//+kubebuilder:rbac:groups=cert-manager.io,resources=certificates,verbs=get;list;watch;create;update;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the MsbDeployment object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *MsbDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 状态更新策略
// 创建的时候
// 更新为创建
// 更新的时候
// 根据获取的状态来判断时候更新status
// 删除的时候
// 只有在操作 ingress 的时候,并且 mode 为 nodeport 的时候
2 years ago
logger := log.FromContext(ctx, "MsbDployment", req.NamespacedName)
2 years ago
logger.Info("Reconcile is started.")
2 years ago
// 1. 获取资源对象
md := new(myAppsv1.MsbDeployment)
if err := r.Client.Get(ctx, req.NamespacedName, md); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 防止污染缓存
mdCopy := md.DeepCopy()
// 处理最终的返回
defer func() {
if r.Ready(mdCopy) {
_ = r.Client.Status().Update(ctx, mdCopy)
return
}
if mdCopy.Status.ObservedGeneration != md.Status.ObservedGeneration {
_ = r.Client.Status().Update(ctx, mdCopy)
}
}()
2 years ago
// ======= 处理 deployment ======
// 2. 获取deployment资源对象
deploy := new(appsv1.Deployment)
if err := r.Client.Get(ctx, req.NamespacedName, deploy); err != nil {
if errors.IsNotFound(err) {
// 2.1 不存在对象
// 2.1.1 创建 deployment
if errCreate := r.createDeployment(ctx, mdCopy); err != nil {
return ctrl.Result{}, errCreate
}
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady)
2 years ago
} else {
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf("Deployment %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady)
2 years ago
return ctrl.Result{}, err
}
} else {
// 2.2 存在对象
// 2.2.1 更新 deployment
if err := r.updateDeployment(ctx, mdCopy, deploy); err != nil {
2 years ago
return ctrl.Result{}, err
}
if deploy.Status.AvailableReplicas == mdCopy.Spec.Replicas {
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonDeploymentReady)
} else {
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeDeployment,
fmt.Sprintf(myAppsv1.ConditionMessageDeploymentNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonDeploymentNotReady)
}
2 years ago
}
// ======= 处理 service =========
// 3. 获取 service 资源对象
svc := new(corev1.Service)
if err := r.Client.Get(ctx, req.NamespacedName, svc); err != nil {
if errors.IsNotFound(err) {
// 3.1 不存在 创建 service
if err := r.createService(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
2 years ago
} else {
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeService,
fmt.Sprintf("Service %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonServiceNotReady)
2 years ago
return ctrl.Result{}, err
}
} else {
// 3.2 存在 更新 service
if err := r.updateService(ctx, mdCopy, svc); err != nil {
return ctrl.Result{}, err
2 years ago
}
// 检查 现有状态来更新 status
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeService,
fmt.Sprintf(myAppsv1.ConditionMessageServiceOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonServiceReady)
2 years ago
}
// ======= 处理 ingress =========
// 4 获取 ingress 资源
ig := new(networkv1.Ingress)
if err := r.Client.Get(ctx, req.NamespacedName, ig); err != nil {
if errors.IsNotFound(err) {
// 4.1 不存在
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.1.1 mode 为 ingress
// 4.1.1.1 创建 ingress
if err := r.createIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf(myAppsv1.ConditionMessageIngressNotFmt, req.Name),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonIngressNotReady)
if mdCopy.Spec.Expose.Tls {
// 创建 issuers
if err := r.createIssuer(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
// 创建 certificates
if err := r.createCert(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
}
2 years ago
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.1.2 mode 为 nodeport
// 4.1.2.1 退出
return ctrl.Result{}, nil
}
} else {
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf("Ingress %s, err: %s", req.Name, err.Error()),
myAppsv1.ConditonStatusFalse,
myAppsv1.ConditionReasonIngressNotReady)
2 years ago
return ctrl.Result{}, err
}
} else {
// 4.2 存在
if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeIngress {
// 4.2.1 mode 为 ingress
// 4.2.1.1更新 ingress
if err := r.updateIngress(ctx, mdCopy, ig); err != nil {
2 years ago
return ctrl.Result{}, err
}
r.updateConditions(
mdCopy,
myAppsv1.ConditionTypeIngress,
fmt.Sprintf(myAppsv1.ConditionMessageIngressOKFmt, req.Name),
myAppsv1.ConditonStatusTrue,
myAppsv1.ConditionReasonIngressReady)
if mdCopy.Spec.Expose.Tls {
// 创建 issuers
if err := r.createIssuer(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
// 创建 certificates
if err := r.createCert(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
}
2 years ago
} else if strings.ToLower(mdCopy.Spec.Expose.Mode) == myAppsv1.ModeNodePort {
// 4.2.2 mode 为 nodeport
// 4.2.2.1 删除 ingress
if err := r.deleteIngress(ctx, mdCopy); err != nil {
return ctrl.Result{}, err
}
r.deleteStatus(mdCopy, myAppsv1.ConditionTypeIngress)
2 years ago
}
}
logger.Info("Reconcile is ended.")
if !r.Ready(mdCopy) {
return ctrl.Result{RequeueAfter: WaitRequeue}, nil
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *MsbDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
2 years ago
For(&myAppsv1.MsbDeployment{}).
Owns(&appsv1.Deployment{}). // 监控 deployment 类型,变更就触发 reconciler
2 years ago
Owns(&corev1.Service{}). // 监控 service 类型,变更就触发 reconciler
Owns(&networkv1.Ingress{}). // 监控 ingress 类型,变更就触发 reconciler
Complete(r)
}
2 years ago
func (r *MsbDeploymentReconciler) createDeployment(ctx context.Context, md *myAppsv1.MsbDeployment) error {
deploy := NewDeployment(md)
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, &deploy, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, &deploy)
2 years ago
}
func (r *MsbDeploymentReconciler) updateDeployment(ctx context.Context, md *myAppsv1.MsbDeployment, dp *appsv1.Deployment) error {
deploy := NewDeployment(md)
// 设置 deployment 所属于 md
if err := controllerutil.SetControllerReference(md, &deploy, r.Scheme); err != nil {
return err
}
// 预更新deployment得到更新后的数据
if err := r.Update(ctx, &deploy, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(dp.Spec, deploy.Spec) {
return nil
}
return r.Client.Update(ctx, &deploy)
2 years ago
}
func (r *MsbDeploymentReconciler) createService(ctx context.Context, md *myAppsv1.MsbDeployment) error {
svc := NewService(md)
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, &svc, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, &svc)
2 years ago
}
func (r *MsbDeploymentReconciler) updateService(ctx context.Context, md *myAppsv1.MsbDeployment, service *corev1.Service) error {
svc := NewService(md)
// 设置 service 所属于 md
if err := controllerutil.SetControllerReference(md, &svc, r.Scheme); err != nil {
return err
}
// 预更新service得到更新后的数据
if err := r.Update(ctx, &svc, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(service.Spec, svc.Spec) {
return nil
}
return r.Client.Update(ctx, &svc)
2 years ago
}
func (r *MsbDeploymentReconciler) createIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig := NewIngress(md)
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, &ig, r.Scheme); err != nil {
return err
}
return r.Client.Create(ctx, &ig)
2 years ago
}
func (r *MsbDeploymentReconciler) updateIngress(ctx context.Context, md *myAppsv1.MsbDeployment, ingress *networkv1.Ingress) error {
ig := NewIngress(md)
// 设置 ingress 所属于 md
if err := controllerutil.SetControllerReference(md, &ig, r.Scheme); err != nil {
return err
}
// 预更新ingress得到更新后的数据
if err := r.Update(ctx, ingress, client.DryRunAll); err != nil {
return err
}
// 和之前的数据进行比较,如果相同,说明更新不需要。
if reflect.DeepEqual(ingress.Spec, ig.Spec) {
return nil
}
return r.Client.Update(ctx, &ig)
2 years ago
}
func (r *MsbDeploymentReconciler) deleteIngress(ctx context.Context, md *myAppsv1.MsbDeployment) error {
ig := NewIngress(md)
return r.Client.Delete(ctx, &ig)
2 years ago
}
// updateConditions records a condition on md and bumps the change counter.
//
// If a condition of conditionType already exists it is updated in place, but
// only when message, status or reason differ from the stored values. If none
// exists a new one is appended. Every actual change increments
// md.Status.ObservedGeneration, which Reconcile uses to decide whether the
// status must be written back. LastTransitionTime is refreshed whenever the
// condition's status value changes.
func (r *MsbDeploymentReconciler) updateConditions(md *myAppsv1.MsbDeployment, conditionType, message, status, reason string) {
	// Locate the stored condition of the requested type, if any.
	var condition *myAppsv1.Condition
	for i := range md.Status.Conditions {
		if md.Status.Conditions[i].Type == conditionType {
			condition = &md.Status.Conditions[i]
			break
		}
	}

	// Not recorded yet: append a fresh condition.
	if condition == nil {
		md.Status.Conditions = append(md.Status.Conditions,
			createCondition(conditionType, message, status, reason))
		md.Status.ObservedGeneration++
		return
	}

	// Already recorded and unchanged: nothing to do.
	if condition.Status == status &&
		condition.Message == message &&
		condition.Reason == reason {
		return
	}

	// A transition is a change of the status value, not of the message or
	// the reason alone.
	if condition.Status != status {
		condition.LastTransitionTime = metav1.NewTime(time.Now())
	}
	condition.Status = status
	condition.Message = message
	condition.Reason = reason
	md.Status.ObservedGeneration++
}
// Ready reports whether this reconcile pass reached the desired state and
// folds the conditions into the summary status of md.
//
// When isSuccess reports success the summary is set to the success
// message/reason and the complete phase. Otherwise the message, reason and
// type of the first failing condition are copied into the summary, and
// ObservedGeneration is incremented if that changed anything.
func (r *MsbDeploymentReconciler) Ready(md *myAppsv1.MsbDeployment) bool {
	message, reason, phase, ok := isSuccess(md.Status.Conditions)
	st := &md.Status

	if ok {
		// All conditions succeeded: mark the overall status as successful.
		st.Message = myAppsv1.StatusMessageSuccess
		st.Reason = myAppsv1.StatusReasonSuccess
		st.Phase = myAppsv1.StatusPhaseComplete
		return ok
	}

	// Not successful: mirror the failing condition into the summary, but only
	// count it as a change when something actually differs.
	unchanged := st.Message == message && st.Reason == reason && st.Phase == phase
	if !unchanged {
		st.Message = message
		st.Reason = reason
		st.Phase = phase
		st.ObservedGeneration += 1
	}
	return ok
}
// isSuccess inspects conditions and reports whether all of them succeeded.
//
// It returns sus=false together with the message, reason and type of the
// first condition whose status is ConditonStatusFalse. With no failing
// condition it returns empty strings, and sus is true only when the list is
// non-empty.
func isSuccess(conditions []myAppsv1.Condition) (message, reason, phase string, sus bool) {
	for _, c := range conditions {
		if c.Status == myAppsv1.ConditonStatusFalse {
			return c.Message, c.Reason, c.Type, false
		}
	}
	return "", "", "", len(conditions) > 0
}
// createCondition returns a new Condition with the given fields and
// LastTransitionTime set to the current time.
func createCondition(conditionType, message, status, reason string) myAppsv1.Condition {
	cond := myAppsv1.Condition{
		Type:    conditionType,
		Status:  status,
		Reason:  reason,
		Message: message,
	}
	cond.LastTransitionTime = metav1.NewTime(time.Now())
	return cond
}
// 需要是幂等的,可以多次执行,不管是否存在。如果存在就删除,不存在就什么也不做
// 只是删除对应的Condition不做更多的操作
func (r *MsbDeploymentReconciler) deleteStatus(md *myAppsv1.MsbDeployment, conditionType string) {
// 1. 遍历conditions
2 years ago
var tmp []myAppsv1.Condition
copy(tmp, md.Status.Conditions)
for i := range tmp {
// 2. 找到要删除的对象
2 years ago
if tmp[i].Type == conditionType {
// 3. 执行删除
2 years ago
md.Status.Conditions = deleteCondition(tmp, i)
}
}
}
// createIssuer creates the issuer resource for md through the dynamic client.
//
// The object comes from NewIssuer and is owned by md. An "already exists"
// error from the API is treated as success: until certificate updates are
// handled more completely, leaving an existing issuer untouched is the simple
// and safe choice.
func (r *MsbDeploymentReconciler) createIssuer(ctx context.Context, md *myAppsv1.MsbDeployment) error {
	issuer, err := NewIssuer(md)
	if err != nil {
		return err
	}
	if err = controllerutil.SetControllerReference(md, issuer, r.Scheme); err != nil {
		return err
	}

	issuers := r.DynamicClient.Resource(issuerGVR).Namespace(md.Namespace)
	_, err = issuers.Create(ctx, issuer, metav1.CreateOptions{})
	if err != nil && !errors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
// createCert creates the certificate resource for md through the dynamic
// client.
//
// The object comes from NewCert and is owned by md. An "already exists" error
// from the API is treated as success: until certificate updates are handled
// more completely, leaving an existing certificate untouched is the simple
// and safe choice.
func (r *MsbDeploymentReconciler) createCert(ctx context.Context, md *myAppsv1.MsbDeployment) error {
	cert, err := NewCert(md)
	if err != nil {
		return err
	}
	if err = controllerutil.SetControllerReference(md, cert, r.Scheme); err != nil {
		return err
	}

	certs := r.DynamicClient.Resource(certGVR).Namespace(md.Namespace)
	_, err = certs.Create(ctx, cert, metav1.CreateOptions{})
	if err != nil && !errors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
// a := struct {
// len int
// cap int
// [cap]int [1,2,3,4]
// }
//
// b = a
//
// b
//
// a
// a = append(a, "b")
// conditions = deleteCondition(conditions, 2)
// a = [1,2,3,4,5]
// f(a)
//
// f(b []int) {
// b = n[0:4]
// }
// deleteCondition removes the element at index i from conditions and returns
// the shortened slice.
//
// Element order is NOT preserved: the element at i is swapped with the last
// one and the slice is truncated by one. The operation works in place on the
// backing array of conditions. An index outside [0, len(conditions)) leaves
// the slice unchanged.
func deleteCondition(conditions []myAppsv1.Condition, i int) []myAppsv1.Condition {
	last := len(conditions) - 1
	// Nothing to remove for an invalid index (this also covers an empty slice).
	if i < 0 || i > last {
		return conditions
	}
	// Swap with the last element (a no-op when i is already last), then drop
	// the last slot.
	conditions[i], conditions[last] = conditions[last], conditions[i]
	return conditions[:last]
}