
Preface

Having broadly analyzed the three control-plane components of k8s (Scheduler, Controller, APIServer), this article moves on to the kubelet, the daemon component of the data plane, to see how the kubelet works as the bridge between the control plane and the data plane.

Startup Flow

As usual, the startup entry point lives under the project's cmd directory, with cobra used for command-line wrapping:

cmd/kubelet/kubelet.go:39

func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand(server.SetupSignalHandler())
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

cmd/kubelet/app/server.go:112

The NewKubeletFlags and NewKubeletConfiguration functions initialize many of the kubelet's default flags and parameters. Let's look at each in turn.

cmd/kubelet/app/options/options.go:214

func NewKubeletFlags() *KubeletFlags {
	remoteRuntimeEndpoint := ""
	if runtime.GOOS == "linux" {
		remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
	} else if runtime.GOOS == "windows" {
		remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
	}

	return &KubeletFlags{
		EnableServer:                        true,
    // The container runtime options; this field is worth paying attention to
		ContainerRuntimeOptions:             *NewContainerRuntimeOptions(),
		CertDirectory:                       "/var/lib/kubelet/pki",
		RootDirectory:                       defaultRootDir,
		MasterServiceNamespace:              metav1.NamespaceDefault,
		MaxContainerCount:                   -1,
		MaxPerPodContainerCount:             1,
		MinimumGCAge:                        metav1.Duration{Duration: 0},
		NonMasqueradeCIDR:                   "10.0.0.0/8",
		RegisterSchedulable:                 true,
		ExperimentalKernelMemcgNotification: false,
		RemoteRuntimeEndpoint:               remoteRuntimeEndpoint,
		NodeLabels:                          make(map[string]string),
		VolumePluginDir:                     "/usr/libexec/kubernetes/kubelet-plugins/volume/exec/",
		RegisterNode:                        true,
		SeccompProfileRoot:                  filepath.Join(defaultRootDir, "seccomp"),
		HostNetworkSources:                  []string{kubetypes.AllSource},
		HostPIDSources:                      []string{kubetypes.AllSource},
		HostIPCSources:                      []string{kubetypes.AllSource},
		// TODO(#58010:v1.13.0): Remove --allow-privileged, it is deprecated
		AllowPrivileged: true,
		// prior to the introduction of this flag, there was a hardcoded cap of 50 images
		NodeStatusMaxImages: 50,
	}
}

ContainerRuntimeOptions is worth a closer look:

cmd/kubelet/app/options/container_runtime.go:41

func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
	dockerEndpoint := ""
	if runtime.GOOS != "windows" {
    // The default container runtime is docker
		dockerEndpoint = "unix:///var/run/docker.sock"
	}

	return &config.ContainerRuntimeOptions{
		ContainerRuntime:           kubetypes.DockerContainerRuntime,
		RedirectContainerStreaming: false,
		DockerEndpoint:             dockerEndpoint,
    // dockershim root directory; dockershim is the actual vehicle for running containers, and every docker container spawns a shim process
		DockershimRootDirectory:    "/var/lib/dockershim",
    // The pause container image used for the pod sandbox
		PodSandboxImage:            defaultPodSandboxImage,
		ImagePullProgressDeadline:  metav1.Duration{Duration: 1 * time.Minute},
		ExperimentalDockershim:     false,

		// This directory holds the executables of the network-related plugins
		CNIBinDir:  "/opt/cni/bin",
    // CNI config files live here (pod CIDR, gateway, bridge, etc.), usually generated dynamically by the CNI plugin
		CNIConfDir: "/etc/cni/net.d",
	}
}

cmd/kubelet/app/options/options.go:293

--> cmd/kubelet/app/options/options.go:311

// The `NewKubeletConfiguration` function, in turn, sets a few defaults
func applyLegacyDefaults(kc *kubeletconfig.KubeletConfiguration) {
	// --anonymous-auth
	kc.Authentication.Anonymous.Enabled = true
	// --authentication-token-webhook
	kc.Authentication.Webhook.Enabled = false
	// --authorization-mode
  // The AlwaysAllow authorization mode designed for nodes, mentioned in the apiserver authentication article
	kc.Authorization.Mode = kubeletconfig.KubeletAuthorizationModeAlwaysAllow
	// Port 10255, the read-only port for scraping info, e.g. prometheus scraping cadvisor metrics
	kc.ReadOnlyPort = ports.KubeletReadOnlyPort
}
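
As a quick illustration of what that read-only port means (the address is illustrative, 10255 is only the default, and the port can be disabled by setting it to 0), endpoints such as /pods can be fetched from it without authentication:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Illustrative only: query the kubelet read-only port on the local node.
	resp, err := http.Get("http://127.0.0.1:10255/pods")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("%s\n", body) // a JSON PodList of the pods running on this node
}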

Next, let's see what the Run function does:

--> cmd/kubelet/app/server.go:148

		Run: func(cmd *cobra.Command, args []string) {
      ...
			// The hundred or so lines above are default flag/config initialization handling (featureGates and the like); skipped
      
      // Load the kubelet config file; stepping in shows this is the file specified by --config, which for kubeadm deployments is usually /var/lib/kubelet/config.yaml
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					klog.Fatal(err)
				}
      ... 
			}
      
			// Instantiate the KubeletServer
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// Build some of the kubelet's dependencies, e.g. nsenter and the client side for connecting to dockershim
			kubeletDeps, err := UnsecuredDependencies(kubeletServer)
			if err != nil {
				klog.Fatal(err)
			}

			// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

			// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

			// Start the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
				klog.Fatal(err)
			}
		},
	}

	// Some cmd help setup follows, omitted
  ...

	return cmd

--> cmd/kubelet/app/server.go:416

--> cmd/kubelet/app/server.go:479 This function is very long, 200+ lines, so only the key fragments are shown

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
 ...

  // Standalone mode means the kubelet does not talk to anything external (such as the apiserver); it is generally used for debugging, so no clients need to be created
	standaloneMode := true
	if len(s.KubeConfig) > 0 {
		standaloneMode = false
	}

	if kubeDeps == nil {
		kubeDeps, err = UnsecuredDependencies(s)
		if err != nil {
			return err
		}
	}

  // Determine the node name used for registration
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	
	switch {
    // In standalone mode all clients are set to nil
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.Warningf("standalone mode, no API client")
	
   // In normal mode, initialize the clients: kubeClient / eventClient / heartbeatClient
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
    // The client config is mainly cert-related settings for connecting to the apiserver; the cert files live under /var/lib/kubelet/pki by default. If certificate rotation is enabled, a background goroutine keeps checking and renewing the cert through the cert manager. Other settings cover timeouts, keep-alive, and so on. closeAllConns is a function used to tear down connections.
		clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
		if err != nil {
			return err
		}
		if closeAllConns == nil {
			return errors.New("closeAllConns must be a valid function other than nil")
		}
		kubeDeps.OnHeartbeatFailure = closeAllConns
		// Build a client-go clientset instance, used to access the various GV and GVR objects
		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet client: %v", err)
		}

		// Events use a dedicated client, kept separate from the GVR-access client above
		eventClientConfig := *clientConfig
		eventClientConfig.QPS = float32(s.EventRecordQPS)
		eventClientConfig.Burst = int(s.EventBurst)
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet event client: %v", err)
		}

		// And another client just for heartbeats
		heartbeatClientConfig := *clientConfig
		heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
    // If NodeLease is enabled (the node periodically reports its status to the apiserver via leases), the heartbeat timeout is capped at the NodeLease duration
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
			if heartbeatClientConfig.Timeout > leaseTimeout {
				heartbeatClientConfig.Timeout = leaseTimeout
			}
		}
    // A QPS of -1 disables client-side throttling for the heartbeat client
		heartbeatClientConfig.QPS = float32(-1)
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		if err != nil {
			return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
		}
	}
  // Build the authenticator/authorizer that the kubelet's own API server uses for incoming requests (delegating authn/authz to the apiserver)
	if kubeDeps.Auth == nil {
		auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
	}
  // Populate the cadvisor interface
	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)

	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
			s.CgroupRoot = "/"
		}
    // /var/lib/kubelet/config.yaml can assign separate cgroups to the system and to kube components to reserve resources for them
    // kubeReserved is the resources reserved for kube components via their cgroup
		kubeReserved, err := parseResourceList(s.KubeReserved)
		if err != nil {
			return err
		}
    // systemReserved is the resources reserved for host system processes via their cgroup
		systemReserved, err := parseResourceList(s.SystemReserved)
		if err != nil {
			return err
		}
    // Hard-eviction resource thresholds for containers
		var hardEvictionThresholds []evictionapi.Threshold
		// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
		if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
			hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
			if err != nil {
				return err
			}
		}
		experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
		if err != nil {
			return err
		}

		devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
    
    // Gather the parameters above and initialize the container manager
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                           *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
				ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
				ExperimentalPodPidsLimit:              s.PodPidsLimit,
				EnforceCPULimits:                      s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}
	}
   
	if err := checkPermissions(); err != nil {
		klog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	rand.Seed(time.Now().UnixNano())

	// The OOM adjuster sets the OOM score for the current process; OOM is the mechanism behind container memory enforcement. It is pulled out and analyzed separately below.
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.Warning(err)
	}
  // RunKubelet continues in the next section
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}
  // Start an HTTP health-check (healthz) server
	if s.HealthzPort > 0 {
		healthz.DefaultHealthz()
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
			if err != nil {
				klog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	// If systemd is used, notify it that we have started
	go daemon.SdNotify(false, "READY=1")

	select {
	case <-done:
		break
	case <-stopCh:
		break
	}

	return nil
}

OOMAdjuster

--> pkg/util/oom/oom.go:22 The OOM adjuster mentioned above; a quick look at it here. The struct has three methods:

// This is still a plain struct for now; per the TODO below it is meant to become an interface
// TODO: make this an interface, and inject a mock ioutil struct for testing.
type OOMAdjuster struct {
	pidLister                 func(cgroupName string) ([]int, error)
	ApplyOOMScoreAdj          func(pid int, oomScoreAdj int) error
	ApplyOOMScoreAdjContainer func(cgroupName string, oomScoreAdj, maxTries int) error
}

--> pkg/util/oom/oom_linux.go:35 The implementations

func NewOOMAdjuster() *OOMAdjuster {
	oomAdjuster := &OOMAdjuster{
		pidLister:        getPids,
		ApplyOOMScoreAdj: applyOOMScoreAdj,
	}
	oomAdjuster.ApplyOOMScoreAdjContainer = oomAdjuster.applyOOMScoreAdjContainer
	return oomAdjuster
}
// Get the pids of all processes under the given cgroup
func getPids(cgroupName string) ([]int, error) {
	return cmutil.GetPids(filepath.Join("/", cgroupName))
}

// Adjusting the OOM score on Linux means writing the value in /proc/<pid>/oom_score_adj. Under memory pressure the kernel's OOM killer kills the process with the highest oom score; by default, the more memory a process uses, the higher its score and the more likely it is to be killed. applyOOMScoreAdj is the function that modifies this score.

// Writes 'value' to /proc/<pid>/oom_score_adj. PID = 0 means self
// Returns os.ErrNotExist if the `pid` does not exist.
func applyOOMScoreAdj(pid int, oomScoreAdj int) error {
	if pid < 0 {
		return fmt.Errorf("invalid PID %d specified for oom_score_adj", pid)
	}

	var pidStr string
	if pid == 0 {
		pidStr = "self"
	} else {
		pidStr = strconv.Itoa(pid)
	}

	maxTries := 2
	oomScoreAdjPath := path.Join("/proc", pidStr, "oom_score_adj")
	value := strconv.Itoa(oomScoreAdj)
	klog.V(4).Infof("attempting to set %q to %q", oomScoreAdjPath, value)
	var err error
	for i := 0; i < maxTries; i++ {
		err = ioutil.WriteFile(oomScoreAdjPath, []byte(value), 0700)
		if err != nil {
			if os.IsNotExist(err) {
				klog.V(2).Infof("%q does not exist", oomScoreAdjPath)
				return os.ErrNotExist
			}

			klog.V(3).Info(err)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		return nil
	}
	if err != nil {
		klog.V(2).Infof("failed to set %q to %q: %v", oomScoreAdjPath, value, err)
	}
	return err
}

// Adjust the OOM score of a whole container, i.e. of every process under a given cgroup: getPids fetches all the pids, then applyOOMScoreAdj is applied to each in turn
// Writes 'value' to /proc/<pid>/oom_score_adj for all processes in cgroup cgroupName.
// Keeps trying to write until the process list of the cgroup stabilizes, or until maxTries tries.
func (oomAdjuster *OOMAdjuster) applyOOMScoreAdjContainer(cgroupName string, oomScoreAdj, maxTries int) error {
	adjustedProcessSet := make(map[int]bool)
	for i := 0; i < maxTries; i++ {
		continueAdjusting := false
		pidList, err := oomAdjuster.pidLister(cgroupName)
		if err != nil {
			if os.IsNotExist(err) {
				// Nothing to do since the container doesn't exist anymore.
				return os.ErrNotExist
			}
			continueAdjusting = true
			klog.V(10).Infof("Error getting process list for cgroup %s: %+v", cgroupName, err)
		} else if len(pidList) == 0 {
			klog.V(10).Infof("Pid list is empty")
			continueAdjusting = true
		} else {
			for _, pid := range pidList {
				if !adjustedProcessSet[pid] {
					klog.V(10).Infof("pid %d needs to be set", pid)
					if err = oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err == nil {
						adjustedProcessSet[pid] = true
					} else if err == os.ErrNotExist {
						continue
					} else {
						klog.V(10).Infof("cannot adjust oom score for pid %d - %v", pid, err)
						continueAdjusting = true
					}
					// Processes can come and go while we try to apply oom score adjust value. So ignore errors here.
				}
			}
		}
		if !continueAdjusting {
			return nil
		}
		// There's a slight race. A process might have forked just before we write its OOM score adjust.
		// The fork might copy the parent process's old OOM score, then this function might execute and
		// update the parent's OOM score, but the forked process id might not be reflected in cgroup.procs
		// for a short amount of time. So this function might return without changing the forked process's
		// OOM score. Very unlikely race, so ignoring this for now.
	}
	return fmt.Errorf("exceeded maxTries, some processes might not have desired OOM score")
}

RunKubelet

--> cmd/kubelet/app/server.go:955 Back to the main line

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
  
  // These sources are all "*" here, meaning pod updates from the api, file and http sources are all accepted
	hostNetworkSources, err := kubetypes.GetValidatedSources(kubeServer.HostNetworkSources)
	if err != nil {
		return err
	}

	hostPIDSources, err := kubetypes.GetValidatedSources(kubeServer.HostPIDSources)
	if err != nil {
		return err
	}

	hostIPCSources, err := kubetypes.GetValidatedSources(kubeServer.HostIPCSources)
	if err != nil {
		return err
	}

	privilegedSources := capabilities.PrivilegedSources{
		HostNetworkSources: hostNetworkSources,
		HostPIDSources:     hostPIDSources,
		HostIPCSources:     hostIPCSources,
	}
	capabilities.Setup(kubeServer.AllowPrivileged, privilegedSources, 0)

	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
	klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}
  
  // Kubelet initialization; this function is fairly involved and is analyzed separately below
	k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
		kubeDeps,
		&kubeServer.ContainerRuntimeOptions,
		kubeServer.ContainerRuntime,
		kubeServer.RuntimeCgroups,
		kubeServer.HostnameOverride,
		kubeServer.NodeIP,
		kubeServer.ProviderID,
		kubeServer.CloudProvider,
		kubeServer.CertDirectory,
		kubeServer.RootDirectory,
		kubeServer.RegisterNode,
		kubeServer.RegisterWithTaints,
		kubeServer.AllowedUnsafeSysctls,
		kubeServer.RemoteRuntimeEndpoint,
		kubeServer.RemoteImageEndpoint,
		kubeServer.ExperimentalMounterPath,
		kubeServer.ExperimentalKernelMemcgNotification,
		kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
		kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
		kubeServer.MinimumGCAge,
		kubeServer.MaxPerPodContainerCount,
		kubeServer.MaxContainerCount,
		kubeServer.MasterServiceNamespace,
		kubeServer.RegisterSchedulable,
		kubeServer.NonMasqueradeCIDR,
		kubeServer.KeepTerminatedPodVolumes,
		kubeServer.NodeLabels,
		kubeServer.SeccompProfileRoot,
		kubeServer.BootstrapCheckpointPath,
		kubeServer.NodeStatusMaxImages)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

	rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

	// Run once: process the pods and then exit
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
    // Normal run
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}

createAndInitKubelet

--> cmd/kubelet/app/server.go:1078 This mainly leads into the NewMainKubelet function:

--> pkg/kubelet/kubelet.go:326 This function is a monster, 500+ lines of code... only the important parts are covered here

func NewMainKubelet(...) (...) {
  ...
  // Set up the service informer
  serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	if kubeDeps.KubeClient != nil {
		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
		r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	serviceLister := corelisters.NewServiceLister(serviceIndexer)
  
  // Set up the node informer
	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if kubeDeps.KubeClient != nil {
		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
		r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

  ...
  
  // Initialize secretManager and configMapManager; both end up mounted into containers as directories, which requires the kubelet's involvement
  var secretManager secret.Manager
	var configMapManager configmap.Manager
	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
	case kubeletconfiginternal.WatchChangeDetectionStrategy:
		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
		secretManager = secret.NewCachingSecretManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		configMapManager = configmap.NewCachingConfigMapManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
	case kubeletconfiginternal.GetChangeDetectionStrategy:
		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
	default:
		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
	}

	klet.secretManager = secretManager
	klet.configMapManager = configMapManager

  // Initialize the liveness probe result manager
  klet.livenessManager = proberesults.NewManager()
  
  // Network plugin settings dedicated to dockershim
	pluginSettings := dockershim.NetworkPluginSettings{
		HairpinMode:        kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode),
		NonMasqueradeCIDR:  nonMasqueradeCIDR,
		PluginName:         crOptions.NetworkPluginName,
		PluginConfDir:      crOptions.CNIConfDir,  // CNI config files live under /etc/cni/net.d by default
		PluginBinDirString: crOptions.CNIBinDir,   // CNI binaries live under /opt/cni/bin by default, e.g. bridge/tuning/vlan/dhcp/macvlan
		MTU:                int(crOptions.NetworkPluginMTU), // NIC MTU
	}  
  
  // Initialize the kubelet's generic container runtime manager
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		seccompProfileRoot,
		containerRefManager,
		machineInfo,
		klet,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		runtimeService,
		imageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		legacyLogProvider,
		klet.runtimeClassManager,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache  
  
  // Initialize the PLEG (Pod Lifecycle Event Generator)
  klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
  
  // Initialize the pod workQueue
  klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
  // Initialize the pod workers; a worker takes the item at the head of the workQueue, acts on the pod directly according to the instruction, and also updates the pod cache
	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

	// Initialize the eviction manager
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
  
  ...
}	

Back to the main line once more, entering the final k.Run() loop:

--> cmd/kubelet/app/server.go:1058

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
  // Run the kubelet's main work logic, the k.Run() method, in a loop
	go wait.Until(func() {
		k.Run(podCfg.Updates())
	}, 0, wait.NeverStop)

	// start the kubelet server
  // Serve APIs such as /metrics and /healthz
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

	}
  // The read-only info API (on the read-only port)
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

wait.Until() runs a function in a loop; it has come up several times in earlier articles, so no need to dwell on it. Here the period argument is 0, which means k.Run() is called back-to-back in an endless loop with no interval. In a real environment this is why the kubelet keeps working no matter what errors it hits at runtime.
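
A toy illustration of that behavior (not kubelet code; doWork is a placeholder): with period 0, wait.Until re-invokes the wrapped function immediately after it returns, until the stop channel is closed.

stop := make(chan struct{})
go wait.Until(func() {
	doWork() // re-invoked back-to-back, with no sleep between calls
}, 0, stop)
// ... later, to end the loop:
close(stop)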

Let's look at what is actually passed to k.Run(podCfg.Updates()):

--> pkg/kubelet/config/config.go:105

func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
   return c.updates
}

Next, c.updates --> pkg/kubelet/config/config.go:58

type PodConfig struct {
   pods *podStorage
   mux  *config.Mux

   // the channel of denormalized changes passed to listeners
   updates chan kubetypes.PodUpdate

   // contains the list of all configured sources
   sourcesLock       sync.Mutex
   sources           sets.String
   checkpointManager checkpointmanager.CheckpointManager
}

--> pkg/kubelet/types/pod_update.go:80

type PodUpdate struct {
   Pods   []*v1.Pod
   Op     PodOperation
   Source string
}

The guess is that pod change requests (add/update/delete) are converted into this struct and pushed onto the channel, and k.Run() then handles them. The implementation of k.Run() lives at `pkg/kubelet/kubelet.go:1382`; that is left for the next article. This concludes the startup-flow part.
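
If that guess holds, a consumer of this channel would look roughly like the sketch below (a simplification for illustration, not the kubelet's actual sync loop):

for update := range podCfg.Updates() {
	switch update.Op {
	case kubetypes.ADD:
		// new pods assigned to this node from one of the sources (api/file/http)
	case kubetypes.UPDATE, kubetypes.RECONCILE:
		// changes to pods that already exist on this node
	case kubetypes.REMOVE:
		// pods removed from their source
	}
	klog.V(4).Infof("got %d pods from source %q", len(update.Pods), update.Source)
}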

Summary

The kubelet source code really is quite complex: single functions routinely run to several hundred lines. No surprise, given that as the daemon doing the data-plane work it carries a lot of responsibilities. That's it for this one.