diff --git a/test/watch.go b/test/watch.go new file mode 100644 index 0000000..520b1e2 --- /dev/null +++ b/test/watch.go @@ -0,0 +1,139 @@ +package test + +import ( + "context" + "fmt" + "k8s-manager/handler/common" + "log" + "time" + + "k8s.io/apimachinery/pkg/labels" + + "k8s.io/client-go/tools/cache" + + "k8s.io/client-go/informers" + + appv1 "k8s.io/api/apps/v1" + + "k8s.io/apimachinery/pkg/watch" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +func WatchDeployments() { + // 1. 初始化配置 + // 加载配置 + config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath) + if err != nil { + log.Fatalln(err) + } + + // 2. 获取clientSet客户端 + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatalln(err) + } + // 3. 监控Deployments列表 + namespace := "kube-system" + watchInterface, err := clientSet.AppsV1().Deployments(namespace). + Watch(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Fatalln(err) + } + + // 4. 持续处理watchInterface中的ResultChan +loopWatch: + for { + select { + case evt, ok := <-watchInterface.ResultChan(): + // chan error,closed + if !ok { + watchInterface.Stop() + break loopWatch + } + + // assert 为目标类型 + deployment := evt.Object.(*appv1.Deployment) + + // 处理监控的到事件 + switch evt.Type { + case watch.Error: + fmt.Println("Deployment Error.") + case watch.Added: + fmt.Println("Deployment Added. Name:", deployment.Name) + case watch.Modified: + fmt.Println("Deployment Modified. Name:", deployment.Name) + case watch.Deleted: + fmt.Println("Deployment Deleted. Name:", deployment.Name) + } + } + } + +} + +func InformerDeployments() { + // 1. 配置 + config, err := clientcmd.BuildConfigFromFlags("", "../"+common.KubeConfigPath) + if err != nil { + log.Fatalln(err) + } + + // 2. 客户端 + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatalln(err) + } + + // 3. 获取Informer + // 初始化共享Informer工厂 + // client, 同步周期(每几秒,将集群资源信息同步到客户端full全量缓存) + informerFactory := informers.NewSharedInformerFactory(clientSet, 10*time.Second) + // 设置监听,GVR + deploymentInformer := informerFactory.Apps().V1().Deployments() + // 创建Informer + indexInformer := deploymentInformer.Informer() + + // 4. 定义Informer监控事件处理器 + indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + deployment := obj.(*appv1.Deployment) + fmt.Println("Deployment Added. Name:", deployment.Name) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldDeployment := oldObj.(*appv1.Deployment) + newDeployment := newObj.(*appv1.Deployment) + fmt.Printf("Deployment Modified. OldName:%s, NewName:%s\n", oldDeployment.Name, newDeployment.Name) + }, + DeleteFunc: func(obj interface{}) { + deployment := obj.(*appv1.Deployment) + fmt.Println("Deployment Deleted. Name:", deployment.Name) + }, + }) + + // 5. 启动监控 + //设置stop信号的channel + stopCh := make(chan struct{}) + defer close(stopCh) + // 启动工厂中全部的Informer,使用独立的goroutine完成监控 + informerFactory.Start(stopCh) + + // 同步客户端缓存 + informerFactory.WaitForCacheSync(stopCh) + + // 6. 查询列表(可选的) + log.Println("--------------------------------------") + lister := deploymentInformer.Lister() + namespace := "kube-system" + deployments, err := lister.Deployments(namespace).List(labels.Everything()) + if err != nil { + log.Fatalln(err) + } + for _, dp := range deployments { + log.Printf("Name:%s Status:%d/%d\n", dp.Name, dp.Status.ReadyReplicas, dp.Status.Replicas) + } + log.Println("--------------------------------------") + // 保持运行 + select {} +} diff --git a/test/watch_test.go b/test/watch_test.go new file mode 100644 index 0000000..3914b40 --- /dev/null +++ b/test/watch_test.go @@ -0,0 +1,11 @@ +package test + +import "testing" + +func TestWatchDeployments(t *testing.T) { + WatchDeployments() +} + +func TestInformerDeployments(t *testing.T) { + InformerDeployments() +}