前言

containerd 是一个行业标准的容器运行时,其强调简单性、健壮性和可移植性。本文将从 containerd 的代码结构入手,查看 containerd 的启动注册流程。

启动注册流程

containerd

首先以调试模式运行 containerd

 1// containerd/cmd/containerd/main.go
 2package main
 3
 4import (
 5	...
 6	_ "github.com/containerd/containerd/v2/cmd/containerd/builtins"
 7)
 8
 9...
10func main() {
11	app := command.App()
12	if err := app.Run(os.Args); err != nil {
13		fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
14		os.Exit(1)
15	}
16}

在启动 containerd 时,导入匿名包 github.com/containerd/containerd/v2/cmd/containerd/builtins 注册插件。

接着,进入 command.App():

 1// containerd/cmd/containerd/server/server.go
 2func App() *cli.App {
 3    app := cli.NewApp()
 4	app.Name = "containerd"
 5    ...
 6
 7    app.Action = func(context *cli.Context) error {
 8		...
 9        go func() {
10			defer close(chsrv)
11
12			server, err := server.New(ctx, config)
13			if err != nil {
14				select {
15				case chsrv <- srvResp{err: err}:
16				case <-ctx.Done():
17				}
18				return
19			}
20			...
21		}()
22        ...
23    }
24}

这里省略了一系列初始化过程,重点在 server.New(ctx, config)

 1// containerd/cmd/containerd/server/server.go
 2func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
 3    ...
 4    // 将插件加载到 loaded 中
 5    loaded, err := LoadPlugins(ctx, config)
 6    if err != nil {
 7		return nil, err
 8	}
 9    ...
10    serverOpts := []grpc.ServerOption{
11		grpc.StatsHandler(otelgrpc.NewServerHandler()),
12		grpc.ChainStreamInterceptor(
13			streamNamespaceInterceptor,
14			prometheusServerMetrics.StreamServerInterceptor(),
15		),
16		grpc.ChainUnaryInterceptor(
17			unaryNamespaceInterceptor,
18			prometheusServerMetrics.UnaryServerInterceptor(),
19		),
20	}
21    ...
22    var (
23		grpcServer = grpc.NewServer(serverOpts...)
24		tcpServer  = grpc.NewServer(tcpServerOpts...)
25
26        grpcServices  []grpcService
27		tcpServices   []tcpService
28		ttrpcServices []ttrpcService
29
30		s = &Server{
31			prometheusServerMetrics: prometheusServerMetrics,
32			grpcServer:              grpcServer,
33			tcpServer:               tcpServer,
34			ttrpcServer:             ttrpcServer,
35			config:                  config,
36		}
37        ...
38    )
39    ...
40    // 遍历插件
41    for _, p := range loaded {
42        ...
43        result := p.Init(initContext)
44        if err := initialized.Add(result); err != nil {
45			return nil, fmt.Errorf("could not add plugin result to plugin set: %w", err)
46		}
47
48		instance, err := result.Instance()
49        ...
50        if src, ok := instance.(grpcService); ok {
51			grpcServices = append(grpcServices, src)
52		}
53		if src, ok := instance.(ttrpcService); ok {
54			ttrpcServices = append(ttrpcServices, src)
55		}
56		if service, ok := instance.(tcpService); ok {
57			tcpServices = append(tcpServices, service)
58		}
59        ...
60    }
61
62    // 注册插件服务
63	for _, service := range grpcServices {
64		if err := service.Register(grpcServer); err != nil {
65			return nil, err
66		}
67	}
68	for _, service := range ttrpcServices {
69		if err := service.RegisterTTRPC(ttrpcServer); err != nil {
70			return nil, err
71		}
72	}
73	for _, service := range tcpServices {
74		if err := service.RegisterTCP(tcpServer); err != nil {
75			return nil, err
76		}
77	}
78    ...
79}

server.Newcontainerd 运行的主逻辑。

首先,将注册的插件加载到 loaded,接着遍历 loaded。通过 result := p.Init(initContext) 获取插件的实例。
io.containerd.grpc.v1.containers 插件为例,查看 p.Init 是如何获取插件对象的。

 1// containerd/vendor/github.com/containerd/plugin/plugin.go
 2func (r Registration) Init(ic *InitContext) *Plugin {
 3    // 调用注册插件的 InitFn 函数
 4	p, err := r.InitFn(ic)
 5	return &Plugin{
 6		Registration: r,
 7		Config:       ic.Config,
 8		Meta:         *ic.Meta,
 9		instance:     p,
10		err:          err,
11	}
12}
13
14// containerd/plugins/services/containers/service.go
15func init() {
16	registry.Register(&plugin.Registration{
17		Type: plugins.GRPCPlugin,
18		ID:   "containers",
19		Requires: []plugin.Type{
20			plugins.ServicePlugin,
21		},
22        // 执行 InitFn 返回 service 对象
23		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
24			i, err := ic.GetByID(plugins.ServicePlugin, services.ContainersService)
25			if err != nil {
26				return nil, err
27			}
28			return &service{local: i.(api.ContainersClient)}, nil
29		},
30	})
31}

获取到插件实例后,根据插件类型注册插件实例以提供对应的(grpc/ttrpc/tcp)服务。

注册插件

注册插件是通过 init 机制实现的。在 main 中导入 github.com/containerd/containerd/v2/cmd/containerd/builtins 包。

builtins 包导入包含 init 的插件包实现插件注册。以 cri 插件为例:

 1// containerd/cmd/containerd/builtins/cri.go
 2package builtins
 3
 4import (
 5	_ "github.com/containerd/containerd/v2/plugins/cri"
 6	...
 7)
 8
 9// containerd/plugins/cri/cri.go
10package cri
11
12...
13// Register CRI service plugin
14func init() {
15	defaultConfig := criconfig.DefaultServerConfig()
16	registry.Register(&plugin.Registration{
17		Type: plugins.GRPCPlugin,
18		ID:   "cri",
19		Requires: []plugin.Type{
20			...
21		},
22		Config: &defaultConfig,
23		ConfigMigration: func(ctx context.Context, configVersion int, pluginConfigs map[string]interface{}) error {
24			...
25		},
26		InitFn: initCRIService,
27	})
28}

init 中通过 registry.Register 注册插件:

 1package registry
 2...
 3var register = struct {
 4	sync.RWMutex
 5	r plugin.Registry
 6}{}
 7
 8// Register allows plugins to register
 9func Register(r *plugin.Registration) {
10	register.Lock()
11	defer register.Unlock()
12	register.r = register.r.Register(r)
13}

可以看到插件注册的过程实际是将插件结构体 plugin.Registration 注册到 register.plugin.Registry 的过程。

register.plugin.Registry 实际是一个包含 Registration 的切片。

1package plugin
2
3type Registry []*Registration

查看插件

使用 ctr 查看 containerd 注册的插件,ctrcontainerd 官方提供的命令行工具。如下:

1# ctr plugins ls
2TYPE                                   ID                       PLATFORMS      STATUS
3io.containerd.image-verifier.v1        bindir                   -              ok
4io.containerd.internal.v1              opt                      -              ok
5...

小结

本文主要介绍了 containerd 的启动注册插件流程。当然,插件的类型众多,插件是如何工作的,插件之间如何交互,kubernetes 又是怎么和 containerd 交互的,这些会在下文中继续介绍。