前言
在看 kube-scheduler 组件的过程中遇到了 kube-scheduler 对于 client-go 的调用,泛泛的理解调用过程总有种隔靴搔痒的感觉,于是调转头先把 client-go 理清楚在回来看 kube-scheduler。
为什么要看 client-go,并且要深入到原理,源码层面去看。很简单,因为它很重要。重要在两方面:
kubernetes组件通过client-go和kube-apiserver交互。client-go简单,易用,大部分基于Kubernetes做二次开发的应用,在和kube-apiserver交互时会使用client-go。
当然,不仅在于使用,理解层面,对于我们学习代码开发,架构等也有帮助。
client-go 客户端对象
client-go 支持四种客户端对象,分别是 RESTClient,ClientSet,DynamicClient 和 DiscoveryClient:

组件或者二次开发的应用可以通过这四种客户端对象和 kube-apiserver 交互。其中,RESTClient 是最基础的客户端对象,它封装了 HTTP Request,实现了 RESTful 风格的 API。ClientSet 基于 RESTClient,封装了对于 Resource 和 Version 的请求方法。DynamicClient 相比于 ClientSet 提供了全资源,包括自定义资源的请求方法。DiscoveryClient 用于发现 kube-apiserver 支持的资源组,资源版本和资源信息。
每种客户端适用的场景不同,主要是对 HTTP Request 做了层层封装,具体的代码实现可参考 client-go 客户端。
informer 机制
仅仅封装 HTTP Request 是不够的,组件通过 client-go 和 kube-apiserver 交互,必然对实时性,可靠性等有很高要求。试想,如果 ETCD 中存储的数据和组件通过 client-go 从 ETCD 获取的数据不匹配的话,那将会是一个非常严重的问题。
如何实现 client-go 的实时性,可靠性?client-go 给出的答案是:informer 机制。

client-go informer 流程图
informer 机制的核心组件包括:
Reflector: 主要负责两类任务:- 通过
client-go客户端对象 listkube-apiserver资源,并且 watchkube-apiserver资源变更。 - 作为生产者,将获取的资源放入
Delta FIFO队列。
- 通过
Informer: 主要负责三类任务:- 作为消费者,将
Reflector放入队列的资源拿出来。 - 将资源交给
indexer组件。 - 交给
indexer组件之后触发回调函数,处理回调事件。
- 作为消费者,将
Indexer:indexer组件负责将资源信息存入到本地内存数据库(实际是map对象),该数据库作为缓存存在,其资源信息和ETCD中的资源信息完全一致(得益于watch机制)。因此,client-go可以从本地indexer中读取相应的资源,而不用每次都从kube-apiserver中获取资源信息。这也实现了client-go对于实时性的要求。
接下来从源码角度看各个组件的处理流程,力图做到知其然,知其所以然。
informer 源码分析
直接阅读 informer 源码是非常晦涩难懂的,这里通过 informer 的代码示例开始学习:
1package main
2
3import (
4 "log"
5 "time"
6
7 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8 "k8s.io/client-go/informers"
9 "k8s.io/client-go/kubernetes"
10 "k8s.io/client-go/tools/cache"
11 "k8s.io/client-go/tools/clientcmd"
12)
13
14func main() {
15 // 解析 kubeconfig
16 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
17 if err != nil {
18 panic(err)
19 }
20
21 // 创建 ClientSet 客户端对象
22 clientset, err := kubernetes.NewForConfig(config)
23 if err != nil {
24 panic(err)
25 }
26
27 stopCh := make(chan struct{})
28 defer close(stopCh)
29
30 // 创建 sharedInformers
31 sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
32 // 创建 informer
33 informer := sharedInformers.Core().V1().Pods().Informer()
34
35 // 创建 Event 回调 handler
36 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
37 AddFunc: func(obj interface{}) {
38 mObj := obj.(v1.Object)
39 log.Printf("New Pod Added to Store: %s", mObj.GetName())
40 },
41 UpdateFunc: func(oldObj, newObj interface{}) {
42 oObj := oldObj.(v1.Object)
43 nObj := newObj.(v1.Object)
44 log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
45 },
46 DeleteFunc: func(obj interface{}) {
47 mObj := obj.(v1.Object)
48 log.Printf("Pod Deleted from Store: %s", mObj.GetName())
49 },
50 })
51
52 // 运行 informer
53 informer.Run(stopCh)
54}
执行结果如下:
1# go run informer.go
22023/12/14 12:00:26 New Pod Added to Store: prometheus-alertmanager-0
32023/12/14 12:01:26 prometheus-alertmanager-0 Pod Updated to prometheus-alertmanager-0
上述代码示例分为三部分:创建 informer,创建 informer 的 EventHandler,运行 informer。下面,通过这三部分流程介绍 client-go 的核心组件。
创建 informer
创建 informer 分为两步。
1)创建工厂 sharedInformerFactory
1// sharedInformers factory
2sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
3
4// client-go/informers/factory.go
5func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
6 return NewSharedInformerFactoryWithOptions(client, defaultResync)
7}
8
9func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
10 factory := &sharedInformerFactory{
11 client: client,
12 namespace: v1.NamespaceAll,
13 defaultResync: defaultResync,
14 informers: make(map[reflect.Type]cache.SharedIndexInformer),
15 startedInformers: make(map[reflect.Type]bool),
16 customResync: make(map[reflect.Type]time.Duration),
17 }
18
19 // Apply all options
20 for _, opt := range options {
21 factory = opt(factory)
22 }
23
24 return factory
25}
sharedInformerFactory 实现了 SharedInformerFactory 接口,该工厂负责创建 informer。
2)创建 informer
1// 创建 informer
2informer := sharedInformers.Core().V1().Pods().Informer()
3
4// 调用 Core 方法
5func (f *sharedInformerFactory) Core() core.Interface {
6 return core.New(f, f.namespace, f.tweakListOptions)
7}
8
9func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
10 return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
11}
12
13// 调用 V1 方法
14func (g *group) V1() v1.Interface {
15 return v1.New(g.factory, g.namespace, g.tweakListOptions)
16}
17
18func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
19 return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
20}
21
22// 调用 Pods 方法
23func (v *version) Pods() PodInformer {
24 return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
25}
经过层层构建创建 podInformer 对象,该对象实现了 PodInformer 接口,调用接口的 Informer 方法创建 informer 对象:
1func (f *podInformer) Informer() cache.SharedIndexInformer {
2 return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
3}
podInformer.Informer 实际调用的是 sharedInformerFactory.InformerFor:
1func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
2 f.lock.Lock()
3 defer f.lock.Unlock()
4
5 // 反射出资源对象 obj 的 type
6 informerType := reflect.TypeOf(obj)
7
8 // 读取并判断资源对象的 informer
9 informer, exists := f.informers[informerType]
10 if exists {
11 return informer
12 }
13
14 ...
15
16 // 调用 newFunc 创建 informer
17 informer = newFunc(f.client, resyncPeriod)
18
19 // 将 type:informer 加入到 factory 的 informers 中
20 f.informers[informerType] = informer
21
22 return informer
23}
从 InformerFor 方法可以看出,sharedInformerFactory 的 share 体现在同一个资源类型共享 informer。
这么设计在于,每个 informer 包括一个 Reflector,Reflector 通过访问 kube-apiserver 实现 ListAndWatch 操作。共享 informer 实际是共享 Reflector,这种共享机制将减少 Reflector 对于 kube-apiserver 的访问,降低 kube-apiserver 的负载,节约资源。
继续看,创建 informer 的 newFunc 函数做了什么:
1informer = newFunc(f.client, resyncPeriod)
2
3// client-go/informers/core/v1/pod.go
4func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
5 return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
6}
7
8func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
9 return cache.NewSharedIndexInformer(
10 &cache.ListWatch{
11 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
12 if tweakListOptions != nil {
13 tweakListOptions(&options)
14 }
15 return client.CoreV1().Pods(namespace).List(context.TODO(), options)
16 },
17 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
18 if tweakListOptions != nil {
19 tweakListOptions(&options)
20 }
21 return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
22 },
23 },
24 &corev1.Pod{},
25 resyncPeriod,
26 indexers,
27 )
28}
newFunc 实际调用的是 NewFilteredPodInformer 函数,在函数内创建 cache.ListAndWatch 对象,对象中包括 ListFunc 和 WatchFunc 回调函数,回调函数内调用 ClientSet 实现 list 和 watch 资源对象。
继续看 cache.NewSharedIndexInformer:
1// client-go/tools/cache/shared_informer.go
2func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
3 return NewSharedIndexInformerWithOptions(
4 lw,
5 exampleObject,
6 SharedIndexInformerOptions{
7 ResyncPeriod: defaultEventHandlerResyncPeriod,
8 Indexers: indexers,
9 },
10 )
11}
12
13func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
14 realClock := &clock.RealClock{}
15
16 return &sharedIndexInformer{
17 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
18 processor: &sharedProcessor{clock: realClock},
19 listerWatcher: lw,
20 objectType: exampleObject,
21 objectDescription: options.ObjectDescription,
22 resyncCheckPeriod: options.ResyncPeriod,
23 defaultEventHandlerResyncPeriod: options.ResyncPeriod,
24 clock: realClock,
25 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
26 }
27}
在 NewSharedIndexInformerWithOptions 函数内创建 informer sharedIndexInformer。可以看到,sharedIndexInformer 内包括了 indexer 核心组件。
informer 创建完成。接下来为 informer 添加回调函数 EventHandler。
创建 EventHandler
代码实现如下:
1informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
2 AddFunc: func(obj interface{}) {
3 mObj := obj.(v1.Object)
4 log.Printf("New Pod Added to Store: %s", mObj.GetName())
5 },
6 UpdateFunc: func(oldObj, newObj interface{}) {
7 oObj := oldObj.(v1.Object)
8 nObj := newObj.(v1.Object)
9 log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
10 },
11 DeleteFunc: func(obj interface{}) {
12 mObj := obj.(v1.Object)
13 log.Printf("Pod Deleted from Store: %s", mObj.GetName())
14 },
15})
创建 EventHandler 的 handler 中包括三种回调函数:AddFunc,UpdateFunc 和 DeleteFunc,三种回调函数分别在资源有增加,变更,删除时触发。
在 sharedIndexInformer.AddEventHandler 内,将 handler 传递给 sharedIndexInformer.AddEventHandlerWithResyncPeriod 方法,该方法主要创建 listener 对象:
1// client-go/tools/cache/shared_informer.go
2func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
3 return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
4}
5
6func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
7 ...
8 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
9
10 if !s.started {
11 return s.processor.addListener(listener), nil
12 }
13 ...
14}
15
16// client-go/tools/cache/shared_informer.go
17func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
18 ret := &processorListener{
19 nextCh: make(chan interface{}),
20 addCh: make(chan interface{}),
21 handler: handler,
22 syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
23 pendingNotifications: *buffer.NewRingGrowing(bufferSize),
24 requestedResyncPeriod: requestedResyncPeriod,
25 resyncPeriod: resyncPeriod,
26 }
27
28 ret.determineNextResync(now)
29
30 return ret
31}
32
33func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
34 ...
35
36 p.listeners[listener] = true
37 ...
38
39 return listener
40}
listener 对象包含通道 addCh 和 nextCh,以及 handler 等对象。最后将 listener 存入 sharedIndexInformer.sharedProcessor 中。
创建完 informer 的 EventHandler,接下来该运行 informer 了。