前言
Kubernetes:kubelet 源码分析之创建 pod 流程 介绍了 kubelet 创建 pod 的流程,其中介绍了 kubelet 调用 runtime cri 接口创建 pod。containerd 源码分析:启动注册流程 介绍了 containerd 作为一种行业标准的高级运行时的启动注册流程。那么,kubelet 是怎么和 containerd 交互的呢? 本文会带着这个问题分析 kubelet 和 containerd 的交互。
kubelet 和 containerd 交互
kubelet
如 Kubernetes:kubelet 源码分析之创建 pod 流程 分析,kubelet 调用 runtime cri 接口 /runtime.v1.RuntimeService/RunPodSandbox 创建 pod:
1// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
2func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
3 out := new(RunPodSandboxResponse)
4 err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
5 if err != nil {
6 return nil, err
7 }
8 return out, nil
9}
kubelet 是 runtime cri 接口调用的客户端,那么容器运行时作为服务端是怎么提供服务的呢?
kubelet 和 containerd 交互流程
在介绍容器运行时提供的服务之前先看下 cri 架构图。

从图中可以看出,containerd 的 CRI 插件提供 image service 和 runtime service,负责对接 kubelet runtime cri 的接口调用,并将调用转发给 containerd。
继续,查看 containerd 的处理流程。
containerd
CRI Plugin
根据 cri 架构图, 从 CRI 插件入手查看 id 为 io.containerd.grpc.v1.cri 的 CRI 插件。
1// containerd/plugins/cri/cri.go
2func initCRIService(ic *plugin.InitContext) (interface{}, error) {
3 ...
4 // Get runtime service.
5 criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
6 if err != nil {
7 return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err)
8 }
9
10 // Get image service.
11 criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images")
12 if err != nil {
13 return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
14 }
15 ...
16 service := &criGRPCServer{
17 RuntimeServiceServer: rs,
18 ImageServiceServer: is,
19 Closer: s, // TODO: Where is close run?
20 initializer: s,
21 }
22
23 if config.DisableTCPService {
24 return service, nil
25 }
26
27 return criGRPCServerWithTCP{service}, nil
28}
插件返回的是 criGRPCServerWithTCP 对象。其中,包括 criGRPCServer 对象。criGRPCServer 对象实现了 grpcService 接口,将调用接口的 Register 注册对象到 grpc server。
1// containerd/plugins/cri/cri.go
2// Register registers all required services onto a specific grpc server.
3// This is used by containerd cri plugin.
4func (c *criGRPCServer) Register(s *grpc.Server) error {
5 return c.register(s)
6}
7
8func (c *criGRPCServer) register(s *grpc.Server) error {
9 instrumented := instrument.NewService(c)
10 runtime.RegisterRuntimeServiceServer(s, instrumented)
11 runtime.RegisterImageServiceServer(s, instrumented)
12 return nil
13}
在 criGRPCServer.register 中创建 instrumentedService 对象。
1type instrumentedService struct {
2 c criService
3}
4
5func NewService(c criService) GRPCServices {
6 return &instrumentedService{c: c}
7}
instrumentedService 包括 criService 对象。实际提供 runtime service 和 image service 的就是 criService 对象。
以注册 runtime service 为例,查看 runtime.RegisterRuntimeServiceServer(s, instrumented) 做了什么。
1// containerd/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
2func RegisterRuntimeServiceServer(s *grpc.Server, srv RuntimeServiceServer) {
3 s.RegisterService(&_RuntimeService_serviceDesc, srv)
4}
5
6var _RuntimeService_serviceDesc = grpc.ServiceDesc{
7 ServiceName: "runtime.v1.RuntimeService",
8 HandlerType: (*RuntimeServiceServer)(nil),
9 Methods: []grpc.MethodDesc{
10 {
11 MethodName: "Version",
12 Handler: _RuntimeService_Version_Handler,
13 },
14 {
15 MethodName: "RunPodSandbox",
16 Handler: _RuntimeService_RunPodSandbox_Handler,
17 },
18 ...
19 },
20 ...
21}
可以看到,注册 instrumentedService 到 grpc 中,instrumentedService 提供 runtime.v1.RuntimeService 服务,包括 kubelet 调用的 RunPodSandbox 方法。
继续看,instrumentedService 的 RunPodSandbox 做了什么。
1// containerd/internal/cri/instrumented_service.go
2func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
3 ...
4 res, err = in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
5 return res, errdefs.ToGRPC(err)
6}
instrumentedService 调用 criGRPCServer 的 RunPodSandbox 方法,实际执行的是 criGRPCServer 中的 criServer 对象:
1// containerd/internal/cri/server/sandbox_run.go
2func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
3 ...
4 if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
5 return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
6 }
7
8 ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
9 if err != nil {
10 ...
11 }
12 ...
13}
criService.RunPodSandbox 调用的是 sandboxService 的 CreateSandbox 和 StartSandbox 方法。
sandbox Plugin
sandboxService 在 cri.initCRIService 中实例化:
1// containerd/plugins/cri/cri.go
2func initCRIService(ic *plugin.InitContext) (interface{}, error) {
3 ...
4 sbControllers, err := getSandboxControllers(ic)
5 if err != nil {
6 return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err)
7 }
8 ...
9 options := &server.CRIServiceOptions{
10 RuntimeService: criRuntimePlugin.(server.RuntimeService),
11 ImageService: criImagePlugin.(server.ImageService),
12 StreamingConfig: streamingConfig,
13 NRI: getNRIAPI(ic),
14 Client: client,
15 SandboxControllers: sbControllers,
16 }
17 ...
18 s, rs, err := server.NewCRIService(options)
19 ...
20 service := &criGRPCServer{
21 RuntimeServiceServer: rs,
22 ImageServiceServer: is,
23 Closer: s, // TODO: Where is close run?
24 initializer: s,
25 }
26}
首先,getSandboxControllers 获得 sandbox controllers:
1// containerd/plugins/cri/cri.go
2func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) {
3 // plugins.SandboxControllerPlugin: "io.containerd.sandbox.controller.v1"
4 sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin)
5 if err != nil {
6 return nil, err
7 }
8 ...
9 return sc, nil
10}
sandbox.Controller 是类型为 io.containerd.sandbox.controller.v1 的插件对象。将该对象作为 options 赋给 criServer:
1// containerd/internal/cri/server/service.go
2func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) {
3 ...
4 c := &criService{
5 ...
6 sandboxService: newCriSandboxService(&config, options.SandboxControllers),
7 }
8 ...
9}
10
11func newCriSandboxService(config *criconfig.Config, sandboxers map[string]sandbox.Controller) *criSandboxService {
12 return &criSandboxService{
13 sandboxControllers: sandboxers,
14 config: config,
15 }
16}
criService.sandboxService.CreateSandbox 调用的是插件对象 sanbox controllers 的 CreateSandbox 方法,该方法最终调用的是 sandboxClient 的 CreateSandbox:
1func (c *sandboxClient) CreateSandbox(ctx context.Context, in *CreateSandboxRequest, opts ...grpc.CallOption) (*CreateSandboxResponse, error) {
2 out := new(CreateSandboxResponse)
3 err := c.cc.Invoke(ctx, "/containerd.runtime.sandbox.v1.Sandbox/CreateSandbox", in, out, opts...)
4 if err != nil {
5 return nil, err
6 }
7 return out, nil
8}
可以看到,在 sandboxClient.CreateSandbox 中调用 containerd 提供的 containerd cri 接口 /containerd.runtime.sandbox.v1.Sandbox/CreateSandbox,该接口用来创建 sandbox,即 pod。
创建 pod 流程
根据上述分析,这里画出 kubelet 到 containerd 的交互流程图如下:

小结
本文在前文 Kubernetes:kubelet 源码分析之创建 pod 流程 和 containerd 源码分析:启动注册流程 的基础上,进一步分析从 kubelet 到 containerd 的交互流程,打通了 kubelet 到 containerd 这一步。