前言
kubelet 是运行在 Kubernetes 节点上的“节点代理”,用来管理节点。

kubelet 主要负责所在节点上的资源对象的管理,例如 Pod 资源对象的创建,删除,监控,驱逐及生命周期管理等。
kubelet 源码分析
kubelet 模块
kubelet 包括的模块如下图:

从图中可以看出,kubelet 的模块众多,每个模块负责不同的功能。本文将围绕创建 Pod 的流程,有取舍地介绍 kubelet 的各个模块。
在开始流程介绍前,让我们通过 kubelet 工作原理图将各个模块串联起来,这对于我们的源码分析是相当有帮助的。

kubelet 启动及调试
下载 Kubernetes 源码,配置调试参数:
1{
2 "version": "0.2.0",
3 "configurations": [
4 {
5 "name": "Kubelet",
6 "type": "go",
7 "request": "launch",
8 "mode": "auto",
9 "program": "${fileDirname}",
10 "args": [
11 "--container-runtime-endpoint=/run/k3s/containerd/containerd.sock",
12 "-v=5",
13 "--port=10251",
14 "--kubeconfig=/root/.kube/config",
15 ]
16 }
17 ]
18}
打断点进入 kubelet:

kubelet 使用 Cobra 作为应用命令行框架,和 kube-scheduler,kube-apiserver 初始化过程类似,其流程如下:

这里,简要给出初始化示例代码:
1// kubernetes/cmd/kubelet/app/server.go
2func NewKubeletCommand() *cobra.Command {
3 // 解析 flags
4 cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
5 cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
6 kubeletFlags := options.NewKubeletFlags()
7
8 // 获取 kubelet 配置
9 kubeletConfig, err := options.NewKubeletConfiguration()
10
11 cmd := &cobra.Command{
12 ...
13 RunE: func(cmd *cobra.Command, args []string) error {
14 ...
15
16 // 构建 kubeletServer
17 kubeletServer := &options.KubeletServer{
18 KubeletFlags: *kubeletFlags,
19 KubeletConfiguration: *kubeletConfig,
20 }
21
22 // 构建 kubeletDeps,kubeletDeps 是运行 kubelet 需要的依赖项
23 kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
24
25 ...
26 return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
27 }
28 }
29}
进入 Run 函数运行 kubelet:
1// kubernetes/cmd/kubelet/app/server.go
2func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
3 ...
4 if err := run(ctx, s, kubeDeps, featureGate); err != nil {
5 return fmt.Errorf("failed to run Kubelet: %w", err)
6 }
7 return nil
8}
9
10func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
11 ...
12 if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
13 return err
14 }
15 ...
16}
run 函数的内容比较多,这里直接略过,重点看 RunKubelet:
1// kubernetes/cmd/kubelet/app/server.go
2func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
3 ...
4 k, err := createAndInitKubelet(kubeServer,
5 kubeDeps,
6 hostname,
7 hostnameOverridden,
8 nodeName,
9 nodeIPs)
10 if err != nil {
11 return fmt.Errorf("failed to create kubelet: %w", err)
12 }
13
14 ...
15 if runOnce {
16 ...
17 } else {
18 startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
19 klog.InfoS("Started kubelet")
20 }
21 return nil
22}
这里 createAndInitKubelet 创建 kubelet 对象,该对象在 startKubelet 中运行:
1// kubernetes/cmd/kubelet/app/server.go
2func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
3 // start the kubelet
4 go k.Run(podCfg.Updates())
5
6 // start the kubelet server
7 if enableServer {
8 go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
9 }
10 if kubeCfg.ReadOnlyPort > 0 {
11 go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
12 }
13 go k.ListenAndServePodResources()
14}
startKubelet 在一个新的 goroutine 中调用 Kubelet.Run 方法运行 kubelet。同时,它还会根据配置启动 kubelet server、只读端口 (ReadOnlyPort) server 以及 PodResources server。我们直接进入 Kubelet.Run 方法看其中做了什么。
1// kubernetes/pkg/kubelet/kubelet.go
2func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
3 ...
4 // 初始化模块是初始化不依赖于 container runtime 的模块
5 if err := kl.initializeModules(); err != nil {
6 kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
7 klog.ErrorS(err, "Failed to initialize internal modules")
8 os.Exit(1)
9 }
10
11 ...
12 kl.syncLoop(ctx, updates, kl)
13}
Kubelet.Run 中包括了不少操作,这里还是抓重点看 Kubelet.syncLoop 主逻辑做了什么。
1// kubernetes/pkg/kubelet/kubelet.go
2func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
3 klog.InfoS("Starting kubelet main sync loop")
4
5 // syncTicker 每秒检测一次是否有需要同步的 pod workers
6 syncTicker := time.NewTicker(time.Second)
7 defer syncTicker.Stop()
8
9 // 每两秒检测一次是否有需要清理的 pod
10 housekeepingTicker := time.NewTicker(housekeepingPeriod)
11 defer housekeepingTicker.Stop()
12 ...
13 for {
14 ...
15 kl.syncLoopMonitor.Store(kl.clock.Now())
16 if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
17 break
18 }
19 kl.syncLoopMonitor.Store(kl.clock.Now())
20 }
21}
22
23func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
24 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
25 select {
26 case u, open := <-configCh:
27 ...
28 switch u.Op {
29 case kubetypes.ADD:
30 klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
31 // After restarting, kubelet will get all existing pods through
32 // ADD as if they are new pods. These pods will then go through the
33 // admission process and *may* be rejected. This can be resolved
34 // once we have checkpointing.
35 handler.HandlePodAdditions(u.Pods)
36 ...
37 }
38 }
39}
Kubelet.syncLoopIteration 通过 select 同时监听多个管道 (channel),包括 configCh、syncCh、housekeepingCh 和 plegCh。这里仅以 configCh 管道为例,看创建 pod 时的处理逻辑。
在 handler.HandlePodAdditions(u.Pods) 这里打断点,然后创建 pod:
1# helm install test .
2NAME: test
3LAST DEPLOYED: Sun May 19 15:34:54 2024
4NAMESPACE: default
5STATUS: deployed

1I0519 15:34:54.577769 1801325 kubelet.go:2410] "SyncLoop ADD" source="api" pods=["default/test-6d47479b6b-pphb2"]
进入 handler.HandlePodAdditions
1func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
2 ...
3 for _, pod := range pods {
4 // 获取 podManager 模块中记录的 pods
5 existingPods := kl.podManager.GetPods()
6
7 // 更新 podManager 中的 pod
8 kl.podManager.AddPod(pod)
9
10 // 根据 pod 的属性判断当前 pod 是不是 mirrorPod
11 // mirrorPod 是仅受 kubelet 管理的,对 kubernetes 不可见的 pod
12 pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
13 if wasMirror {
14 ...
15 }
16
17 // 判断 pod 是否处于 termination 状态
18 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
19 activePods := kl.filterOutInactivePods(existingPods)
20 if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
21 ...
22 } else {
23 // 判断 pod 是否可以运行在当前 node
24 if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
25 kl.rejectPod(pod, reason, message)
26 continue
27 }
28 }
29 }
30
31 kl.podWorkers.UpdatePod(UpdatePodOptions{
32 Pod: pod,
33 MirrorPod: mirrorPod,
34 UpdateType: kubetypes.SyncPodCreate,
35 StartTime: start,
36 })
37 }
38}
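这里,podManager 模块负责存储和访问 pod 的信息,并维护 static pod 与 mirror pod 之间的对应关系。

需要注意的是,static pod 只由 kubelet 管理。kubelet 会在 kube-apiserver 上为每个 static pod 创建一个对应的 mirror pod,使其在 API 中可见,但无法通过 API 对其进行控制。

podManager 会被 statusManager、volumeManager、runtimeManager 等模块调用,它记录了所有被 kubelet 管理的 pod。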
继续往下看 podWorkers.UpdatePod:
1# kubernetes/pkg/kubelet/pod_workers.go
2func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
3 ...
4 status, ok := p.podSyncStatuses[uid]
5 if !ok {
6 klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
7 firstTime = true
8 status = &podSyncStatus{
9 syncedAt: now,
10 fullname: kubecontainer.BuildPodFullName(name, ns),
11 }
12 ...
13 p.podSyncStatuses[uid] = status
14 }
15
16 ...
17 // 创建一个 pod worker 协程,如果该协程不存在的话
18 podUpdates, exists := p.podUpdates[uid]
19 if !exists {
20 podUpdates = make(chan struct{}, 1)
21 p.podUpdates[uid] = podUpdates
22 ...
23 go func() {
24 defer runtime.HandleCrash()
25 defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
26 p.podWorkerLoop(uid, outCh)
27 }()
28 }
29}
30
31func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
32 var lastSyncTime time.Time
33 for range podUpdates {
34 // startPodSync 判断 pod 是否可以被同步
35 ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
36
37 ...
38 err := func() error {
39 var status *kubecontainer.PodStatus
40 var err error
41 switch {
42 case update.Options.RunningPod != nil:
43 default:
44 status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
45 ...
46 }
47 }
48
49 switch {
50 case update.WorkType == TerminatedPod:
51 ...
52 default:
53 isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
54 }
55
56 lastSyncTime = p.clock.Now()
57 return err
58 }()
59
60 ...
61 }
62}
这里要注意的是,podWorkers.podCache.GetNewerThan 是一个阻塞调用:只有当 podCache 中该 pod 的状态比上一次同步时间 lastSyncTime 更新时,它才会返回。这样可以保证本次同步使用的是上次同步之后的 container runtime 状态。podCache 中的 pod 状态由 PLEG (Pod Lifecycle Event Generator) 从 container runtime 获取并写入。这一步并不会与 kubelet 从 kube-apiserver 获取的 pod 状态做对比。
接着,进入 podWorkers.podSyncer.SyncPod 同步 pod:
1func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
2 ...
3 klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
4 ...
5 // 生成 apiPodStatus 以同步至 statusManager
6 apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
7 ...
8 // 获取 statusManager 中存储的 pod 状态
9 existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
10 ...
11 // 调用 statusManager 同步 pod 状态
12 kl.statusManager.SetPodStatus(pod, apiPodStatus)
13
14 ...
15 // ensure the kubelet knows about referenced secrets or configmaps used by the pod
16 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
17 if kl.secretManager != nil {
18 kl.secretManager.RegisterPod(pod)
19 }
20 if kl.configMapManager != nil {
21 kl.configMapManager.RegisterPod(pod)
22 }
23 }
24
25 // 创建 pod container manager
26 pcm := kl.containerManager.NewPodContainerManager()
27 ...
28
29 // Make data directories for the pod
30 if err := kl.makePodDataDirs(pod); err != nil {
31 ...
32 }
33
34 // Wait for volumes to attach/mount
35 if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
36 ...
37 }
38
39 // Fetch the pull secrets for the pod
40 pullSecrets := kl.getPullSecretsForPod(pod)
41
42 // Ensure the pod is being probed
43 kl.probeManager.AddPod(pod)
44
45 ...
46 result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
47 ...
48}
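Kubelet.SyncPod 首先生成 pod 的 API 状态并同步到 statusManager。接着准备 pod 运行所需的资源:
- 向 secretManager 和 configMapManager 注册 pod 引用的 secret/configmap;
- 创建 pod container manager;
- 创建 pod 的数据目录 (data directories);
- 等待 volume 挂载完成;
- 获取拉取镜像所需的 pull secrets。

在调用 container runtime 同步 pod 之前,还会将 pod 添加到 probeManager 模块,以便对容器执行探针检测。关于 probeManager 模块的详细内容,可参考 kubernetes/pkg/kubelet/prober 中的源码。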
进入 Kubelet.containerRuntime.SyncPod 查看 container runtime 是怎么同步 pod 的。
1// kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
2func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
3 // Step 1: Compute sandbox and container changes.
4 podContainerChanges := m.computePodActions(ctx, pod, podStatus)
5 ...
6 // Step 2: Kill the pod if the sandbox has changed.
7 if podContainerChanges.KillPod {
8 ...
9 } else {
10 // Step 3: kill any running containers in this pod which are not to keep.
11 for containerID, containerInfo := range podContainerChanges.ContainersToKill {
12 ...
13 }
14 }
15
16 ...
17 // Step 4: Create a sandbox for the pod if necessary.
18 podSandboxID := podContainerChanges.SandboxID
19 if podContainerChanges.CreateSandbox {
20 ...
21 createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
22 result.AddSyncResult(createSandboxResult)
23 ...
24 podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
25 if err != nil {
26 ...
27 }
28
29 // 调用 runtime cri 接口查询创建的 pod sandbox 状态
30 resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
31 ...
32 }
33
34 configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
35 result.AddSyncResult(configPodSandboxResult)
36
37 start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
38 ...
39 klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
40 ...
41 if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
42 ...
43 }
44 ...
45 }
46
47 // Step 5: start ephemeral containers
48 for _, idx := range podContainerChanges.EphemeralContainersToStart {
49 start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
50 }
51
52 if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
53 ...
54 } else {
55 // Step 6: start init containers.
56 for _, idx := range podContainerChanges.InitContainersToStart {
57 container := &pod.Spec.InitContainers[idx]
58 // Start the next init container.
59 if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
60 ...
61 }
62
63 // Successfully started the container; clear the entry in the failure
64 klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
65 }
66 }
67
68 // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
69 if isInPlacePodVerticalScalingAllowed(pod) {
70 if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
71 m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
72 }
73 }
74
75 // Step 8: start containers in podContainerChanges.ContainersToStart.
76 for _, idx := range podContainerChanges.ContainersToStart {
77 start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
78 }
79
80 return
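在 Kubelet.containerRuntime.SyncPod 中,通过调用 container runtime 的 CRI (Container Runtime Interface) 接口创建 pod sandbox 和 container。

以创建 pod sandbox 为例,kubeGenericRuntimeManager.createPodSandbox 调用 m.runtimeService.RunPodSandbox 创建 pod sandbox。其中 runtimeService 是 instrumentedRuntimeService 类型,它在记录指标后将调用转发给 remoteRuntimeService。remoteRuntimeService 再通过 gRPC 客户端调用 CRI 接口: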
1func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
2 podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
3 if err != nil {
4 ...
5 }
6
7 // 创建 pod 的 log 目录
8 err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
9 ...
10
11 podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
12 if err != nil {
13 ...
14 }
15
16 return podSandBoxID, "", nil
17}
18
19func (in instrumentedRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
20 ...
21 out, err := in.service.RunPodSandbox(ctx, config, runtimeHandler)
22 ...
23 return out, err
24}
25
26func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
27 ...
28 klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
29 ...
30 resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
31 Config: config,
32 RuntimeHandler: runtimeHandler,
33 })
34 ...
35}
36
37// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
38func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
39 out := new(RunPodSandboxResponse)
40 err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
41 if err != nil {
42 return nil, err
43 }
44 return out, nil
45}
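可以看到,这里通过 gRPC 调用 CRI 接口 /runtime.v1.RuntimeService/RunPodSandbox 创建 pod sandbox。创建 container 的过程也类似,由 kubeGenericRuntimeManager.startContainer 完成:
1. 通过 CRI 的 ImageService 确保镜像存在,必要时调用 PullImage 拉取;
2. 调用 RuntimeService 的 CreateContainer 接口创建 pod 中的 container;
3. 调用 RuntimeService 的 StartContainer 接口启动该 container。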
小结
本文从 kubelet 源码层面介绍了 pod 创建的流程,后续将重点看 runtime 是如何工作的。