前言
前面两篇文章 Kubernetes:kube-apiserver 之 scheme(一) 和 Kubernetes:kube-apiserver 之 scheme(二) 重点介绍了 kube-apiserver 中的资源注册表 scheme。这里进入正题,开始介绍 kube-apiserver 的核心实现。
kube-apiserver 启动流程
kube-apiserver 使用 Cobra 作为 CLI 框架,其初始化示意图如下。

结合示意图和代码看初始化过程效果更佳。代码在 kubernetes/cmd/kube-apiserver/apiserver.go。
1# kubernetes/cmd/kube-apiserver/apiserver.go
2package main
3
4func main() {
5 command := app.NewAPIServerCommand()
6 code := cli.Run(command)
7 os.Exit(code)
8}
9
10# kubernetes/cmd/kube-apiserver/app/server.go
11func NewAPIServerCommand() *cobra.Command {
12 s := options.NewServerRunOptions()
13 cmd := &cobra.Command{
14 Use: "kube-apiserver",
15 ...
16 RunE: func(cmd *cobra.Command, args []string) error {
17 // set default options
18 completedOptions, err := s.Complete()
19 if err != nil {
20 return err
21 }
22
23 // validate options
24 if errs := completedOptions.Validate(); len(errs) != 0 {
25 return utilerrors.NewAggregate(errs)
26 }
27
28 return Run(completedOptions, genericapiserver.SetupSignalHandler())
29 },
30 }
31
32 # parse flags to options
33 fs := cmd.Flags()
34 namedFlagSets := s.Flags()
35 verflag.AddFlags(namedFlagSets.FlagSet("global"))
36
37 return cmd
38}
首先调用 options.NewServerRunOptions() 实例化 options,接着调用 s.Complete() 补全默认值,再将补全后的 options 交给 Validate() 方法进行验证。验证通过后进入 Run(completedOptions, genericapiserver.SetupSignalHandler())。Run() 是一个阻塞函数,kube-apiserver 在其中运行,正常情况下直到收到停止信号或出错才会返回。
需要注意的是,kube-apiserver 的命令行参数通过 flag 解析后赋给 options,这是 Cobra 框架的常规用法,这里不再展开。
进入 Run() 函数内。
1func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
2 // 实例化 kube-apiserver 配置 config
3 config, err := NewConfig(opts)
4 if err != nil {
5 return err
6 }
7
8 // 补全默认配置
9 completed, err := config.Complete()
10 if err != nil {
11 return err
12 }
13
14 // 创建服务链
15 server, err := CreateServerChain(completed)
16 if err != nil {
17 return err
18 }
19
20 prepared, err := server.PrepareRun()
21 if err != nil {
22 return err
23 }
24
25 return prepared.Run(stopCh)
26}
如注释所示,Run() 函数内 kube-apiserver 的启动流程相当清晰:实例化配置、补全默认配置、创建服务链,最后依次调用 PrepareRun() 和 Run() 启动服务。
下面分步看各个流程。
实例化配置
进入 NewConfig(opts) 看实例化 config 过程。
1# kubernetes/cmd/kube-apiserver/app/config.go
2func NewConfig(opts options.CompletedOptions) (*Config, error) {
3 // 根据 options 实例化 Config
4 c := &Config{
5 Options: opts,
6 }
7
8 // 创建 controlPlane 配置文件
9 controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
10 if err != nil {
11 return nil, err
12 }
13 c.ControlPlane = controlPlane
14
15 // 创建 apiExtensions 配置文件
16 apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
17 serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
18 if err != nil {
19 return nil, err
20 }
21 c.ApiExtensions = apiExtensions
22
23 // 创建 aggregator 配置文件
24 aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
25 if err != nil {
26 return nil, err
27 }
28 c.Aggregator = aggregator
29
30 return c, nil
31}
将 kube-apiserver 的所有 REST 服务组合在一起是极为复杂的,这里 kube-apiserver 将服务拆分,解耦为三种 HTTP Server:KubeAPIServer,APIExtensionsServer 和 AggregatorServer。

三种 HTTP Server 拥有各自的配置(config 对象)。后文创建服务链时将以 APIExtensionsServer 为例查看其启动流程,其它两种 HTTP Server 与此类似。
先进入 CreateKubeAPIServerConfig(opts),看 KubeAPIServer 配置的创建过程。
1func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
2 *controlplane.Config,
3 aggregatorapiserver.ServiceResolver,
4 []admission.PluginInitializer,
5 error,
6) {
7 // 创建通用配置
8 genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
9 opts.CompletedOptions,
10 []*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
11 generatedopenapi.GetOpenAPIDefinitions,
12 )
13
14 config := &controlplane.Config{
15 GenericConfig: genericConfig,
16 ExtraConfig: controlplane.ExtraConfig{
17 ...
18 },
19 }
20
21 // setup admission
22 admissionConfig := &kubeapiserveradmission.Config{
23 ExternalInformers: versionedInformers,
24 LoopbackClientConfig: genericConfig.LoopbackClientConfig,
25 CloudConfigFile: opts.CloudProvider.CloudConfigFile,
26 }
27 err = opts.Admission.ApplyTo(
28 genericConfig,
29 versionedInformers,
30 clientgoExternalClient,
31 dynamicExternalClient,
32 utilfeature.DefaultFeatureGate,
33 pluginInitializers...)
34 if err != nil {
35 return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
36 }
37
38 ...
39 return config, serviceResolver, pluginInitializers, nil
40}
试想,三种 HTTP Server 肯定有通用的配置。kube-apiserver 在函数 CreateKubeAPIServerConfig(opts) 内调用 BuildGenericConfig() 创建 HTTP Server 通用配置。
创建完通用配置后,实例化 KubeAPIServer 配置 config(其 GenericConfig 字段即前面创建的 genericConfig)。接着,实例化 admission 准入相关配置,并通过 opts.Admission.ApplyTo() 将准入配置应用到 genericConfig 上。
进入 BuildGenericConfig 看通用配置创建了什么。
1func BuildGenericConfig(
2 s controlplaneapiserver.CompletedOptions,
3 schemes []*runtime.Scheme,
4 getOpenAPIDefinitions func(ref openapicommon.ReferenceCallback) map[string]openapicommon.OpenAPIDefinition,
5) (
6 genericConfig *genericapiserver.Config,
7 versionedInformers clientgoinformers.SharedInformerFactory,
8 storageFactory *serverstorage.DefaultStorageFactory,
9
10 lastErr error,
11) {
12 // NewConfig returns a Config struct with the default values
13 genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
14 genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()
15
16 // ApplyTo applies the run options to the method receiver and returns self
17 if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
18 return
19 }
20
21 // wrap the definitions to revert any changes from disabled features
22 getOpenAPIDefinitions = openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(getOpenAPIDefinitions)
23 namer := openapinamer.NewDefinitionNamer(schemes...)
24 genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, namer)
25 genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
26 genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, namer)
27 genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
28
29 // New returns a new storage factory created from the completed storage factory configuration.
30 storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
31 storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
32 storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
33 if lastErr != nil {
34 return
35 }
36
37 // ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
38 if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
39 return
40 }
41
42 // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
43 if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
44 return
45 }
46
47 // BuildAuthorizer constructs the authorizer
48 genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
49 if err != nil {
50 lastErr = fmt.Errorf("invalid authorization config: %v", err)
51 return
52 }
53
54 ...
55
56 return
57}
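通用配置内创建了一系列配置,概括如下:通用运行选项(GenericServerRunOptions)、OpenAPI 与 OpenAPI v3 配置、基于 etcd 的存储工厂(StorageFactory)、认证(Authentication)以及授权(Authorization)等。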

这里不再深入各个配置的细节,后续需要时再回头看。配置创建好后,回到 Run() 函数看服务链的创建。
创建服务链
进入 CreateServerChain 查看服务链。
1func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
2 // New returns an HTTP handler that is meant to be executed at the end of the delegation chain.
3 // It checks if the request have been made before the server has installed all known HTTP paths.
4 // In that case it returns a 503 response otherwise it returns a 404.
5 notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
6
7 // New returns a new instance of CustomResourceDefinitions from the given config.
8 apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
9 if err != nil {
10 return nil, err
11 }
12 crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
13
14 kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
15 if err != nil {
16 return nil, err
17 }
18
19 // aggregator comes last in the chain
20 aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
21 if err != nil {
22 // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
23 return nil, err
24 }
25
26 return aggregatorServer, nil
27}
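服务链中依次创建三种 HTTP Server:apiExtensionsServer、kubeAPIServer 和 aggregatorServer,后一个以前一个作为委托对象(delegate)。这里还是以 apiExtensionsServer 为例。
首先将 notFoundHandler 通过 NewEmptyDelegateWithCustomHandler 包装为委托对象传给 apiExtensionsServer,当请求在服务链中匹配不到任何已注册的 API 路由时,最终交由 notFoundHandler 处理。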
进入 config.ApiExtensions.New。
1func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
2 // 创建通用 APIServer
3 genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
4 if err != nil {
5 return nil, err
6 }
7
8 // 实例化 APIExtensions server: CustomResourceDefinitions
9 s := &CustomResourceDefinitions{
10 GenericAPIServer: genericServer,
11 }
12
13 // 创建 apiGroupInfo,通过 apiGroupInfo 建立 REST API 到资源实例的路由
14 apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
15 storage := map[string]rest.Storage{}
16 // customresourcedefinitions
17 if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
18 // 实例化 REST 资源:customResourceDefinitionStorage
19 customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
20 if err != nil {
21 return nil, err
22 }
23 storage[resource] = customResourceDefinitionStorage
24 storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
25 }
26 if len(storage) > 0 {
27 // 建立 version 到 REST 资源的 mapping
28 apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
29 }
30
31 // 通过 apiGroupInfo 建立 REST API 到资源实例的路由
32 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
33 return nil, err
34 }
35
36 ...
37}
config.ApiExtensions.New 中定义的流程如注释所示。下面逐层展开各个流程。
创建通用 APIServer
进入 c.GenericConfig.New 查看通用 APIServer 创建过程。
1func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
2 // 创建通用 APIServer 的 handler
3 apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
4
5 // 实例化通用 APIServer 并将前面创建的 apiServerHandler 赋给通用 APIServer
6 s := &GenericAPIServer{
7 Handler: apiServerHandler,
8 listedPathProvider: apiServerHandler,
9 }
10
11 // 建立通用 REST 路由
12 installAPI(s, c.Config)
13
14 return s, nil
15}
首先,进入 NewAPIServerHandler 查看通用 APIServer handler 的创建。
1func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
2 // 建立非 REST 资源的路由
3 nonGoRestfulMux := mux.NewPathRecorderMux(name)
4 if notFoundHandler != nil {
5 nonGoRestfulMux.NotFoundHandler(notFoundHandler)
6 }
7
8 // 创建 go-restful container 处理 REST 资源的路由
9 gorestfulContainer := restful.NewContainer()
10 gorestfulContainer.ServeMux = http.NewServeMux()
11 gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
12 gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
13 logStackOnRecover(s, panicReason, httpWriter)
14 })
15 gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
16 serviceErrorHandler(s, serviceErr, request, response)
17 })
18
19 // 将两种路由 REST 资源路由和 非 REST 资源路由赋给 director
20 director := director{
21 name: name,
22 goRestfulContainer: gorestfulContainer,
23 nonGoRestfulMux: nonGoRestfulMux,
24 }
25
26 // 返回 APIServerHandler
27 return &APIServerHandler{
28 FullHandlerChain: handlerChainBuilder(director),
29 GoRestfulContainer: gorestfulContainer,
30 NonGoRestfulMux: nonGoRestfulMux,
31 Director: director,
32 }
33}
kube-apiserver 基于 go-restful 框架建立 RESTful API 的路由。
创建好 apiServerHandler 后将该 handler 赋给通用 APIServer GenericAPIServer。接着进入 installAPI(s, c.Config) 看通用 REST 路由的创建过程。
1func installAPI(s *GenericAPIServer, c *Config) {
2 ...
3 routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
4}
这里以 /version 路由为例,看 REST 路由的建立过程,进入 Version.Install。
1// Install registers the APIServer's `/version` handler.
2func (v Version) Install(c *restful.Container) {
3 if v.Version == nil {
4 return
5 }
6
7 // Set up a service to return the git code version.
8 versionWS := new(restful.WebService)
9 versionWS.Path("/version")
10 versionWS.Doc("git code version from which this is built")
11 versionWS.Route(
12 versionWS.GET("/").To(v.handleVersion).
13 Doc("get the code version").
14 Operation("getCodeVersion").
15 Produces(restful.MIME_JSON).
16 Consumes(restful.MIME_JSON).
17 Writes(version.Info{}))
18
19 c.Add(versionWS)
20}
21
22// handleVersion writes the server's version information.
23func (v Version) handleVersion(req *restful.Request, resp *restful.Response) {
24 responsewriters.WriteRawJSON(http.StatusOK, *v.Version, resp.ResponseWriter)
25}
可以看到,在 Install 内建立了 /version 到 v.handleVersion 的路由。
下一节,将继续介绍创建服务链的流程。未完待续…