1. 介绍

client-go DeltaFIFO 介绍了从 DeltaFIFO 中取出的资源将被放到 indexer ,接着交由 processorListener 处理。

本文继续看 processorListener 是如何处理 DeltaFIFO pop 的资源的。

2. 启动 handler

processorListener 的启动在 sharedIndexInformer.Run() 方法定义:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
	// 开启协程运行 sharedIndexInformer.processor.run 方法
	wg.StartWithChannel(processorStopCh, s.processor.run)
        ...
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for listener := range p.listeners {
		        // 开启协程运行 processorListener.run
			p.wg.Start(listener.run)
			
			// 开启协程运行 processorListener.pop
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	...
}

sharedProcessor.run 的重点在 processorListener.popprocessorListener.run,分别介绍如下。

2.1 processorListener.pop

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			var ok bool
			// 读取 processorListener.pendingNotifications 
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
			        // 表示 processorListener.pendingNotifications 没有资源
				nextCh = nil // Disable this select case
			}
		// DeltaFIFO 将资源信息存入 processorListener.addCh
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
			        // 如果 notification 为 nil,表示目前没有资源处理
			        // 将 processorListener.addCh 的资源赋给 notification,下一次处理该资源
				notification = notificationToAdd
				// 绑定 nextCh 和 processorListener.nextCh
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
			        // 如果 notification 不为空,表示当前有资源在处理
			        // 将 notificationToAdd 写入 processorListener.pendingNotifications
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

这里的 processorListener.pop() 是两个逻辑耦合在一起的。一个是从通道 addCh 中将资源转入 nextCh 通道。另一个是如果 nextCh 的资源未能被及时处理,则将资源写入 processorListener.pendingNotifications,以等待 nextCh 空闲时处理。

为什么这样处理是 nextCh 和 addCh 的处理速度不一致。nextCh 的处理要慢于 addCh,所以需要将资源写入 pendingNotifications 做为缓冲。

processorListener.pendingNotifications 是一个 ringBuffer 循环队列,它不是并发安全的,不同于普通循环队列,它的长度是可扩容的。这是相比于通道的好处,通道的长度是固定的(但是通道是并发安全的)。

2.2 processorListener.run

继续看 processorListener.run 是如何处理资源的。

func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
	        // 从 processorListener.nextCh 通道中获取资源
		for next := range p.nextCh {
		        // 获取资源类型
			switch notification := next.(type) {
			case updateNotification:
			        // 处理 updateNotification 类型资源
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
			        // 处理 addNotification 类型资源
				p.handler.OnAdd(notification.newObj, notification.isInInitialList)
				if notification.isInInitialList {
					p.syncTracker.Finished()
				}
			case deleteNotification:
			        // 处理 deleteNotification 类型资源
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

processorListener.run 根据不同资源类型调用 processorListener.handler 做相应的处理。

这里以 processorListener.handler.OnAdd() 方法为例看 processorListener.handler 对资源做了什么。

// processorListener.handler.OnAdd() 实际调用的是 ResourceEventHandlerFuncs.OnAdd() 方法
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
	if r.AddFunc != nil {
		r.AddFunc(obj)
	}
}

ResourceEventHandlerFuncs.OnAdd() 实际执行的是 ResourceEventHandlerFuncs.AddFunc() 该方法是我们在创建 informer 时注入的:

func main() {
	// 解析 kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", "/Users/hxia/.kube/config")
	if err != nil {
		panic(err)
	}

	// 创建 ClientSet 客户端对象
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)

	// 创建 sharedInformers
	sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
	// 创建 informer
	informer := sharedInformers.Core().V1().Pods().Informer()

	// 创建 Event 回调 handler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	    // 注入 AddFunc 到 informer
	    // ResourceEventHandlerFuncs.AddFunc() 调用的 AddFunc 实际执行的是这里
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
	})

	// 运行 informer
	informer.Run(stopCh)
}

到此我们可以看到 nextCh 的资源将根据资源类型执行不同的 event handler 事件。通过该事件将资源交由业务逻辑处理。

3. 小结

通过两个通道实现资源的传递,这一结构有点类似于 Reflector 和 controller。

Reflector 负责 list&watch APIServer 的资源,并将资源放入 DeltaFIFO 中,controller 负责将 DeltaFIFO 中的资源拿出来存入 indexer 和 processorListener.addCh。

类似的,processorListener.pop 负责将 processorListener.addCh 中的资源写入到 processorListener.nextCh,如果processorListener.nextCh 来不及处理,则将资源写入到 processorListener.pendingNotifications 队列。processorListener.run 负责将 processorListener.nextCh 中的 资源交由不同的 event handler 处理。

通过 processorListener.addCh 和 processorListener.nextCh 将资源的处理过程解耦,使得协程的职责更清晰。

我们可以看到模块之间可以通过通道/队列解耦,实现不同模块各司其职,较少耦合性。

除了用通道/队列接偶,模块之间解耦还有什么方式吗?比如:

  • 定义接口和实现,模块之间依赖接口而不是实现。
  • 通过消息总线/队列,模块之间只需要接/发消息到消息总线/队列,而不用关心对端的逻辑。
  • 通过依赖注入,只需实现自己的逻辑,依赖可以通过注入解决,可用于业务逻辑和控制逻辑解耦。
  • 通过代理模式或中间层解耦两个模块,模块只需要和代理或中间层通信,不需要管对方的处理逻辑,减少了耦合。
  • 通过分层架构,在架构层面解耦。不同层负责处理不同事情,减少依赖。