前言
kubernetes 提供三种探针:存活探针(Liveness)、就绪探针(Readiness)和启动探针(Startup),用于判断容器的健康状态。其中,存活探针确定什么时候重启容器,就绪探针确定容器何时准备好接受流量请求,启动探针判断容器内的应用是否已经启动。
本文通过分析 kubelet 源码了解 kubernetes 的探针是怎么工作的。
kubelet probeManager
kubelet 中的 probeManager 模块提供了探针服务,直接分析 probeManager。
1// kubernetes/pkg/kubelet/kubelet.go
2func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...) (*Kubelet, error) {
3 ...
4 klet.livenessManager = proberesults.NewManager()
5 klet.readinessManager = proberesults.NewManager()
6 klet.startupManager = proberesults.NewManager()
7
8 ...
9 if kubeDeps.ProbeManager != nil {
10 klet.probeManager = kubeDeps.ProbeManager
11 } else {
12 klet.probeManager = prober.NewManager(
13 klet.statusManager,
14 klet.livenessManager,
15 klet.readinessManager,
16 klet.startupManager,
17 klet.runner,
18 kubeDeps.Recorder)
19 }
20 ...
21}
在 NewMainKubelet 中初始化 probeManager。其中,三种探针各自对应一个结果管理器:livenessManager、readinessManager 和 startupManager;它们与 statusManager、runner 以及 Recorder 一起作为参数传入 prober.NewManager,用于构造 probeManager。
当 kubelet 处理 pod 时,会将 pod 添加到 probeManager:
1// kubernetes/pkg/kubelet/kubelet.go
2func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
3 ...
4 // Ensure the pod is being probed
5 kl.probeManager.AddPod(pod)
6 ...
7}
8
9// kubernetes/pkg/kubelet/prober/prober_manager.go
10func (m *manager) AddPod(pod *v1.Pod) {
11 ...
12 key := probeKey{podUID: pod.UID}
13 for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
14 key.containerName = c.Name
15
16 if c.StartupProbe != nil {
17 ...
18 }
19
20 if c.ReadinessProbe != nil {
21 key.probeType = readiness
22 if _, ok := m.workers[key]; ok {
23 klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
24 "pod", klog.KObj(pod), "containerName", c.Name)
25 return
26 }
27 w := newWorker(m, readiness, pod, c)
28 m.workers[key] = w
29 go w.run()
30 }
31
32 if c.LivenessProbe != nil {
33 ...
34 }
35 }
36}
在 manager.AddPod 中包含三种探针的处理逻辑,这里以 ReadinessProbe 探针为例进行分析。首先,创建 ReadinessProbe 的 worker,接着开启一个协程运行该 worker:
1// kubernetes/pkg/kubelet/prober/worker.go
2func (w *worker) run() {
3 ...
4probeLoop:
5 // doProbe 进行探针检测
6 for w.doProbe(ctx) {
7 // Wait for next probe tick.
8 select {
9 case <-w.stopCh:
10 break probeLoop
11 case <-probeTicker.C:
12 case <-w.manualTriggerCh:
13 // continue
14 }
15 }
16}
17
18func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
19 ...
20 // Note, exec probe does NOT have access to pod environment variables or downward API
21 result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
22 if err != nil {
23 // Prober error, throw away the result.
24 return true
25 }
26 ...
27}
进入 worker.probeManager.prober.probe 查看探针是怎么探测 container 的:
1// kubernetes/pkg/kubelet/prober/prober.go
2// probe probes the container.
3func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
4 var probeSpec *v1.Probe
5 switch probeType {
6 case readiness:
7 probeSpec = container.ReadinessProbe
8 case liveness:
9 probeSpec = container.LivenessProbe
10 case startup:
11 probeSpec = container.StartupProbe
12 default:
13 return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
14 }
15
16 if probeSpec == nil {
17 klog.InfoS("Probe is nil", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
18 return results.Success, nil
19 }
20
21 result, output, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
22 ...
23}
24
25// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
26// if it never succeeds.
27func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
28 var err error
29 var result probe.Result
30 var output string
31 for i := 0; i < retries; i++ {
32 result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
33 if err == nil {
34 return result, output, nil
35 }
36 }
37 return result, output, err
38}
39
40func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
41 timeout := time.Duration(p.TimeoutSeconds) * time.Second
42 if p.Exec != nil {
43 klog.V(4).InfoS("Exec-Probe runProbe", "pod", klog.KObj(pod), "containerName", container.Name, "execCommand", p.Exec.Command)
44 command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
45 return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
46 }
47
48 if p.HTTPGet != nil {
49 req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
50 ...
51 }
52
53 if p.TCPSocket != nil {
54 ...
55 }
56
57 if p.GRPC != nil {
58 ...
59 }
60 ...
61}
到这里我们可以看到,runProbe 会根据探针配置的探测方式执行不同的逻辑:对于用命令行探测的探针(Exec),执行 prober.exec.Probe 方法;对于 HTTP 类型的探针(HTTPGet),先调用 httpprobe.NewRequestForHTTPGetAction 构造探测请求(后续逻辑在上面的代码中已省略);TCPSocket 和 GRPC 类型的探针也各有对应的分支,等等。
小结
本文从 kubelet 源码层面介绍了 kubernetes 中探针的检测逻辑,力图做到知其然,知其所以然。