上接 Kubernetes: client-go 源码剖析(一)
运行 informer
运行 informer 将 Reflector,informer 和 indexer 组件关联以实现 informer 流程图的流程。
Reflector List&Watch
运行 informer:
1informer.Run(stopCh)
2
3// client-go/tools/cache/shared_informer.go
4func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
5 func() {
6 ...
7 // 创建 DeltaFIFO 队列
8 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
9 KnownObjects: s.indexer,
10 EmitDeltaTypeReplaced: true,
11 Transformer: s.transform,
12 })
13
14 cfg := &Config{
15 Queue: fifo,
16 ListerWatcher: s.listerWatcher,
17 ObjectType: s.objectType,
18 ObjectDescription: s.objectDescription,
19 FullResyncPeriod: s.resyncCheckPeriod,
20 RetryOnError: false,
21 ShouldResync: s.processor.shouldResync,
22 Process: s.HandleDeltas,
23 WatchErrorHandler: s.watchErrorHandler,
24 }
25
26 // 根据 Config 创建 informer 的 controller
27 s.controller = New(cfg)
28 s.controller.(*controller).clock = s.clock
29 s.started = true
30 }()
31
32 ...
33 // goroutine 运行 processor
34 wg.StartWithChannel(processorStopCh, s.processor.run)
35
36 ...
37 // 运行 controller
38 s.controller.Run(stopCh)
39}
首先,创建队列 Delta FIFO:
1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
2 ...
3 func() {
4 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
5 KnownObjects: s.indexer,
6 EmitDeltaTypeReplaced: true,
7 Transformer: s.transform,
8 })
9 }()
10}
11
12func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
13 ...
14 f := &DeltaFIFO{
15 items: map[string]Deltas{},
16 queue: []string{},
17 keyFunc: opts.KeyFunction,
18 knownObjects: opts.KnownObjects,
19
20 emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
21 transformer: opts.Transformer,
22 }
23 f.cond.L = &f.lock
24 return f
25}
该队列中存储的是 Delta 资源对象,其存储结构为:

为什么队列要设计成这个样子?因为 informer 在读取队列时,根据 items 的 action type 调用对应 EventHandler 的回调函数。
接下来,实例化 informer 的 controller 对象,并且调用 controller.Run 运行 controller:
1func (c *controller) Run(stopCh <-chan struct{}) {
2 ...
3 r := NewReflectorWithOptions(
4 c.config.ListerWatcher,
5 c.config.ObjectType,
6 c.config.Queue,
7 ReflectorOptions{
8 ResyncPeriod: c.config.FullResyncPeriod,
9 TypeDescription: c.config.ObjectDescription,
10 Clock: c.clock,
11 },
12 )
13
14 ...
15 wg.StartWithChannel(stopCh, r.Run)
16
17 wait.Until(c.processLoop, time.Second, stopCh)
18 wg.Wait()
19}
在 Run 方法中创建 Reflector 核心组件:
1func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
2 ...
3 // Reflector 中包括 ListerWatcher 对象和 DeltaFIFO 队列
4 r := &Reflector{
5 name: options.Name,
6 resyncPeriod: options.ResyncPeriod,
7 typeDescription: options.TypeDescription,
8 listerWatcher: lw,
9 store: store,
10 ...
11 }
12
13 ...
14 return r
15}
继续进入 wg.StartWithChannel 中运行 Reflector.Run:
1func (r *Reflector) Run(stopCh <-chan struct{}) {
2 wait.BackoffUntil(func() {
3 // 调用 Reflector 的 ListAndWatch 方法
4 if err := r.ListAndWatch(stopCh); err != nil {
5 r.watchErrorHandler(r, err)
6 }
7 }, r.backoffManager, true, stopCh)
8}
9
10func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
11 ...
12 if fallbackToList {
13 err = r.list(stopCh)
14 if err != nil {
15 return err
16 }
17 }
18
19 ...
20 return r.watch(w, stopCh, resyncerrc)
21}
在这里我们看到 Reflector 的 ListAndWatch 实现了资源的 list 和 watch 操作。这相当于 informer 流程图的第一步。
具体看 Reflector 的 list 方法做了什么:
1func (r *Reflector) list(stopCh <-chan struct{}) error {
2 ...
3 go func() {
4 ...
5 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
6 return r.listerWatcher.List(opts)
7 }))
8
9 list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
10 }()
11
12 select {
13 case <-stopCh:
14 return nil
15 case r := <-panicCh:
16 panic(r)
17 // 阻塞 list
18 case <-listCh:
19 }
20
21 ...
22}
首先,在 goroutine 内调用 pager.ListWithAlloc 获得 list 的资源对象:
1func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
2 return p.list(ctx, options, true)
3}
4
5func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
6 ...
7 for {
8 select {
9 case <-ctx.Done():
10 return nil, paginatedResult, ctx.Err()
11 default:
12 }
13
14 obj, err := p.PageFn(ctx, options)
15 ...
16 }
17
18 ...
19}
list 方法内调用 p.PageFn 获得资源对象 obj,调用 p.PageFn 实际调用的是 Reflector.listerWatcher 对象的 List 方法:
1func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
2 return lw.ListFunc(options)
3}
4
5func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
6 return cache.NewSharedIndexInformer(
7 &cache.ListWatch{
8 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
9 if tweakListOptions != nil {
10 tweakListOptions(&options)
11 }
12 return client.CoreV1().Pods(namespace).List(context.TODO(), options)
13 },
14 ...
15 }
16 )
17}
可以看到,这里将 Reflector 和前面的 ListFunc 回调函数关联上了,实际通过 ClientSet 客户端对象 list kube-apiserver 的资源。
Reflector Add Object
client-go informer 流程的第一步实现了,那么第二步在哪呢?带着这个问题,我们继续看 Reflector.list 方法:
1func (r *Reflector) list(stopCh <-chan struct{}) error {
2 ...
3
4 // 通过反射读取 list 的 meta field
5 listMetaInterface, err := meta.ListAccessor(list)
6 if err != nil {
7 return fmt.Errorf("unable to understand list result %#v: %v", list, err)
8 }
9
10 // 读取 resource version
11 resourceVersion = listMetaInterface.GetResourceVersion()
12 items, err := meta.ExtractListWithAlloc(list)
13
14 if err := r.syncWith(items, resourceVersion); err != nil {
15 return fmt.Errorf("unable to sync list result: %v", err)
16 }
17
18 return nil
19}
重点看 Reflector.syncWith 方法:
1func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
2 found := make([]interface{}, 0, len(items))
3 for _, item := range items {
4 found = append(found, item)
5 }
6 return r.store.Replace(found, resourceVersion)
7}
8
9func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
10 ...
11 for _, item := range list {
12 key, err := f.KeyOf(item)
13 if err != nil {
14 return KeyError{item, err}
15 }
16 keys.Insert(key)
17 if err := f.queueActionLocked(action, item); err != nil {
18 return fmt.Errorf("couldn't enqueue object: %v", err)
19 }
20 }
21
22 ...
23 return nil
24}
Reflector.syncWith 调用 Reflector.DeltaFIFO 的 Replace 方法将 list 的资源对象添加到 DeltaFIFO 队列中,实现 informer 流程的第二步。
watch 资源和 list 资源的流程类似,这里就不过多介绍了。
informer Pop Object
Reflector 作为生产者将 list&watch 的资源添加到 Delta FIFO 队列,那么消费者在哪里使用 Delta FIFO 的资源呢?
client-go 在 controller.Run 中的 controller.processLoop 处理 Delta FIFO 的资源:
1wait.Until(c.processLoop, time.Second, stopCh)
2
3// client-go/tools/cache/controller.go
4func (c *controller) processLoop() {
5 for {
6 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
7 if err != nil {
8 if err == ErrFIFOClosed {
9 return
10 }
11 if c.config.RetryOnError {
12 // This is the safe way to re-enqueue.
13 c.config.Queue.AddIfNotPresent(obj)
14 }
15 }
16 }
17}
controller.processLoop 会轮询队列中的资源,当队列中有资源加入时 Pop 资源:
1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
2 for {
3 for len(f.queue) == 0 {
4 ...
5
6 // 当队列无资源的时候协程阻塞
7 f.cond.Wait()
8 }
9
10 id := f.queue[0]
11 f.queue = f.queue[1:]
12 depth := len(f.queue)
13 ...
14 item, ok := f.items[id]
15 ...
16 delete(f.items, id)
17 ...
18 err := process(item, isInInitialList)
19 ...
20 return item, err
21 }
22}
DeltaFIFO.Pop 会循环读取队列中的资源,当队列无资源时进入阻塞状态。如果队列中有资源,每次读取队列的首元素,删除队列中读取的首元素,然后调用回调函数 PopProcessFunc 处理读取的首元素:
1err := process(item, isInInitialList)
2
3// client-go/tools/cache/shared_informer.go
4func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
5 s.blockDeltas.Lock()
6 defer s.blockDeltas.Unlock()
7
8 if deltas, ok := obj.(Deltas); ok {
9 return processDeltas(s, s.indexer, deltas, isInInitialList)
10 }
11 return errors.New("object given as Process argument is not Deltas")
12}
调用回调函数 PopProcessFunc 实际调用的是 sharedIndexInformer.HandleDeltas 方法,在该方法内处理从队列读取到的资源。
至此,实现了 informer 流程图的第三步。
informer Add and Store Object
继续看 sharedIndexInformer.HandleDeltas 的函数 processDeltas:
1func processDeltas(handler ResourceEventHandler, clientState Store, deltas Deltas, isInInitialList bool,) error {
2 for _, d := range deltas {
3 obj := d.Object
4
5 switch d.Type {
6 case Sync, Replaced, Added, Updated:
7 if old, exists, err := clientState.Get(obj); err == nil && exists {
8 if err := clientState.Update(obj); err != nil {
9 return err
10 }
11 handler.OnUpdate(old, obj)
12 } else {
13 if err := clientState.Add(obj); err != nil {
14 return err
15 }
16 handler.OnAdd(obj, isInInitialList)
17 }
18 case Deleted:
19 if err := clientState.Delete(obj); err != nil {
20 return err
21 }
22 handler.OnDelete(obj)
23 }
24 }
25 return nil
26}
在 processDeltas 中根据不同的 Delta Type 执行不同的 case。我们以 Added type 为例查看处理流程。
首先,clientState.Get 从本地 indexer 存储中读取资源,并判断资源是否存在:
1// clientState.Get get 资源 obj
2func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
3 ...
4 return c.GetByKey(key)
5}
6
7func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
8 item, exists = c.cacheStorage.Get(key)
9 return item, exists, nil
10}
11
12// 从 index 中读取资源
13func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
14 c.lock.RLock()
15 defer c.lock.RUnlock()
16 item, exists = c.items[key]
17 return item, exists
18}
如果资源不存在,进入 cache.Add:
1func (c *cache) Add(obj interface{}) error {
2 ...
3 // 添加资源到 indexer
4 c.cacheStorage.Add(key, obj)
5 return nil
6}
7
8func (c *threadSafeMap) Add(key string, obj interface{}) {
9 c.Update(key, obj)
10}
11
12func (c *threadSafeMap) Update(key string, obj interface{}) {
13 c.lock.Lock()
14 defer c.lock.Unlock()
15 oldObject := c.items[key]
16 c.items[key] = obj
17 c.index.updateIndices(oldObject, obj, key)
18}
在 clientState.Add 中将队列 Delta FIFO 读取的资源存入 indexer 中。
至此,完成了 informer 流程图的第四步和第五步。
Event Handler
将资源存入 indexer 后继续往下看 sharedIndexInformer.OnAdd 是怎么处理 Pop 出的资源的:
1func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
2 ...
3 s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
4}
5
6func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
7 p.listenersLock.RLock()
8 defer p.listenersLock.RUnlock()
9
10 for listener, isSyncing := range p.listeners {
11 switch {
12 case !sync:
13 // non-sync messages are delivered to every listener
14 listener.add(obj)
15 case isSyncing:
16 // sync messages are delivered to every syncing listener
17 listener.add(obj)
18 default:
19 // skipping a sync obj for a non-syncing listener
20 }
21 }
22}
23
24func (p *processorListener) add(notification interface{}) {
25 if a, ok := notification.(addNotification); ok && a.isInInitialList {
26 p.syncTracker.Start()
27 }
28 p.addCh <- notification
29}
可以看到,Pop 的资源被加入 processorListener.addCh 通道。
那么,通道的另一端是哪里在处理呢?
答案在 sharedIndexInformer.Run 中的 sharedProcessor.run:
1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
2 ...
3 wg.StartWithChannel(processorStopCh, s.processor.run)
4 ...
5}
6
7func (p *sharedProcessor) run(stopCh <-chan struct{}) {
8 func() {
9 p.listenersLock.RLock()
10 defer p.listenersLock.RUnlock()
11 for listener := range p.listeners {
12 p.wg.Start(listener.run)
13 p.wg.Start(listener.pop)
14 }
15 p.listenersStarted = true
16 }()
17 <-stopCh
18 ...
19}
在 sharedProcessor.run 方法中开启两个协程分别执行 listener.run 和 listener.pop 方法。
我们先看 listener.run 方法:
1func (p *processorListener) run() {
2 stopCh := make(chan struct{})
3 wait.Until(func() {
4 for next := range p.nextCh {
5 switch notification := next.(type) {
6 case updateNotification:
7 p.handler.OnUpdate(notification.oldObj, notification.newObj)
8 case addNotification:
9 p.handler.OnAdd(notification.newObj, notification.isInInitialList)
10 if notification.isInInitialList {
11 p.syncTracker.Finished()
12 }
13 case deleteNotification:
14 p.handler.OnDelete(notification.oldObj)
15 default:
16 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
17 }
18 }
19 close(stopCh)
20 }, 1*time.Second, stopCh)
21}
listener.run 从 processorListener.nextCh 通道中读取资源对象,根据资源对象的类型决定执行哪个 case。
前面通道的一端将 addNotification 加入到 processorListener 的 addCh 通道 p.addCh <- notification。
这里 processorListener 根据 nextCh 通道的资源执行相应的 case。
那么 addCh 和 nextCh 的关联在哪里呢?
我们看 processorListener.pop:
1func (p *processorListener) pop() {
2 defer utilruntime.HandleCrash()
3 defer close(p.nextCh) // Tell .run() to stop
4
5 var nextCh chan<- interface{}
6 var notification interface{}
7 for {
8 select {
9 case nextCh <- notification:
10 // Notification dispatched
11 var ok bool
12 notification, ok = p.pendingNotifications.ReadOne()
13 if !ok { // Nothing to pop
14 nextCh = nil // Disable this select case
15 }
16 case notificationToAdd, ok := <-p.addCh:
17 if !ok {
18 return
19 }
20 if notification == nil { // No notification to pop (and pendingNotifications is empty)
21 // Optimize the case - skip adding to pendingNotifications
22 notification = notificationToAdd
23 nextCh = p.nextCh
24 } else { // There is already a notification waiting to be dispatched
25 p.pendingNotifications.WriteOne(notificationToAdd)
26 }
27 }
28 }
29}
processorListener.pop 的逻辑比较复杂,这里不过多介绍。重点在通过 nextCh = p.nextCh 将 processorListener.nextCh 和函数内通道 nextCh 关联,从而实现 processorListener.addCh 通道到 processorListener.nextCh 通道的数据传递。
了解了通道间的数据传递。我们以 ResourceEventHandlerFuncs.OnAdd 为例看 client-go 是怎么调用 EventHandler 的:
1func (p *processorListener) run() {
2 ...
3 wait.Until(func() {
4 for next := range p.nextCh {
5 switch notification := next.(type) {
6 case updateNotification:
7 ...
8 case addNotification:
9 p.handler.OnAdd(notification.newObj, notification.isInInitialList)
10 ...
11 }
12 ...
13 }
14 })
15}
16
17func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
18 if r.AddFunc != nil {
19 r.AddFunc(obj)
20 }
21}
22
23func main() {
24 ...
25 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
26 AddFunc: func(obj interface{}) {
27 mObj := obj.(v1.Object)
28 log.Printf("New Pod Added to Store: %s", mObj.GetName())
29 },
30 ...
31 })
32}
可以看到,最终通过回调函数执行我们定义的 AddFunc handler。
至此,实现了 informer 流程图的第六步。
总结
我们通过两篇文章从源码角度介绍了 client-go 的流程。下面要开始 kube-schduler 的学习了,敬请期待…