文接 containerd 源码分析:创建 container(一)
创建容器进程
创建 container 成功后,接着创建 task,task 将根据 container metadata 创建容器进程。
创建 task
进入 tasks.NewTask 创建 task:
1// containerd/cmd/ctr/commands/tasks/tasks_unix.go
2func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
3 ...
4 t, err := container.NewTask(ctx, ioCreator, opts...)
5 if err != nil {
6 return nil, err
7 }
8 ...
9}
10
11// containerd/client/container.go
12func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
13 ...
14 t := &task{
15 client: c.client,
16 io: i,
17 id: c.id,
18 c: c,
19 }
20 ...
21 response, err := c.client.TaskService().Create(ctx, request)
22 if err != nil {
23 return nil, errdefs.FromGRPC(err)
24 }
25 t.pid = response.Pid
26 return t, nil
27}
类似创建 container,这里调用 container.client.TaskService().Create 创建 task:
1// containerd/api/services/tasks/v1/tasks_grpc.pb.go
2func (c *tasksClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) {
3 out := new(CreateTaskResponse)
4 err := c.cc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, opts...)
5 if err != nil {
6 return nil, err
7 }
8 return out, nil
9}
调用 /containerd.services.tasks.v1.Tasks/Create grpc 接口创建 task。查看 containerd 中提供该服务的插件。
1// containerd/plugins/services/tasks/service.go
2func init() {
3 registry.Register(&plugin.Registration{
4 Type: plugins.GRPCPlugin,
5 ID: "tasks",
6 Requires: []plugin.Type{
7 plugins.ServicePlugin,
8 },
9 InitFn: func(ic *plugin.InitContext) (interface{}, error) {
10 // plugins.ServicePlugin: io.containerd.service.v1
11 // services.TasksService: tasks-service
12 i, err := ic.GetByID(plugins.ServicePlugin, services.TasksService)
13 if err != nil {
14 return nil, err
15 }
16 return &service{local: i.(api.TasksClient)}, nil
17 },
18 })
19}
containerd 中提供该服务的是 io.containerd.grpc.v1.tasks 插件。调用插件对象 service 的 Create 方法创建 task:
1// containerd/plugins/services/tasks/service.go
2func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
3 return s.local.Create(ctx, r)
4}
service 调用 local 对象的 Create 方法创建 task。local 是 io.containerd.service.v1.tasks-service 插件的实例化对象:
1// containerd/plugins/services/tasks/local.go
2func init() {
3 registry.Register(&plugin.Registration{
4 Type: plugins.ServicePlugin,
5 ID: services.TasksService,
6 Requires: tasksServiceRequires,
7 Config: &Config{},
8 InitFn: initFunc,
9 })
10
11 timeout.Set(stateTimeout, 2*time.Second)
12}
13
14func initFunc(ic *plugin.InitContext) (interface{}, error) {
15 config := ic.Config.(*Config)
16
17 // plugins.RuntimePluginV2: io.containerd.runtime.v2
18 v2r, err := ic.GetByID(plugins.RuntimePluginV2, "task")
19 if err != nil {
20 return nil, err
21 }
22
23 // plugins.MetadataPlugin: io.containerd.metadata.v1
24 m, err := ic.GetSingle(plugins.MetadataPlugin)
25 if err != nil {
26 return nil, err
27 }
28 ...
29 db := m.(*metadata.DB)
30 l := &local{
31 containers: metadata.NewContainerStore(db),
32 store: db.ContentStore(),
33 publisher: ep.(events.Publisher),
34 monitor: monitor.(runtime.TaskMonitor),
35 v2Runtime: v2r.(runtime.PlatformRuntime),
36 }
37 ...
38}
进入 local.Create:
1// containerd/plugins/services/tasks/local.go
2func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
3 // 从 boltDB 中获取 container metadata
4 container, err := l.getContainer(ctx, r.ContainerID)
5 if err != nil {
6 return nil, errdefs.ToGRPC(err)
7 }
8 ...
9 rtime := l.v2Runtime
10 ...
11 c, err := rtime.Create(ctx, r.ContainerID, opts)
12 if err != nil {
13 return nil, errdefs.ToGRPC(err)
14 }
15 ...
16}
local.Create 首先获取 boltDB 中的 container 信息,接着调用 local.v2Runtime.Create 创建 task。v2Runtime 是 io.containerd.runtime.v2.task 插件的实例:
1// containerd/core/runtime/v2/manager.go
2func init() {
3 registry.Register(&plugin.Registration{
4 Type: plugins.RuntimePluginV2,
5 ID: "task",
6 ...
7 InitFn: func(ic *plugin.InitContext) (interface{}, error) {
8 ...
9 // 获取 metadata 插件的实例
10 m, err := ic.GetSingle(plugins.MetadataPlugin)
11 if err != nil {
12 return nil, err
13 }
14 ...
15 cs := metadata.NewContainerStore(m.(*metadata.DB))
16 ss := metadata.NewSandboxStore(m.(*metadata.DB))
17 events := ep.(*exchange.Exchange)
18
19 shimManager, err := NewShimManager(ic.Context, &ManagerConfig{
20 Root: ic.Properties[plugins.PropertyRootDir],
21 State: ic.Properties[plugins.PropertyStateDir],
22 Address: ic.Properties[plugins.PropertyGRPCAddress],
23 TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress],
24 Events: events,
25 Store: cs,
26 SchedCore: config.SchedCore,
27 SandboxStore: ss,
28 })
29 if err != nil {
30 return nil, err
31 }
32
33 return NewTaskManager(shimManager), nil
34 },
35 ...
36 })
37}
38
39func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) {
40 ...
41 m := &ShimManager{
42 root: config.Root,
43 state: config.State,
44 containerdAddress: config.Address,
45 containerdTTRPCAddress: config.TTRPCAddress,
46 shims: runtime.NewNSMap[ShimInstance](),
47 events: config.Events,
48 containers: config.Store,
49 schedCore: config.SchedCore,
50 sandboxStore: config.SandboxStore,
51 }
52 ...
53 return m, nil
54}
55
56func NewTaskManager(shims *ShimManager) *TaskManager {
57 return &TaskManager{
58 manager: shims,
59 }
60}
io.containerd.runtime.v2.task 插件的实例是 TaskManager,其 manager 字段是负责管理 shim(垫片)的 ShimManager。调用 local.v2Runtime.Create 实际调用的是 TaskManager.Create:
1// containerd/core/runtime/v2/manager.go
2func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
3 // step1: 创建 shim
4 shim, err := m.manager.Start(ctx, taskID, opts)
5 if err != nil {
6 return nil, fmt.Errorf("failed to start shim: %w", err)
7 }
8
9 // step2:new shim task
10 shimTask, err := newShimTask(shim)
11 if err != nil {
12 return nil, err
13 }
14
15 // step3: 创建 task
16 t, err := shimTask.Create(ctx, opts)
17 if err != nil {
18 ...
19 }
20
21 return t, nil
22}
首先,调用 TaskManager.manager.Start 启动 shim:
1// containerd/core/runtime/v2/manager.go
2func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
3 // 创建 bundle,bundle 是包含 shim 信息的对象
4 bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
5 if err != nil {
6 return nil, err
7 }
8 ...
9 // 启动 shim
10 shim, err := m.startShim(ctx, bundle, id, opts)
11 if err != nil {
12 return nil, err
13 }
14 ...
15 // 将启动的 shim 添加到 ShimManager 的 shims 中
16 if err := m.shims.Add(ctx, shim); err != nil {
17 return nil, fmt.Errorf("failed to add task: %w", err)
18 }
19 ...
20}
进入 ShimManager.startShim 查看启动 shim 的逻辑:
1// containerd/core/runtime/v2/manager.go
2func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
3 ...
4 // 启动 shim 的 binary 对象
5 b := shimBinary(bundle, shimBinaryConfig{
6 runtime: runtimePath,
7 address: m.containerdAddress,
8 ttrpcAddress: m.containerdTTRPCAddress,
9 schedCore: m.schedCore,
10 })
11 // binary 对象启动 shim
12 shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
13 ...
14 })
15 ...
16}
17
18// containerd/core/runtime/v2/binary.go
19func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
20 ...
21 cmd, err := client.Command(
22 ctx,
23 &client.CommandConfig{
24 // runtime: /usr/bin/containerd-shim-runc-v2
25 Runtime: b.runtime,
26 Address: b.containerdAddress,
27 TTRPCAddress: b.containerdTTRPCAddress,
28 Path: b.bundle.Path,
29 Opts: opts,
30 Args: args,
31 SchedCore: b.schedCore,
32 })
33 if err != nil {
34 return nil, err
35 }
36 ...
37 out, err := cmd.CombinedOutput()
38 if err != nil {
39 return nil, fmt.Errorf("%s: %w", out, err)
40 }
41 ...
42}
containerd 通过命令行执行二进制文件的方式,启动 container 对应的 shim(即 runtime 对应的二进制文件 /usr/bin/containerd-shim-runc-v2)。查看启动的 shim:
1# ps -ef | grep nginx1
2root 4144414 4144319 0 10:20 ? 00:00:00 /root/go/src/containerd/cmd/ctr/__debug_bin3399182742 run docker.io/library/nginx:alpine nginx1
3root 4147233 1 0 10:49 ? 00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx1 -address /run/containerd/containerd.sock
启动的 shim 进程的 PID 为 4147233,其父进程是 1 号进程(PID 为 1)。
shim 提供 ttrpc 服务,负责和 containerd 进行通信:

TaskManager.manager.Start 启动了 shim。接着在 TaskManager.Create 中调用 newShimTask 实例化 task。
1// containerd/core/runtime/v2/shim.go
2func newShimTask(shim ShimInstance) (*shimTask, error) {
3 taskClient, err := NewTaskClient(shim.Client(), shim.Version())
4 if err != nil {
5 return nil, err
6 }
7
8 return &shimTask{
9 ShimInstance: shim,
10 task: taskClient,
11 }, nil
12}
13
14// containerd/core/runtime/v2/bridge.go
15func NewTaskClient(client interface{}, version int) (TaskServiceClient, error) {
16 switch c := client.(type) {
17 case *ttrpc.Client:
18 switch version {
19 case 2:
20 return &ttrpcV2Bridge{client: v2.NewTaskClient(c)}, nil
21 case 3:
22 return v3.NewTTRPCTaskClient(c), nil
23 default:
24 return nil, fmt.Errorf("containerd client supports only v2 and v3 TTRPC task client (got %d)", version)
25 }
26
27 ...
28 }
29}
shimTask 中包含与 shim 通信的 client(v2 版本下为 ttrpcV2Bridge),containerd 与 shim 之间通过 ttrpc 建立连接。
继续在 TaskManager.Create 中调用 shimTask.Create :
1// containerd/core/runtime/v2/shim.go
2func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
3 ...
4 request := &task.CreateTaskRequest{
5 ID: s.ID(),
6 Bundle: s.Bundle(),
7 Stdin: opts.IO.Stdin,
8 Stdout: opts.IO.Stdout,
9 Stderr: opts.IO.Stderr,
10 Terminal: opts.IO.Terminal,
11 Checkpoint: opts.Checkpoint,
12 Options: protobuf.FromAny(topts),
13 }
14 ...
15 _, err := s.task.Create(ctx, request)
16 if err != nil {
17 return nil, errdefs.FromGRPC(err)
18 }
19
20 return s, nil
21}
进入 shimTask.task.Create:
1// containerd/core/runtime/v2/bridge.go
2func (b *ttrpcV2Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
3 resp, err := b.client.Create(ctx, &v2.CreateTaskRequest{
4 ID: request.GetID(),
5 Bundle: request.GetBundle(),
6 Rootfs: request.GetRootfs(),
7 Terminal: request.GetTerminal(),
8 Stdin: request.GetStdin(),
9 Stdout: request.GetStdout(),
10 Stderr: request.GetStderr(),
11 Checkpoint: request.GetCheckpoint(),
12 ParentCheckpoint: request.GetParentCheckpoint(),
13 Options: request.GetOptions(),
14 })
15
16 return &api.CreateTaskResponse{Pid: resp.GetPid()}, err
17}
18
19// containerd/api/runtime/task/v2/shim_ttrpc.pb.go
20func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
21 var resp CreateTaskResponse
22 if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {
23 return nil, err
24 }
25 return &resp, nil
26}
taskClient.Create 调用 shim 的 containerd.task.v2.Task.Create ttrpc 接口创建 task。
containerd-shim-runc-v2 和 Runc
containerd.task.v2.Task.Create 是由 containerd-shim-runc-v2 提供的 ttrpc 服务。其服务实例是 containerd-shim-runc-v2 下的 service 对象:
1// containerd/cmd/containerd-shim-runc-v2/task/service.go
2// Create a new initial process and container with the underlying OCI runtime
3func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
4 ...
5 container, err := runc.NewContainer(ctx, s.platform, r)
6 if err != nil {
7 return nil, err
8 }
9 ...
10}
service.Create 调用 runc.NewContainer 实例化容器。runc 是实际创建容器的低级运行时。进入 runc.NewContainer:
1// containerd/cmd/containerd-shim-runc-v2/runc/container.go
2// NewContainer returns a new runc container
3func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
4 ...
5 // runc init 进程,runc init 进程负责启动容器进程
6 p, err := newInit(
7 ctx,
8 r.Bundle,
9 filepath.Join(r.Bundle, "work"),
10 ns,
11 platform,
12 config,
13 opts,
14 rootfs,
15 )
16
17 ...
18 // 创建容器
19 if err := p.Create(ctx, config); err != nil {
20 return nil, errdefs.ToGRPC(err)
21 }
22 container := &Container{
23 ID: r.ID,
24 Bundle: r.Bundle,
25 process: p,
26 processes: make(map[string]process.Process),
27 reservedProcess: make(map[string]struct{}),
28 }
29 ...
30}
在 Init.Create 中调用低级运行时 runc 创建启动容器的 init 进程。
1// containerd/cmd/containerd-shim-runc-v2/process/init.go
2func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
3 ...
4 if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
5 return p.runtimeError(err, "OCI runtime create failed")
6 }
7 ...
8}
9
10// containerd/vendor/github.com/containerd/go-runc/runc.go
11// Create creates a new container and returns its pid if it was created successfully
12func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
13 ...
14 cmd := r.command(context, append(args, id)...)
15 if opts.IO != nil {
16 opts.Set(cmd)
17 }
18 ...
19 // Runc.startCommand 执行 runc 命令创建容器
20 ec, err := r.startCommand(cmd)
21 ...
22}
在 Runc.startCommand 中执行 runc create 命令,runc create 再派生出 runc init 进程,即启动容器的 init 进程:
1# ps -ef | grep 120376
2root 4147233 1 0 14:25 ? 00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx1 -address /run/containerd/containerd.sock
3root 120396 4147233 0 14:25 ? 00:00:00 runc init