前言

kubelet 是运行在 Kubernetes 节点上的“节点代理”,用来管理节点。

kubelet

kubelet 主要负责所在节点上资源对象的管理,例如 Pod 资源对象的创建、删除、监控、驱逐及生命周期管理等。

kubelet 源码分析

kubelet 模块

kubelet 包括的模块如下图:

kubelet 模块

从图中可以看出,kubelet 的模块众多,每个模块负责不同的功能。本文将围绕创建 Pod 的流程,有取舍地介绍 kubelet 的各个模块。

在开始流程介绍前,让我们通过 kubelet 工作原理图将各个模块串联起来,这对于我们的源码分析是相当有帮助的。

kubelet 工作原理

kubelet 启动及调试

下载 Kubernetes 源码,配置调试参数:

 1{
 2    "version": "0.2.0",
 3    "configurations": [
 4        {
 5            "name": "Kubelet",
 6            "type": "go",
 7            "request": "launch",
 8            "mode": "auto",
 9            "program": "${fileDirname}",
10            "args": [
11                "--container-runtime-endpoint=/run/k3s/containerd/containerd.sock",
12                "-v=5",
13                "--port=10251",
14                "--kubeconfig=/root/.kube/config"
15            ]
16        }
17    ]
18}

打断点进入 kubelet:

调试 kubelet

kubelet 使用 Cobra 作为应用命令行框架,和 kube-scheduler、kube-apiserver 的初始化过程类似,其流程如下:

kubelet 初始化示意图

这里,简要给出初始化示例代码:

 1// kubernetes/cmd/kubelet/app/server.go
 2func NewKubeletCommand() *cobra.Command {
 3    // 解析 flags
 4	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
 5	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
 6	kubeletFlags := options.NewKubeletFlags()
 7
 8    // 获取 kubelet 配置
 9    kubeletConfig, err := options.NewKubeletConfiguration()
10
11    cmd := &cobra.Command{
12		...
13        RunE: func(cmd *cobra.Command, args []string) error {
14			...
15
16            // 构建 kubeletServer
17            kubeletServer := &options.KubeletServer{
18				KubeletFlags:         *kubeletFlags,
19				KubeletConfiguration: *kubeletConfig,
20			}
21
22            // 构建 kubeletDeps,kubeletDeps 是运行 kubelet 需要的依赖项
23            kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
24
25            ...
26            return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
27        }
28    }
29}

进入 Run 函数运行 kubelet

 1// kubernetes/cmd/kubelet/app/server.go
 2func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
 3	...
 4	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
 5		return fmt.Errorf("failed to run Kubelet: %w", err)
 6	}
 7	return nil
 8}
 9
10func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
11    ...
12    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
13		return err
14	}
15    ...
16}

run 函数的内容比较多,我们直接忽略,有重点地看 RunKubelet。

 1// kubernetes/cmd/kubelet/app/server.go
 2func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
 3    ...
 4    k, err := createAndInitKubelet(kubeServer,
 5		kubeDeps,
 6		hostname,
 7		hostnameOverridden,
 8		nodeName,
 9		nodeIPs)
10    if err != nil {
11		return fmt.Errorf("failed to create kubelet: %w", err)
12	}
13
14    ...
15    if runOnce {
16		...
17	} else {
18		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
19		klog.InfoS("Started kubelet")
20	}
21	return nil
22}

这里 createAndInitKubelet 创建 kubelet 对象,该对象在 startKubelet 中运行:

 1// kubernetes/cmd/kubelet/app/server.go
 2func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
 3	// start the kubelet
 4	go k.Run(podCfg.Updates())
 5
 6	// start the kubelet server
 7	if enableServer {
 8		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
 9	}
10	if kubeCfg.ReadOnlyPort > 0 {
11		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
12	}
13	go k.ListenAndServePodResources()
14}

startKubelet 调用 kubelet.Run 方法运行 kubelet。我们直接进入 kubelet.Run 方法看其中做了什么。

 1// kubernetes/pkg/kubelet/kubelet.go
 2func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 3    ...
 4    // initializeModules 初始化不依赖 container runtime 的模块
 5    if err := kl.initializeModules(); err != nil {
 6		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
 7		klog.ErrorS(err, "Failed to initialize internal modules")
 8		os.Exit(1)
 9	}
10
11    ...
12    kl.syncLoop(ctx, updates, kl)
13}

Kubelet.Run 中包括了不少操作,这里还是抓重点看 Kubelet.syncLoop 主逻辑做了什么。

 1// kubernetes/pkg/kubelet/kubelet.go
 2func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 3	klog.InfoS("Starting kubelet main sync loop")
 4
 5    // syncTicker 每秒检测一次是否有需要同步的 pod workers
 6    syncTicker := time.NewTicker(time.Second)
 7	defer syncTicker.Stop()
 8
 9    // 每两秒检测一次是否有需要清理的 pod
10	housekeepingTicker := time.NewTicker(housekeepingPeriod)
11	defer housekeepingTicker.Stop()
12    ...
13	for {
14		...
15		kl.syncLoopMonitor.Store(kl.clock.Now())
16		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
17			break
18		}
19		kl.syncLoopMonitor.Store(kl.clock.Now())
20	}
21}
22
23func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
24	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
25	select {
26    case u, open := <-configCh:
27        ...
28        switch u.Op {
29		case kubetypes.ADD:
30			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
31			// After restarting, kubelet will get all existing pods through
32			// ADD as if they are new pods. These pods will then go through the
33			// admission process and *may* be rejected. This can be resolved
34			// once we have checkpointing.
35			handler.HandlePodAdditions(u.Pods)
36            ...
37        }
38    }
39}

Kubelet.syncLoopIteration 包括多个操作管道的行为,这里仅以 configCh 管道为例,看创建 pod 的行为。

handler.HandlePodAdditions(u.Pods) 这里打断点,然后创建 pod:

1# helm install test .
2NAME: test
3LAST DEPLOYED: Sun May 19 15:34:54 2024
4NAMESPACE: default
5STATUS: deployed

kubelet create pod

1I0519 15:34:54.577769 1801325 kubelet.go:2410] "SyncLoop ADD" source="api" pods=["default/test-6d47479b6b-pphb2"]

进入 handler.HandlePodAdditions

 1func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 2	...
 3    for _, pod := range pods {
 4        // 获取 podManager 模块中记录的 pods
 5		existingPods := kl.podManager.GetPods()
 6
 7        // 更新 podManager 中的 pod
 8        kl.podManager.AddPod(pod)
 9
10        // 根据 pod 的属性判断当前 pod 是不是 mirrorPod
11        // mirrorPod 是 kubelet 在 kube-apiserver 中为 static pod 创建的镜像对象;static pod 本身仅由 kubelet 管理,mirrorPod 使其对集群可见
12        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
13        if wasMirror {
14            ...
15        }
16
17        // 判断 pod 是否处于 termination 状态
18        if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
19            activePods := kl.filterOutInactivePods(existingPods)
20            if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
21                ...
22            } else {
23                // 判断 pod 是否可以运行在当前 node
24                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
25					kl.rejectPod(pod, reason, message)
26					continue
27				}
28            }
29        }
30
31        kl.podWorkers.UpdatePod(UpdatePodOptions{
32			Pod:        pod,
33			MirrorPod:  mirrorPod,
34			UpdateType: kubetypes.SyncPodCreate,
35			StartTime:  start,
36		})
37    }
38}

这里,podManager 模块负责存储和访问 pod 的信息,维持 static pod 和 mirror pod 的关系。podManager 会被 statusManager/volumeManager/runtimeManager 调用,podManager 记录所有被管理的 pod。

继续往下看 podWorkers.UpdatePod

 1// kubernetes/pkg/kubelet/pod_workers.go
 2func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
 3    ...
 4    status, ok := p.podSyncStatuses[uid]
 5    if !ok {
 6        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
 7        firstTime = true
 8		status = &podSyncStatus{
 9			syncedAt: now,
10			fullname: kubecontainer.BuildPodFullName(name, ns),
11		}
12        ...
13        p.podSyncStatuses[uid] = status
14    }
15
16    ...
17    // 创建一个 pod worker 协程,如果该协程不存在的话
18    podUpdates, exists := p.podUpdates[uid]
19	if !exists {
20        podUpdates = make(chan struct{}, 1)
21		p.podUpdates[uid] = podUpdates
22        ...
23        go func() {
24			defer runtime.HandleCrash()
25			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
26			p.podWorkerLoop(uid, outCh)
27		}()
28    }
29}
30
31func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
32    var lastSyncTime time.Time
33    for range podUpdates {
34        // startPodSync 判断 pod 是否可以被同步
35		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
36
37        ...
38        err := func() error {
39            var status *kubecontainer.PodStatus
40            var err error
41            switch {
42			case update.Options.RunningPod != nil:
43            default:
44                status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
45                ...
46            }
47            ...
48
49        switch {
50			case update.WorkType == TerminatedPod:
51            ...
52            default:
53				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
54			}
55
56            lastSyncTime = p.clock.Now()
57			return err
58		}()
59
60        ...
61    }
62}

这里,要注意的是 podWorkers.podCache.GetNewerThan 获取的是最新的 pod 状态。其中,PLEG 获取 container runtime 的 pod 状态,存入 podCache 中。podCache 中的 pod 状态和 kubelet 从 kube-apiserver 获取的 pod 状态做对比,以获取最新的 pod 状态。

接着,进入 podWorkers.podSyncer.SyncPod 同步 pod:

 1func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
 2    ...
 3	klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 4    ...
 5    // 生成 apiPodStatus 以同步至 statusManager
 6    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
 7    ...
 8    // 获取 statusManager 中存储的 pod 状态
 9    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
10    ...
11    // 调用 statusManager 同步 pod 状态
12    kl.statusManager.SetPodStatus(pod, apiPodStatus)
13
14    ...
15    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
16    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
17		if kl.secretManager != nil {
18			kl.secretManager.RegisterPod(pod)
19		}
20		if kl.configMapManager != nil {
21			kl.configMapManager.RegisterPod(pod)
22		}
23	}
24
25    // 创建 pod container manager
26    pcm := kl.containerManager.NewPodContainerManager()
27    ...
28
29    // Make data directories for the pod
30	if err := kl.makePodDataDirs(pod); err != nil {
31		...
32	}
33
34    // Wait for volumes to attach/mount
35	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
36		...
37	}
38
39    // Fetch the pull secrets for the pod
40	pullSecrets := kl.getPullSecretsForPod(pod)
41
42	// Ensure the pod is being probed
43	kl.probeManager.AddPod(pod)
44
45    ...
46    result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
47    ...
48}

Kubelet.SyncPod 首先更新 statusManager 中 pod 的状态信息,接着开始创建 pod 所需要的资源,如 data directories、volumes、secrets。在调用 container runtime 同步 pod 前,将 pod 添加到 probeManager 模块,以检测 pod 状态。关于 probeManager 模块的详细内容,可参考相关资料。

进入 Kubelet.containerRuntime.SyncPod 查看 container runtime 是怎么同步 pod 的。

 1// kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
 2func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
 3	// Step 1: Compute sandbox and container changes.
 4	podContainerChanges := m.computePodActions(ctx, pod, podStatus)
 5    ...
 6    // Step 2: Kill the pod if the sandbox has changed.
 7	if podContainerChanges.KillPod {
 8        ...
 9    } else {
10        // Step 3: kill any running containers in this pod which are not to keep.
11        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
12            ...
13        }
14    }
15
16    ...
17    // Step 4: Create a sandbox for the pod if necessary.
18    podSandboxID := podContainerChanges.SandboxID
19	if podContainerChanges.CreateSandbox {
20        ...
21        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
22		result.AddSyncResult(createSandboxResult)
23        ...
24        podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
25        if err != nil {
26            ...
27        }
28
29        // 调用 runtime cri 接口查询创建的 pod sandbox 状态
30        resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
31        ...
32    }
33
34    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
35	result.AddSyncResult(configPodSandboxResult)
36
37    start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
38        ...
39        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
40        ...
41        if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
42            ...
43        }
44        ...
45    }
46
47    // Step 5: start ephemeral containers
48	for _, idx := range podContainerChanges.EphemeralContainersToStart {
49		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
50	}
51
52    if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
53        ...
54    } else {
55        // Step 6: start init containers.
56		for _, idx := range podContainerChanges.InitContainersToStart {
57            container := &pod.Spec.InitContainers[idx]
58			// Start the next init container.
59			if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
60                ...
61            }
62
63            // Successfully started the container; clear the entry in the failure
64			klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
65        }
66    }
67
68    // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
69	if isInPlacePodVerticalScalingAllowed(pod) {
70		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
71			m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
72		}
73	}
74
75    // Step 8: start containers in podContainerChanges.ContainersToStart.
76	for _, idx := range podContainerChanges.ContainersToStart {
77		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
78	}
79
80	return
81}

Kubelet.containerRuntime.SyncPod 中通过调用 runtime cri 接口创建 pod sandbox 和 container。以创建 pod sandbox 为例,在 kubeGenericRuntimeManager.createPodSandbox 中调用 kubeGenericRuntimeManager.instrumentedRuntimeService.RunPodSandbox 创建 pod sandbox:

 1func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
 2    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
 3    if err != nil {
 4        ...
 5    }
 6
 7    // 创建 pod 的 log 目录
 8    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
 9    ...
10
11    podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
12    if err != nil {
13        ...
14    }
15
16    return podSandBoxID, "", nil
17}
18
19func (in instrumentedRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
20	...
21	out, err := in.service.RunPodSandbox(ctx, config, runtimeHandler)
22	...
23	return out, err
24}
25
26func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
27	...
28	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
29	...
30	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
31		Config:         config,
32		RuntimeHandler: runtimeHandler,
33	})
34    ...
35}
36
37// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
38func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
39	out := new(RunPodSandboxResponse)
40	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
41	if err != nil {
42		return nil, err
43	}
44	return out, nil
45}

可以看到,这里通过调用 cri 接口的 /runtime.v1.RuntimeService/RunPodSandbox 创建 pod sandbox,至于创建 container 也是类似,调用 runtime cri 的接口实现创建 pod 的 container。

小结

本文从 kubelet 源码层面介绍了 pod 创建的流程,后续将重点看 runtime 是如何工作的。