You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
3.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package test
import (
"context"
"fmt"
"k8s-manager/handler/common"
"log"
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
appv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/watch"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// WatchDeployments opens a watch on the Deployments of the "kube-system"
// namespace and prints one line per received event until the watch's result
// channel is closed.
//
// The kubeconfig is read from "../" + common.KubeConfigPath. A failure while
// loading the config, building the clientset, or opening the watch terminates
// the process through log.Fatalln.
func WatchDeployments() {
	// 1. Load the kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath)
	if err != nil {
		log.Fatalln(err)
	}

	// 2. Build the clientset.
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalln(err)
	}

	// 3. Open a watch on the Deployment list.
	namespace := "kube-system"
	watchInterface, err := clientSet.AppsV1().Deployments(namespace).
		Watch(context.Background(), metav1.ListOptions{})
	if err != nil {
		log.Fatalln(err)
	}
	// Release the watch on every exit path, including a closed channel.
	defer watchInterface.Stop()

	// 4. Consume events until the result channel is closed.
	for evt := range watchInterface.ResultChan() {
		// Error events do not carry a Deployment, so they must be handled
		// before the object is type-asserted.
		if evt.Type == watch.Error {
			fmt.Println("Deployment Error.")
			continue
		}

		// Checked assertion: skip events whose payload is not a Deployment
		// instead of panicking.
		deployment, ok := evt.Object.(*appv1.Deployment)
		if !ok {
			log.Printf("skipping %s event with unexpected object type %T", evt.Type, evt.Object)
			continue
		}

		switch evt.Type {
		case watch.Added:
			fmt.Println("Deployment Added. Name:", deployment.Name)
		case watch.Modified:
			fmt.Println("Deployment Modified. Name:", deployment.Name)
		case watch.Deleted:
			fmt.Println("Deployment Deleted. Name:", deployment.Name)
		}
	}
}
// InformerDeployments starts a shared informer for Deployments, prints a line
// for every add/update/delete notification, logs the Deployments of the
// "kube-system" namespace once the cache has synced, and then blocks forever.
//
// The kubeconfig is read from "../" + common.KubeConfigPath. A failure while
// loading the config, building the clientset, or listing from the cache
// terminates the process through log.Fatalln.
func InformerDeployments() {
	// 1. Load the kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath)
	if err != nil {
		log.Fatalln(err)
	}

	// 2. Build the clientset.
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalln(err)
	}

	// 3. Create the shared informer factory with a 10-second resync period,
	// then obtain the Deployment informer from it.
	informerFactory := informers.NewSharedInformerFactory(clientSet, 10*time.Second)
	deploymentInformer := informerFactory.Apps().V1().Deployments()
	indexInformer := deploymentInformer.Informer()

	// 4. Register the event handlers.
	indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			deployment := obj.(*appv1.Deployment)
			fmt.Println("Deployment Added. Name:", deployment.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldDeployment := oldObj.(*appv1.Deployment)
			newDeployment := newObj.(*appv1.Deployment)
			fmt.Printf("Deployment Modified. OldName:%s, NewName:%s\n", oldDeployment.Name, newDeployment.Name)
		},
		DeleteFunc: func(obj interface{}) {
			// When a delete event was missed, the informer hands over a
			// tombstone wrapping the last known object; unwrap it first.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			deployment, ok := obj.(*appv1.Deployment)
			if !ok {
				log.Printf("delete handler received unexpected object type %T", obj)
				return
			}
			fmt.Println("Deployment Deleted. Name:", deployment.Name)
		},
	})

	// 5. Start the informers.
	// stopCh is the stop signal; the deferred close never fires in practice
	// because this function blocks forever at the end.
	stopCh := make(chan struct{})
	defer close(stopCh)
	// Start every informer registered on the factory.
	informerFactory.Start(stopCh)
	// Wait for the local cache to be populated.
	informerFactory.WaitForCacheSync(stopCh)

	// 6. (Optional) List Deployments from the local cache.
	log.Println("--------------------------------------")
	lister := deploymentInformer.Lister()
	namespace := "kube-system"
	deployments, err := lister.Deployments(namespace).List(labels.Everything())
	if err != nil {
		log.Fatalln(err)
	}
	for _, dp := range deployments {
		log.Printf("Name:%s Status:%d/%d\n", dp.Name, dp.Status.ReadyReplicas, dp.Status.Replicas)
	}
	log.Println("--------------------------------------")

	// Keep the process alive so the handlers continue to receive events.
	select {}
}