文接 containerd 源码分析:创建 container(一)

创建容器进程

创建 container 成功后,接着创建 task, task 将根据 container metadata 创建容器进程。

创建 task

进入 tasks.NewTask 创建 task:

 1// containerd/cmd/ctr/commands/tasks/tasks_unix.go
 2func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
 3   ...
 4   t, err := container.NewTask(ctx, ioCreator, opts...)
 5	if err != nil {
 6		return nil, err
 7	}
 8   ...
 9}
10
11// containerd/client/container.go
12func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
13   ...
14   t := &task{
15		client: c.client,
16		io:     i,
17		id:     c.id,
18		c:      c,
19	}
20	...
21	response, err := c.client.TaskService().Create(ctx, request)
22	if err != nil {
23		return nil, errdefs.FromGRPC(err)
24	}
25	t.pid = response.Pid
26	return t, nil
27}

类似创建 container,这里调用 container.client.TaskService().Create 创建 task:

1// containerd/api/services/tasks/v1/tasks_grpc.pb.go
2func (c *tasksClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) {
3	out := new(CreateTaskResponse)
4	err := c.cc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, opts...)
5	if err != nil {
6		return nil, err
7	}
8	return out, nil
9}

调用 /containerd.services.tasks.v1.Tasks/Create grpc 接口创建 task。查看 containerd 中提供该服务的插件。

 1// containerd/plugins/services/tasks/service.go
 2func init() {
 3	registry.Register(&plugin.Registration{
 4		Type: plugins.GRPCPlugin,
 5		ID:   "tasks",
 6		Requires: []plugin.Type{
 7			plugins.ServicePlugin,
 8		},
 9		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
10			// plugins.ServicePlugin: io.containerd.service.v1
11			// services.TasksService: tasks-service
12			i, err := ic.GetByID(plugins.ServicePlugin, services.TasksService)
13			if err != nil {
14				return nil, err
15			}
16			return &service{local: i.(api.TasksClient)}, nil
17		},
18	})
19}

containerd 中提供该服务的是 io.containerd.grpc.v1.tasks 插件。调用插件对象 service 的 Create 方法创建 task:

1// containerd/plugins/services/tasks/service.go
2func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
3	return s.local.Create(ctx, r)
4}

service 调用 local 对象的 Create 方法创建 task。local 是 io.containerd.service.v1.tasks-service 插件的实例化对象:

 1// containerd/plugins/services/tasks/local.go
 2func init() {
 3	registry.Register(&plugin.Registration{
 4		Type:     plugins.ServicePlugin,
 5		ID:       services.TasksService,
 6		Requires: tasksServiceRequires,
 7		Config:   &Config{},
 8		InitFn:   initFunc,
 9	})
10
11	timeout.Set(stateTimeout, 2*time.Second)
12}
13
14func initFunc(ic *plugin.InitContext) (interface{}, error) {
15	config := ic.Config.(*Config)
16
17	// plugins.RuntimePluginV2: io.containerd.runtime.v2
18	v2r, err := ic.GetByID(plugins.RuntimePluginV2, "task")
19	if err != nil {
20		return nil, err
21	}
22
23	// plugins.MetadataPlugin: io.containerd.metadata.v1
24	m, err := ic.GetSingle(plugins.MetadataPlugin)
25	if err != nil {
26		return nil, err
27	}
28	...
29	db := m.(*metadata.DB)
30	l := &local{
31		containers: metadata.NewContainerStore(db),
32		store:      db.ContentStore(),
33		publisher:  ep.(events.Publisher),
34		monitor:    monitor.(runtime.TaskMonitor),
35		v2Runtime:  v2r.(runtime.PlatformRuntime),
36	}
37	...
38}

进入 local.Create

 1// containerd/plugins/services/tasks/local.go
 2func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
 3	// 从 boltDB 中获取 container metadata
 4	container, err := l.getContainer(ctx, r.ContainerID)
 5	if err != nil {
 6		return nil, errdefs.ToGRPC(err)
 7	}
 8	...
 9	rtime := l.v2Runtime
10	...
11	c, err := rtime.Create(ctx, r.ContainerID, opts)
12	if err != nil {
13		return nil, errdefs.ToGRPC(err)
14	}
15	...
16}

local.Create 首先获取 boltDB 中的 container 信息, 接着调用 local.v2Runtime.Create 创建 task。v2Runtime 是 io.containerd.runtime.v2.task 插件的实例:

 1// containerd/core/runtime/v2/manager.go
 2func init() {
 3	registry.Register(&plugin.Registration{
 4		Type: plugins.RuntimePluginV2,
 5		ID:   "task",
 6		...
 7		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
 8			...
 9			// 获取 metadata 插件的实例
10			m, err := ic.GetSingle(plugins.MetadataPlugin)
11			if err != nil {
12				return nil, err
13			}
14			...
15			cs := metadata.NewContainerStore(m.(*metadata.DB))
16			ss := metadata.NewSandboxStore(m.(*metadata.DB))
17			events := ep.(*exchange.Exchange)
18
19			shimManager, err := NewShimManager(ic.Context, &ManagerConfig{
20				Root:         ic.Properties[plugins.PropertyRootDir],
21				State:        ic.Properties[plugins.PropertyStateDir],
22				Address:      ic.Properties[plugins.PropertyGRPCAddress],
23				TTRPCAddress: ic.Properties[plugins.PropertyTTRPCAddress],
24				Events:       events,
25				Store:        cs,
26				SchedCore:    config.SchedCore,
27				SandboxStore: ss,
28			})
29			if err != nil {
30				return nil, err
31			}
32
33			return NewTaskManager(shimManager), nil
34		},
35		...
36	})
37}
38
39func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) {
40	...
41	m := &ShimManager{
42		root:                   config.Root,
43		state:                  config.State,
44		containerdAddress:      config.Address,
45		containerdTTRPCAddress: config.TTRPCAddress,
46		shims:                  runtime.NewNSMap[ShimInstance](),
47		events:                 config.Events,
48		containers:             config.Store,
49		schedCore:              config.SchedCore,
50		sandboxStore:           config.SandboxStore,
51	}
52	...
53	return m, nil
54}
55
56func NewTaskManager(shims *ShimManager) *TaskManager {
57	return &TaskManager{
58		manager: shims,
59	}
60}

io.containerd.runtime.v2.task 插件的实例是 TaskManager,其 manager 字段是 ShimManager,由 ShimManager 维护 shims(垫片)。调用 local.v2Runtime.Create 实际调用的是 TaskManager.Create:

 1// containerd/core/runtime/v2/manager.go
 2func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
 3	// step1: 创建 shim
 4	shim, err := m.manager.Start(ctx, taskID, opts)
 5	if err != nil {
 6		return nil, fmt.Errorf("failed to start shim: %w", err)
 7	}
 8
 9	// step2:new shim task
10	shimTask, err := newShimTask(shim)
11	if err != nil {
12		return nil, err
13	}
14
15	// step3: 创建 task
16	t, err := shimTask.Create(ctx, opts)
17	if err != nil {
18		...
19	}
20
21	return t, nil
22}

首先,调用 TaskManager.manager.Start 启动 shim:

 1// containerd/core/runtime/v2/manager.go
 2func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
 3	// 创建 bundle,bundle 是包含 shim 信息的对象
 4	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
 5	if err != nil {
 6		return nil, err
 7	}
 8	...
 9	// 启动 shim
10	shim, err := m.startShim(ctx, bundle, id, opts)
11	if err != nil {
12		return nil, err
13	}
14	...
15	// 将启动的 shim 添加到 ShimManager 的 shims 中
16	if err := m.shims.Add(ctx, shim); err != nil {
17		return nil, fmt.Errorf("failed to add task: %w", err)
18	}
19	...
20}

进入 ShimManager.startShim 查看启动 shim 的逻辑:

 1// containerd/core/runtime/v2/manager.go
 2func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
 3	...
 4	// 启动 shim 的 binary 对象
 5	b := shimBinary(bundle, shimBinaryConfig{
 6		runtime:      runtimePath,
 7		address:      m.containerdAddress,
 8		ttrpcAddress: m.containerdTTRPCAddress,
 9		schedCore:    m.schedCore,
10	})
11	// binary 对象启动 shim
12	shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
13		...
14	})
15	...
16}
17
18// containerd/core/runtime/v2/binary.go
19func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
20	...
21	cmd, err := client.Command(
22		ctx,
23		&client.CommandConfig{
24			// runtime: /usr/bin/containerd-shim-runc-v2
25			Runtime:      b.runtime,
26			Address:      b.containerdAddress,
27			TTRPCAddress: b.containerdTTRPCAddress,
28			Path:         b.bundle.Path,
29			Opts:         opts,
30			Args:         args,
31			SchedCore:    b.schedCore,
32		})
33	if err != nil {
34		return nil, err
35	}
36	...
37	out, err := cmd.CombinedOutput()
38	if err != nil {
39		return nil, fmt.Errorf("%s: %w", out, err)
40	}
41	...
42}

binary.Start 通过命令行执行二进制文件的方式,启动 container 对应的 runtime(即 shim)/usr/bin/containerd-shim-runc-v2。查看启动的 shim:

1# ps -ef | grep nginx1
2root     4144414 4144319  0 10:20 ?        00:00:00 /root/go/src/containerd/cmd/ctr/__debug_bin3399182742 run docker.io/library/nginx:alpine nginx1
3root     4147233       1  0 10:49 ?        00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx1 -address /run/containerd/containerd.sock

启动的 shim 进程 PID 为 4147233,其父进程是 PID 为 1 的进程。

shim 提供 ttrpc 服务,负责和 containerd 进行通信:

containerd 和 shim 交互

TaskManager.manager.Start 启动了 shim。接着在 TaskManager.Create 中调用 newShimTask 实例化 task。

 1// containerd/core/runtime/v2/shim.go
 2func newShimTask(shim ShimInstance) (*shimTask, error) {
 3	taskClient, err := NewTaskClient(shim.Client(), shim.Version())
 4	if err != nil {
 5		return nil, err
 6	}
 7
 8	return &shimTask{
 9		ShimInstance: shim,
10		task:         taskClient,
11	}, nil
12}
13
14// containerd/core/runtime/v2/bridge.go
15func NewTaskClient(client interface{}, version int) (TaskServiceClient, error) {
16	switch c := client.(type) {
17	case *ttrpc.Client:
18		switch version {
19		case 2:
20			return &ttrpcV2Bridge{client: v2.NewTaskClient(c)}, nil
21		case 3:
22			return v3.NewTTRPCTaskClient(c), nil
23		default:
24			return nil, fmt.Errorf("containerd client supports only v2 and v3 TTRPC task client (got %d)", version)
25		}
26
27	...
28	}
29}

shimTask 持有与 shim 通信的 client(version 为 2 时是 ttrpcV2Bridge),二者通过 ttrpc 建立连接。

继续在 TaskManager.Create 中调用 shimTask.Create

 1// containerd/core/runtime/v2/shim.go
 2func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
 3	...
 4	request := &task.CreateTaskRequest{
 5		ID:         s.ID(),
 6		Bundle:     s.Bundle(),
 7		Stdin:      opts.IO.Stdin,
 8		Stdout:     opts.IO.Stdout,
 9		Stderr:     opts.IO.Stderr,
10		Terminal:   opts.IO.Terminal,
11		Checkpoint: opts.Checkpoint,
12		Options:    protobuf.FromAny(topts),
13	}
14	...
15	_, err := s.task.Create(ctx, request)
16	if err != nil {
17		return nil, errdefs.FromGRPC(err)
18	}
19
20	return s, nil
21}

进入 shimTask.task.Create

 1// containerd/core/runtime/v2/bridge.go
 2func (b *ttrpcV2Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
 3	resp, err := b.client.Create(ctx, &v2.CreateTaskRequest{
 4		ID:               request.GetID(),
 5		Bundle:           request.GetBundle(),
 6		Rootfs:           request.GetRootfs(),
 7		Terminal:         request.GetTerminal(),
 8		Stdin:            request.GetStdin(),
 9		Stdout:           request.GetStdout(),
10		Stderr:           request.GetStderr(),
11		Checkpoint:       request.GetCheckpoint(),
12		ParentCheckpoint: request.GetParentCheckpoint(),
13		Options:          request.GetOptions(),
14	})
15
16	return &api.CreateTaskResponse{Pid: resp.GetPid()}, err
17}
18
19// containerd/api/runtime/task/v2/shim_ttrpc.pb.go
20func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
21	var resp CreateTaskResponse
22	if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {
23		return nil, err
24	}
25	return &resp, nil
26}

taskClient.Create 调用 shim 的 containerd.task.v2.Task.Create ttrpc 接口创建 task。

containerd-shim-runc-v2 和 Runc

containerd.task.v2.Task.Create 是由 containerd-shim-runc-v2 提供的 ttrpc 服务。其服务实例是 containerd-shim-runc-v2 下的 service 对象:

 1// containerd/cmd/containerd-shim-runc-v2/task/service.go
 2// Create a new initial process and container with the underlying OCI runtime
 3func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 4	...
 5	container, err := runc.NewContainer(ctx, s.platform, r)
 6	if err != nil {
 7		return nil, err
 8	}
 9	...
10}

service.Create 调用 runc.NewContainer 实例化容器。runc 是实际创建容器的低级运行时。进入 runc.NewContainer

 1// containerd/cmd/containerd-shim-runc-v2/runc/container.go
 2// NewContainer returns a new runc container
 3func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
 4	...
 5	// runc init 进程,runc init 进程负责启动容器进程
 6	p, err := newInit(
 7		ctx,
 8		r.Bundle,
 9		filepath.Join(r.Bundle, "work"),
10		ns,
11		platform,
12		config,
13		opts,
14		rootfs,
15	)
16
17	...
18	// 创建容器
19	if err := p.Create(ctx, config); err != nil {
20		return nil, errdefs.ToGRPC(err)
21	}
22	container := &Container{
23		ID:              r.ID,
24		Bundle:          r.Bundle,
25		process:         p,
26		processes:       make(map[string]process.Process),
27		reservedProcess: make(map[string]struct{}),
28	}
29	...
30}

Init.Create 中调用低级运行时 runc 创建启动容器的 init 进程。

 1// containerd/cmd/containerd-shim-runc-v2/process/init.go
 2func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
 3	...
 4	if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
 5		return p.runtimeError(err, "OCI runtime create failed")
 6	}
 7	...
 8}
 9
10// containerd/vendor/github.com/containerd/go-runc/runc.go
11// Create creates a new container and returns its pid if it was created successfully
12func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
13	...
14	cmd := r.command(context, append(args, id)...)
15	if opts.IO != nil {
16		opts.Set(cmd)
17	}
18	...
19	// Runc.startCommand 执行 runc 命令创建容器
20	ec, err := r.startCommand(cmd)
21	...
22}

Runc.startCommand 中执行 runc create 命令,由它派生出 runc init 进程,即启动容器的 init 进程:

1# ps -ef | grep 4147233
2root      4147233       1  0 14:25 ?        00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx1 -address /run/containerd/containerd.sock
3root      120396  4147233  0 14:25 ?        00:00:00 runc init