上接 Kubernetes: client-go 源码剖析(一)

运行 informer

运行 informerReflectorinformerindexer 组件关联以实现 informer 流程图的流程。

Reflector List&Watch

运行 informer

 1informer.Run(stopCh)
 2
 3// client-go/tools/cache/shared_informer.go
 4func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 5    func() {
 6		...
 7        // 创建 DeltaFIFO 队列
 8		fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 9			KnownObjects:          s.indexer,
10			EmitDeltaTypeReplaced: true,
11			Transformer:           s.transform,
12		})
13
14		cfg := &Config{
15			Queue:             fifo,
16			ListerWatcher:     s.listerWatcher,
17			ObjectType:        s.objectType,
18			ObjectDescription: s.objectDescription,
19			FullResyncPeriod:  s.resyncCheckPeriod,
20			RetryOnError:      false,
21			ShouldResync:      s.processor.shouldResync,
22			Process:           s.HandleDeltas,
23			WatchErrorHandler: s.watchErrorHandler,
24		}
25
26        // 根据 Config 创建 informer 的 controller
27		s.controller = New(cfg)
28		s.controller.(*controller).clock = s.clock
29		s.started = true
30	}()
31
32    ...
33    // goroutine 运行 processor
34	wg.StartWithChannel(processorStopCh, s.processor.run)
35
36	...
37    // 运行 controller
38	s.controller.Run(stopCh)
39}

首先,创建队列 Delta FIFO

 1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 2	...
 3	func() {
 4		fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
 5			KnownObjects:          s.indexer,
 6			EmitDeltaTypeReplaced: true,
 7			Transformer:           s.transform,
 8		})
 9	}()
10}
11
12func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
13	...
14	f := &DeltaFIFO{
15		items:        map[string]Deltas{},
16		queue:        []string{},
17		keyFunc:      opts.KeyFunction,
18		knownObjects: opts.KnownObjects,
19
20		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
21		transformer:           opts.Transformer,
22	}
23	f.cond.L = &f.lock
24	return f
25}

该队列中存储的是 Delta 资源对象,其存储结构为:

client-go Delta FIFO

为什么队列要设计成这个样子?因为 informer 在读取队列时,根据 items 的 action type 调用对应 EventHandler 的回调函数。

接下来,实例化 informercontroller 对象,并且调用 controller.Run 运行 controller

 1func (c *controller) Run(stopCh <-chan struct{}) {
 2	...
 3	r := NewReflectorWithOptions(
 4		c.config.ListerWatcher,
 5		c.config.ObjectType,
 6		c.config.Queue,
 7		ReflectorOptions{
 8			ResyncPeriod:    c.config.FullResyncPeriod,
 9			TypeDescription: c.config.ObjectDescription,
10			Clock:           c.clock,
11		},
12	)
13
14	...
15	wg.StartWithChannel(stopCh, r.Run)
16
17	wait.Until(c.processLoop, time.Second, stopCh)
18	wg.Wait()
19}

Run 方法中创建 Reflector 核心组件:

 1func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
 2	...
 3	// Reflector 中包括 ListerWatcher 对象和 DeltaFIFO 队列
 4	r := &Reflector{
 5		name:            options.Name,
 6		resyncPeriod:    options.ResyncPeriod,
 7		typeDescription: options.TypeDescription,
 8		listerWatcher:   lw,
 9		store:           store,
10		...
11	}
12
13	...
14	return r
15}

继续进入 wg.StartWithChannel 中运行 Reflector.Run

 1func (r *Reflector) Run(stopCh <-chan struct{}) {
 2	wait.BackoffUntil(func() {
 3		// 调用 Reflector 的 ListAndWatch 方法
 4		if err := r.ListAndWatch(stopCh); err != nil {
 5			r.watchErrorHandler(r, err)
 6		}
 7	}, r.backoffManager, true, stopCh)
 8}
 9
10func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
11	...
12	if fallbackToList {
13		err = r.list(stopCh)
14		if err != nil {
15			return err
16		}
17	}
18
19	...
20	return r.watch(w, stopCh, resyncerrc)
21}

在这里我们看到 ReflectorListAndWatch 实现了资源的 list 和 watch 操作。这相当于 informer 流程图的第一步。

具体看 Reflectorlist 方法做了什么:

 1func (r *Reflector) list(stopCh <-chan struct{}) error {
 2	...
 3	go func() {
 4		...
 5		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
 6			return r.listerWatcher.List(opts)
 7		}))
 8
 9		list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
10	}()
11
12	select {
13	case <-stopCh:
14		return nil
15	case r := <-panicCh:
16		panic(r)
17	// 阻塞 list
18	case <-listCh:
19	}
20
21	...
22}

首先,在 goroutine 内调用 pager.ListWithAlloc 获得 list 的资源对象:

 1func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
 2	return p.list(ctx, options, true)
 3}
 4
 5func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
 6	...
 7	for {
 8		select {
 9		case <-ctx.Done():
10			return nil, paginatedResult, ctx.Err()
11		default:
12		}
13
14		obj, err := p.PageFn(ctx, options)
15		...
16	}
17
18	...
19}

list 方法内调用 p.PageFn 获得资源对象 obj,调用 p.PageFn 实际调用的是 Reflector.listerWatcher 对象的 List 方法:

 1func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
 2	return lw.ListFunc(options)
 3}
 4
 5func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
 6	return cache.NewSharedIndexInformer(
 7		&cache.ListWatch{
 8			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 9				if tweakListOptions != nil {
10					tweakListOptions(&options)
11				}
12				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
13			},
14		...
15		}
16	)
17}

可以看到,这里将 Reflector 和前面的 ListFunc 回调函数关联上了,实际通过 ClientSet 客户端对象 list kube-apiserver 的资源。

Reflector Add Object

client-go informer 流程的第一步实现了,那么第二步在哪呢?带着这个问题,我们继续看 Reflector.list 方法:

 1func (r *Reflector) list(stopCh <-chan struct{}) error {
 2	...
 3
 4	// 通过反射读取 list 的 meta field
 5	listMetaInterface, err := meta.ListAccessor(list)
 6	if err != nil {
 7		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
 8	}
 9
10	// 读取 resource version
11	resourceVersion = listMetaInterface.GetResourceVersion()
12	items, err := meta.ExtractListWithAlloc(list)
13
14	if err := r.syncWith(items, resourceVersion); err != nil {
15		return fmt.Errorf("unable to sync list result: %v", err)
16	}
17
18	return nil
19}

重点看 Reflector.syncWith 方法:

 1func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
 2	found := make([]interface{}, 0, len(items))
 3	for _, item := range items {
 4		found = append(found, item)
 5	}
 6	return r.store.Replace(found, resourceVersion)
 7}
 8
 9func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
10	...
11	for _, item := range list {
12		key, err := f.KeyOf(item)
13		if err != nil {
14			return KeyError{item, err}
15		}
16		keys.Insert(key)
17		if err := f.queueActionLocked(action, item); err != nil {
18			return fmt.Errorf("couldn't enqueue object: %v", err)
19		}
20	}
21
22	...
23	return nil
24}

Reflector.syncWith 调用 Reflector.DeltaFIFOReplace 方法将 list 的资源对象添加到 DeltaFIFO 队列中,实现 informer 流程的第二步。

watch 资源和 list 资源的流程类似,这里就不过多介绍了。

informer Pop Object

Reflector 作为生产者将 list&watch 的资源添加到 Delta FIFO 队列,那么消费者在哪里使用 Delta FIFO 的资源呢?

client-gocontroller.Run 中的 controller.processLoop 处理 Delta FIFO 的资源:

 1wait.Until(c.processLoop, time.Second, stopCh)
 2
 3// client-go/tools/cache/controller.go
 4func (c *controller) processLoop() {
 5	for {
 6		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
 7		if err != nil {
 8			if err == ErrFIFOClosed {
 9				return
10			}
11			if c.config.RetryOnError {
12				// This is the safe way to re-enqueue.
13				c.config.Queue.AddIfNotPresent(obj)
14			}
15		}
16	}
17}

controller.processLoop 会轮询队列中的资源,当队列中有资源加入时 Pop 资源:

 1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 2    for {
 3		for len(f.queue) == 0 {
 4			...
 5
 6            // 当队列无资源的时候协程阻塞
 7			f.cond.Wait()
 8		}
 9
10        id := f.queue[0]
11		f.queue = f.queue[1:]
12		depth := len(f.queue)
13	    ...
14		item, ok := f.items[id]
15		...
16		delete(f.items, id)
17        ...
18        err := process(item, isInInitialList)
19        ...
20        return item, err
21    }
22}

DeltaFIFO.Pop 会循环读取队列中的资源,当队列无资源时进入阻塞状态。如果队列中有资源,每次读取队列的首元素,删除队列中读取的首元素,然后调用回调函数 PopProcessFunc 处理读取的首元素:

 1err := process(item, isInInitialList)
 2
 3// client-go/tools/cache/shared_informer.go
 4func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
 5	s.blockDeltas.Lock()
 6	defer s.blockDeltas.Unlock()
 7
 8	if deltas, ok := obj.(Deltas); ok {
 9		return processDeltas(s, s.indexer, deltas, isInInitialList)
10	}
11	return errors.New("object given as Process argument is not Deltas")
12}

调用回调函数 PopProcessFunc 实际调用的是 sharedIndexInformer.HandleDeltas 方法,在该方法内处理从队列读取到的资源。

至此,实现了 informer 流程图的第三步。

informer Add and Store Object

继续看 sharedIndexInformer.HandleDeltas 的函数 processDeltas

 1func processDeltas(handler ResourceEventHandler, clientState Store, deltas Deltas, isInInitialList bool,) error {
 2	for _, d := range deltas {
 3		obj := d.Object
 4
 5		switch d.Type {
 6		case Sync, Replaced, Added, Updated:
 7			if old, exists, err := clientState.Get(obj); err == nil && exists {
 8				if err := clientState.Update(obj); err != nil {
 9					return err
10				}
11				handler.OnUpdate(old, obj)
12			} else {
13				if err := clientState.Add(obj); err != nil {
14					return err
15				}
16				handler.OnAdd(obj, isInInitialList)
17			}
18		case Deleted:
19			if err := clientState.Delete(obj); err != nil {
20				return err
21			}
22			handler.OnDelete(obj)
23		}
24	}
25	return nil
26}

processDeltas 中根据不同的 Delta Type 执行不同的 case。我们以 Added type 为例查看处理流程。

首先,clientState.Get 从本地 indexer 存储中读取资源,并判断资源是否存在:

 1// clientState.Get get 资源 obj
 2func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
 3	...
 4	return c.GetByKey(key)
 5}
 6
 7func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
 8	item, exists = c.cacheStorage.Get(key)
 9	return item, exists, nil
10}
11
12// 从 index 中读取资源
13func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
14	c.lock.RLock()
15	defer c.lock.RUnlock()
16	item, exists = c.items[key]
17	return item, exists
18}

如果资源不存在,进入 cache.Add:

 1func (c *cache) Add(obj interface{}) error {
 2	...
 3    // 添加资源到 indexer
 4	c.cacheStorage.Add(key, obj)
 5	return nil
 6}
 7
 8func (c *threadSafeMap) Add(key string, obj interface{}) {
 9	c.Update(key, obj)
10}
11
12func (c *threadSafeMap) Update(key string, obj interface{}) {
13	c.lock.Lock()
14	defer c.lock.Unlock()
15	oldObject := c.items[key]
16	c.items[key] = obj
17	c.index.updateIndices(oldObject, obj, key)
18}

clientState.Add 中将队列 Delta FIFO 读取的资源存入 indexer 中。

至此,完成了 informer 流程图的第四步和第五步。

Event Handler

将资源存入 indexer 后继续往下看 sharedIndexInformer.OnAdd 是怎么处理 Pop 出的资源的:

 1func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
 2	...
 3	s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
 4}
 5
 6func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
 7	p.listenersLock.RLock()
 8	defer p.listenersLock.RUnlock()
 9
10	for listener, isSyncing := range p.listeners {
11		switch {
12		case !sync:
13			// non-sync messages are delivered to every listener
14			listener.add(obj)
15		case isSyncing:
16			// sync messages are delivered to every syncing listener
17			listener.add(obj)
18		default:
19			// skipping a sync obj for a non-syncing listener
20		}
21	}
22}
23
24func (p *processorListener) add(notification interface{}) {
25	if a, ok := notification.(addNotification); ok && a.isInInitialList {
26		p.syncTracker.Start()
27	}
28	p.addCh <- notification
29}

可以看到,Pop 的资源被加入 processorListener.addCh 通道。

那么,通道的另一端是哪里在处理呢?

答案在 sharedIndexInformer.Run 中的 sharedProcessor.run:

 1func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 2    ...
 3    wg.StartWithChannel(processorStopCh, s.processor.run)
 4    ...
 5}
 6
 7func (p *sharedProcessor) run(stopCh <-chan struct{}) {
 8	func() {
 9		p.listenersLock.RLock()
10		defer p.listenersLock.RUnlock()
11		for listener := range p.listeners {
12			p.wg.Start(listener.run)
13			p.wg.Start(listener.pop)
14		}
15		p.listenersStarted = true
16	}()
17	<-stopCh
18	...
19}

sharedProcessor.run 方法中开启两个协程分别执行 listener.runlistener.pop 方法。

我们先看 listener.run 方法:

 1func (p *processorListener) run() {
 2	stopCh := make(chan struct{})
 3	wait.Until(func() {
 4		for next := range p.nextCh {
 5			switch notification := next.(type) {
 6			case updateNotification:
 7				p.handler.OnUpdate(notification.oldObj, notification.newObj)
 8			case addNotification:
 9				p.handler.OnAdd(notification.newObj, notification.isInInitialList)
10				if notification.isInInitialList {
11					p.syncTracker.Finished()
12				}
13			case deleteNotification:
14				p.handler.OnDelete(notification.oldObj)
15			default:
16				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
17			}
18		}
19		close(stopCh)
20	}, 1*time.Second, stopCh)
21}

listener.runprocessorListener.nextCh 通道中读取资源对象,根据资源对象的类型决定执行哪个 case。

前面通道的一端将 addNotification 加入到 processorListeneraddCh 通道 p.addCh <- notification
这里 processorListener 根据 nextCh 通道的资源执行相应的 case。

那么 addChnextCh 的关联在哪里呢?

我们看 processorListener.pop

 1func (p *processorListener) pop() {
 2	defer utilruntime.HandleCrash()
 3	defer close(p.nextCh) // Tell .run() to stop
 4
 5	var nextCh chan<- interface{}
 6	var notification interface{}
 7	for {
 8		select {
 9		case nextCh <- notification:
10			// Notification dispatched
11			var ok bool
12			notification, ok = p.pendingNotifications.ReadOne()
13			if !ok { // Nothing to pop
14				nextCh = nil // Disable this select case
15			}
16		case notificationToAdd, ok := <-p.addCh:
17			if !ok {
18				return
19			}
20			if notification == nil { // No notification to pop (and pendingNotifications is empty)
21				// Optimize the case - skip adding to pendingNotifications
22				notification = notificationToAdd
23				nextCh = p.nextCh
24			} else { // There is already a notification waiting to be dispatched
25				p.pendingNotifications.WriteOne(notificationToAdd)
26			}
27		}
28	}
29}

processorListener.pop 的逻辑比较复杂,这里不过多介绍。重点在通过 nextCh = p.nextChprocessorListener.nextCh 和函数内通道 nextCh 关联,从而实现 processorListener.addCh 通道到 processorListener.nextCh 通道的数据传递。

了解了通道间的数据传递。我们以 ResourceEventHandlerFuncs.OnAdd 为例看 client-go 是怎么调用 EventHandler 的:

 1func (p *processorListener) run() {
 2	...
 3	wait.Until(func() {
 4		for next := range p.nextCh {
 5			switch notification := next.(type) {
 6			case updateNotification:
 7				...
 8			case addNotification:
 9				p.handler.OnAdd(notification.newObj, notification.isInInitialList)
10				...
11			}
12			...
13		}
14	})
15}
16
17func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
18	if r.AddFunc != nil {
19		r.AddFunc(obj)
20	}
21}
22
23func main() {
24	...
25	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
26		AddFunc: func(obj interface{}) {
27			mObj := obj.(v1.Object)
28			log.Printf("New Pod Added to Store: %s", mObj.GetName())
29		},
30		...
31	})
32}

可以看到,最终通过回调函数执行我们定义的 AddFunc handler。

至此,实现了 informer 流程图的第六步。

总结

我们通过两篇文章从源码角度介绍了 client-go 的流程。下面要开始 kube-schduler 的学习了,敬请期待…