Preface
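The previous articles covered Kubernetes' core data structure, the scheme, and the kube-apiserver startup flow. The startup article focused on the core startup logic and did not look at how kube-apiserver interacts with external components.
That interaction matters, because it defines kube-apiserver's boundaries and dependencies.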

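As the Kubernetes architecture diagram shows, kube-apiserver is the only component that talks to etcd. This article therefore singles out the interaction between kube-apiserver and etcd, aiming to understand not only what happens but also why.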
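Using etcd
Since this is about interaction, the first thing to understand is how the component on the other side is used. Here the counterpart of kube-apiserver is the well-known etcd, which needs little introduction.
A single example is enough: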
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("connect to etcd success.")
	defer cli.Close()

	// Grant a lease with a 5-second TTL.
	resp, err := cli.Grant(context.TODO(), 5)
	if err != nil {
		log.Fatal(err)
	}

	// Attach the key "name" to the lease; etcd deletes the key when the lease expires.
	_, err = cli.Put(context.TODO(), "name", "hxia", clientv3.WithLease(resp.ID))
	if err != nil {
		log.Fatal(err)
	}
}
For details, see go-by-example: etcd and the etcd Quickstart.
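kube-apiserver and etcd
Following the interaction between kube-apiserver and etcd in call order is complicated, and it is easy to get lost. So this article traces it backwards: first find the point where the two actually interact, then fan out from that point to map the overall structure.
So where do they interact? That is not hard to answer: in the handler. As the processing unit of the RESTful API, a handler is where the interaction between kube-apiserver and etcd is defined.
Take the handler that serves GET as an example: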
// kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	switch action.Verb {
	case "GET": // Get a resource.
		var handler restful.RouteFunction
		if isGetterWithOptions {
			handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
		} else {
			handler = restfulGetResource(getter, reqScope)
		}

		route := ws.GET(action.Path).To(handler).
			Doc(doc).
			Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
			Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
			Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
			Returns(http.StatusOK, "OK", producedObject).
			Writes(producedObject)

		addParams(route, action.Params)
		routes = append(routes, route)
	}
}
Step into restfulGetResource to see how the handler is created.
func restfulGetResource(r rest.Getter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.GetResource(r, &scope)(res.ResponseWriter, req.Request)
	}
}

// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc {
	return getResourceHandler(scope,
		func(ctx context.Context, name string, req *http.Request) (runtime.Object, error) {
			...
			return r.Get(ctx, name, &options)
		})
}

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	// Get finds a resource in the storage by name and returns it.
	// Although it can return an arbitrary error value, IsNotFound(err) is true for the
	// returned error value err when the specified resource is not found.
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
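As shown:
- restfulGetResource returns a route function, and that route function captures the getter object passed to restfulGetResource.
- Inside the returned route function, the getter's Get method is called to fetch the resource object, a runtime.Object. The getter here is any object that implements the Getter interface.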
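The closure pattern described in the two points above can be reproduced with the standard library alone. The following is a minimal sketch, not the apiserver code: Getter, mapGetter, getResource, and serve are hypothetical names, and the interface is reduced to a string lookup so that the example stays self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Getter is a simplified, hypothetical stand-in for rest.Getter.
type Getter interface {
	Get(name string) (string, error)
}

// mapGetter is a toy in-memory Getter used only for this illustration.
type mapGetter map[string]string

var errNotFound = errors.New("not found")

func (m mapGetter) Get(name string) (string, error) {
	v, ok := m[name]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}

// getResource mirrors the shape of restfulGetResource: it returns a handler
// that closes over g and calls g.Get only when a request arrives.
func getResource(g Getter) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		name := strings.TrimPrefix(r.URL.Path, "/")
		obj, err := g.Get(name)
		if err != nil {
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		fmt.Fprint(w, obj)
	}
}

// serve runs one GET request against h and returns the status code and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := getResource(mapGetter{"foo": "bar"})
	fmt.Println(serve(h, "/foo"))
	fmt.Println(serve(h, "/missing"))
}
```

The handler never knows which concrete type sits behind Getter, which is why the rest of the article has to trace where the concrete implementation comes from.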
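Based on the analysis above, the question becomes what the Get method called on getter actually does. Tracing upward level by level leads to one concrete instance of the Getter interface: customResourceDefinitionStorage.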
// kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
		// Call NewREST to create the resource storage object customResourceDefinitionStorage.
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage[resource] = customResourceDefinitionStorage
		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
	}
}

func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	strategy := NewStrategy(scheme)

	store := &genericregistry.Store{
		...
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}
With the concrete instance found, the next step is to see what its Get method does.
// kubernetes/vendor/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// rest implements a RESTStorage for API services against etcd
type REST struct {
	*genericregistry.Store
}

// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {
	Storage DryRunnableStorage
}

// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
type DryRunnableStorage struct {
	Storage storage.Interface
	Codec   runtime.Codec
}

// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		e.Decorator(obj)
	}
	return obj, nil
}

// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	return s.Storage.Get(ctx, key, opts, objPtr)
}
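The REST type embeds *genericregistry.Store, so Store's Get method is promoted to REST. Inside Store.Get, the call e.Storage.Get invokes the Get method of DryRunnableStorage, which in turn calls Get on the storage interface held in DryRunnableStorage.Storage. That last call is what reaches etcd.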
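The chain REST, Store, DryRunnableStorage, storage interface relies on two plain Go mechanisms: method promotion through embedding, and delegation to an interface field. Here is a minimal sketch under simplifying assumptions: the type names echo the real ones, but Interface, memBackend, newREST, and the "/registry/crds" prefix are hypothetical, and objects are reduced to strings.

```go
package main

import (
	"errors"
	"fmt"
)

// Interface is a hypothetical, reduced stand-in for storage.Interface.
type Interface interface {
	Get(key string) (string, error)
}

// memBackend plays the role of the concrete backend (etcd in the article).
type memBackend map[string]string

func (m memBackend) Get(key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", errors.New("key not found: " + key)
	}
	return v, nil
}

// DryRunnableStorage wraps the backend interface and forwards Get to it.
type DryRunnableStorage struct {
	Storage Interface
}

func (s *DryRunnableStorage) Get(key string) (string, error) {
	return s.Storage.Get(key)
}

// Store turns a resource name into a storage key and delegates to the wrapper.
type Store struct {
	Prefix  string
	Storage DryRunnableStorage
}

func (e *Store) Get(name string) (string, error) {
	return e.Storage.Get(e.Prefix + "/" + name)
}

// REST embeds *Store, so Store.Get is promoted and callable as REST.Get.
type REST struct {
	*Store
}

// newREST wires the three layers together around a given backend.
func newREST(backend Interface) *REST {
	return &REST{&Store{Prefix: "/registry/crds", Storage: DryRunnableStorage{Storage: backend}}}
}

func main() {
	r := newREST(memBackend{"/registry/crds/foo": "foo-object"})
	fmt.Println(r.Get("foo"))
}
```

Swapping memBackend for any other Interface implementation changes where the data comes from without touching REST or Store, which is the property the real code depends on.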
DryRunnableStorage.Storage is an interface, so what is the concrete object behind it?
Again, start from the resource storage object and look at how REST{store} is instantiated.
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	strategy := NewStrategy(scheme)

	store := &genericregistry.Store{
		...
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	// Step into CompleteWithOptions.
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	if e.Storage.Storage == nil {
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
	}
}
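At this point it is clear where the storage.Interface object is instantiated. opts.Decorator is a decorator function. The investigation continues with what this function does, because that exposes the most important link: how the storage interface reaches etcd.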
// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	// opts is obtained through options.RESTOptions.GetRESTOptions.
	// options.RESTOptions is an instance that satisfies the RESTOptionsGetter interface.
	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
	if err != nil {
		return err
	}
}

// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/options.go
type RESTOptionsGetter interface {
	GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}

func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	strategy := NewStrategy(scheme)

	store := &genericregistry.Store{
		...
	}
	// Create options.
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	// Pass options to Store.CompleteWithOptions.
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage[resource] = customResourceDefinitionStorage
		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
	}
}
As shown, c.GenericConfig.RESTOptionsGetter is the optsGetter, and calling its GetRESTOptions method yields the RESTOptions.
Where is c.GenericConfig.RESTOptionsGetter instantiated?
Recall BuildGenericConfig from the earlier article, the function that builds the generic configuration. That function is where c.GenericConfig.RESTOptionsGetter is instantiated.
// kubernetes/pkg/controlplane/apiserver/config.go
func BuildGenericConfig(
	s controlplaneapiserver.CompletedOptions,
	schemes []*runtime.Scheme,
	getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
) {
	storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
	storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
	storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
	if lastErr != nil {
		return
	}
	if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
		return
	}
}

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
	c.RESTOptionsGetter = s.CreateRESTOptionsGetter(factory, c.ResourceTransformers)
	return nil
}

func (s *EtcdOptions) CreateRESTOptionsGetter(factory serverstorage.StorageFactory, resourceTransformers storagevalue.ResourceTransformers) generic.RESTOptionsGetter {
	if resourceTransformers != nil {
		factory = &transformerStorageFactory{
			delegate:             factory,
			resourceTransformers: resourceTransformers,
		}
	}
	return &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}
The process is not complicated. The concrete object behind the RESTOptionsGetter interface is &StorageFactoryRestOptionsFactory.
So calling GetRESTOptions on c.GenericConfig.RESTOptionsGetter actually calls StorageFactoryRestOptionsFactory.GetRESTOptions.
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	ret := generic.RESTOptions{
		StorageConfig:             storageConfig,
		Decorator:                 generic.UndecoratedStorage,
		DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection:   f.Options.EnableGarbageCollection,
		ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
		StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
	}

	return ret, nil
}
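The relationship between an options getter, the options it returns, and the function-valued Decorator field can be hard to keep in mind across so many files. The sketch below compresses it into one file. It is an illustration under assumptions, not the apiserver implementation: the signatures are heavily reduced, and Interface, memStorage, staticGetter, and undecorated are hypothetical names.

```go
package main

import "fmt"

// Interface is a hypothetical, reduced stand-in for storage.Interface.
type Interface interface {
	Get(key string) (string, bool)
}

type memStorage map[string]string

func (m memStorage) Get(key string) (string, bool) { v, ok := m[key]; return v, ok }

// StorageDecorator builds a storage backend for a resource prefix.
type StorageDecorator func(prefix string) (Interface, error)

// RESTOptions carries the decorator as a plain function value.
type RESTOptions struct {
	ResourcePrefix string
	Decorator      StorageDecorator
}

// RESTOptionsGetter is the seam between a store and its configuration.
type RESTOptionsGetter interface {
	GetRESTOptions(resource string) (RESTOptions, error)
}

// staticGetter always hands out the same decorator (an assumption of this sketch).
type staticGetter struct{ decorator StorageDecorator }

func (g staticGetter) GetRESTOptions(resource string) (RESTOptions, error) {
	return RESTOptions{ResourcePrefix: "/" + resource, Decorator: g.decorator}, nil
}

type Store struct {
	Resource string
	Storage  Interface
}

// CompleteWithOptions fills in Storage only if the caller has not set it.
func (s *Store) CompleteWithOptions(getter RESTOptionsGetter) error {
	opts, err := getter.GetRESTOptions(s.Resource)
	if err != nil {
		return err
	}
	if s.Storage == nil {
		s.Storage, err = opts.Decorator(opts.ResourcePrefix)
	}
	return err
}

// undecorated returns a raw backend without wrapping it in anything else.
func undecorated(prefix string) (Interface, error) {
	return memStorage{prefix + "/foo": "foo-object"}, nil
}

func main() {
	s := &Store{Resource: "customresourcedefinitions"}
	if err := s.CompleteWithOptions(staticGetter{decorator: undecorated}); err != nil {
		panic(err)
	}
	fmt.Println(s.Storage.Get("/customresourcedefinitions/foo"))
}
```

Because the decorator is just a function value, the getter decides which backend gets built, while the store only decides when to build it.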
RESTOptions includes the Decorator. Since the Decorator is the focus here, step into generic.UndecoratedStorage to see what kind of function it is.
// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
	config *storagebackend.ConfigForResource,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
	return NewRawStorage(config, newFunc, newListFunc, resourcePrefix)
}

func NewRawStorage(config *storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config, newFunc, newListFunc, resourcePrefix)
}

// kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
	case storagebackend.StorageTypeETCD2:
		return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

// kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
	client, err := newETCD3Client(c.Transport)
	if err != nil {
		stopCompactor()
		return nil, nil, err
	}

	client.KV = etcd3.NewETCDLatencyTracker(client.KV)

	return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
	return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}

func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
	s := &store{
		client:              c,
		codec:               codec,
		versioner:           versioner,
		transformer:         transformer,
		pagingEnabled:       pagingEnabled,
		pathPrefix:          pathPrefix,
		groupResource:       groupResource,
		groupResourceString: groupResource.String(),
		watcher:             w,
		leaseManager:        newDefaultLeaseManager(c, leaseManagerConfig),
	}
	return s
}
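The code above is mostly a straight chain of function calls and needs little explanation.
As shown, what opts.Decorator does is instantiate store, an implementation of the storage interface that talks to etcd. store holds the client used to access etcd, and that client is created by newETCD3Client(c.Transport).
That largely solves the case. Access to etcd goes through store. Note that store only holds the client and performs no access when it is constructed; the actual access happens when a handler runs.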
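Two points from this walk-through can be checked in a small model: the factory picks the backend by storage type, and building the store sends no request. The sketch below assumes a hypothetical kvClient that counts calls. The constants typeUnset, typeETCD2, and typeETCD3 are placeholders chosen for this example rather than the storagebackend values.

```go
package main

import "fmt"

// kvClient is a hypothetical stand-in for the etcd client; it counts requests.
type kvClient struct {
	data  map[string]string
	calls int
}

func (c *kvClient) Get(key string) (string, bool) {
	c.calls++
	v, ok := c.data[key]
	return v, ok
}

// store only keeps a reference to the client; nothing is fetched at construction.
type store struct {
	client     *kvClient
	pathPrefix string
}

func (s *store) Get(key string) (string, bool) {
	return s.client.Get(s.pathPrefix + key)
}

// Placeholder storage type names for this sketch.
const (
	typeUnset = ""
	typeETCD2 = "etcd2"
	typeETCD3 = "etcd3"
)

// create mirrors the shape of the switch in factory.Create:
// etcd2 is rejected, and an unset type falls through to etcd3.
func create(storageType string, client *kvClient, prefix string) (*store, error) {
	switch storageType {
	case typeETCD2:
		return nil, fmt.Errorf("%s is no longer a supported storage backend", storageType)
	case typeUnset, typeETCD3:
		return &store{client: client, pathPrefix: prefix}, nil
	default:
		return nil, fmt.Errorf("unknown storage type: %s", storageType)
	}
}

func main() {
	client := &kvClient{data: map[string]string{"/registry/foo": "foo-object"}}
	s, err := create(typeUnset, client, "/registry")
	if err != nil {
		panic(err)
	}
	fmt.Println("calls after construction:", client.calls)
	v, ok := s.Get("/foo")
	fmt.Println(v, ok, "calls after Get:", client.calls)
}
```

The call counter stays at zero until Get runs, matching the observation that store is only a holder for the client until a request is handled.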
Looking back at s.Storage.Get inside DryRunnableStorage.Get, it is now clear that the call lands on store's Get method.
// kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
	return s.Storage.Get(ctx, key, opts, objPtr)
}

// kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
	preparedKey, err := s.prepareKey(key)
	if err != nil {
		return err
	}
	startTime := time.Now()
	// Fetch the value stored under the key through the etcd client.
	getResp, err := s.client.KV.Get(ctx, preparedKey)
	// ... (error and not-found handling elided in this excerpt)

	kv := getResp.Kvs[0]

	data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
	if err != nil {
		recordDecodeError(s.groupResourceString, preparedKey)
		return err
	}
	return nil
}
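To wrap up, this article walked through how kube-apiserver interacts with etcd. The next one will focus on how kube-apiserver implements authorization, authentication, and admission control.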