han-joker 7 months ago
parent acc9386cd3
commit b210f50cfe

@ -0,0 +1,139 @@
package test
import (
"context"
"fmt"
"k8s-manager/handler/common"
"log"
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/informers"
appv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/watch"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func WatchDeployments() {
// 1. 初始化配置
// 加载配置
config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath)
if err != nil {
log.Fatalln(err)
}
// 2. 获取clientSet客户端
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalln(err)
}
// 3. 监控Deployments列表
namespace := "kube-system"
watchInterface, err := clientSet.AppsV1().Deployments(namespace).
Watch(context.Background(), metav1.ListOptions{})
if err != nil {
log.Fatalln(err)
}
// 4. 持续处理watchInterface中的ResultChan
loopWatch:
for {
select {
case evt, ok := <-watchInterface.ResultChan():
// chan errorclosed
if !ok {
watchInterface.Stop()
break loopWatch
}
// assert 为目标类型
deployment := evt.Object.(*appv1.Deployment)
// 处理监控的到事件
switch evt.Type {
case watch.Error:
fmt.Println("Deployment Error.")
case watch.Added:
fmt.Println("Deployment Added. Name:", deployment.Name)
case watch.Modified:
fmt.Println("Deployment Modified. Name:", deployment.Name)
case watch.Deleted:
fmt.Println("Deployment Deleted. Name:", deployment.Name)
}
}
}
}
func InformerDeployments() {
// 1. 配置
config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath)
if err != nil {
log.Fatalln(err)
}
// 2. 客户端
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalln(err)
}
// 3. 获取Informer
// 初始化共享Informer工厂
// client, 同步周期每几秒将集群资源信息同步到客户端full全量缓存
informerFactory := informers.NewSharedInformerFactory(clientSet, 10*time.Second)
// 设置监听,GVR
deploymentInformer := informerFactory.Apps().V1().Deployments()
// 创建Informer
indexInformer := deploymentInformer.Informer()
// 4. 定义Informer监控事件处理器
indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
deployment := obj.(*appv1.Deployment)
fmt.Println("Deployment Added. Name:", deployment.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldDeployment := oldObj.(*appv1.Deployment)
newDeployment := newObj.(*appv1.Deployment)
fmt.Printf("Deployment Modified. OldName:%s, NewName:%s\n", oldDeployment.Name, newDeployment.Name)
},
DeleteFunc: func(obj interface{}) {
deployment := obj.(*appv1.Deployment)
fmt.Println("Deployment Deleted. Name:", deployment.Name)
},
})
// 5. 启动监控
//设置stop信号的channel
stopCh := make(chan struct{})
defer close(stopCh)
// 启动工厂中全部的Informer使用独立的goroutine完成监控
informerFactory.Start(stopCh)
// 同步客户端缓存
informerFactory.WaitForCacheSync(stopCh)
// 6. 查询列表(可选的)
log.Println("--------------------------------------")
lister := deploymentInformer.Lister()
namespace := "kube-system"
deployments, err := lister.Deployments(namespace).List(labels.Everything())
if err != nil {
log.Fatalln(err)
}
for _, dp := range deployments {
log.Printf("Name:%s Status:%d/%d\n", dp.Name, dp.Status.ReadyReplicas, dp.Status.Replicas)
}
log.Println("--------------------------------------")
// 保持运行
select {}
}

@ -0,0 +1,11 @@
package test
import "testing"
func TestWatchDeployments(t *testing.T) {
WatchDeployments()
}
func TestInformerDeployments(t *testing.T) {
InformerDeployments()
}
Loading…
Cancel
Save