0. 介绍

client-go list&watch 精讲 介绍过 Reflector 通过 list&watch 实时获取 Kubernetes 资源的信息。并且作为生产者将资源信息存到 DeltaFIFO 中。

那么消费者是在哪里定义的呢?本文围绕 DeltaFIFO 继续介绍是哪个模块消费了 DeltaFIFO 中的资源。

1. controller

controller.Run 方法中

func (c *controller) Run(stopCh <-chan struct{}) {
	...
	// 创建 Reflector 对象
	r := NewReflectorWithOptions(
		...
	)
	...

	var wg wait.Group

        // 开启协程运行 Reflector
	wg.StartWithChannel(stopCh, r.Run)

        // 运行 controller.processLoop
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

controller.processLoop 将作为消费者处理 DeltaFIFO 中的资源:

func (c *controller) processLoop() {
        // 正常状态下 processLoop 是一个永不退出的函数
	for {
	        // 从 DeltaFIFO 中 Pop 资源
	        // 将资源送入 PopProcessFunc 中处理
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
		        // 如果接收到 ErrFIFOClosed:DeltaFIFO: manipulating with closed queue 则退出
			if err == ErrFIFOClosed {
				return
			}
			// 如果接收到 RetryOnError 则将资源重新加入到 DeltaFIFO 中等待下一次重新处理
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

                        // 如果 DeltaFIFO 中无数据可消费,则调用 f.cond.Wait() 陷入阻塞,等待被唤醒
                        // 前面的调用 f.cond.Brondcast 就是唤醒这里的协程
			f.cond.Wait()
		}
		
		// 取出 DeltaFIFO 的首资源 key,保证资源处理的顺序性
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		
		// 拿到资源信息
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		
		// 删除 DeltaFIFO 中的资源,注意这里删除的是资源,不是资源对应的 key,key 还保留在 DeltaFIFO 中
		delete(f.items, id)
		
		// 处理资源 item
		err := process(item, isInInitialList)
		// 如果资源处理出错,需要将资源重新放回 DeltaFIFO 中
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

调用 process(item, isInInitialList) 实际调用的是 sharedIndexInformer.HandleDeltas

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	if deltas, ok := obj.(Deltas); ok {
	    // 获取资源的 Delta 结构,并交由 processDeltas 处理
		return processDeltas(s, s.indexer, deltas, isInInitialList)
	}
	return errors.New("object given as Process argument is not Deltas")
}

func processDeltas(...) error {
	// from oldest to newest
	// deltas 记录的是对象的顺序更新状态
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		// 对于 Sync, Replaced, Added, Updated 状态进入一样的逻辑处理
		case Sync, Replaced, Added, Updated: 
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj, isInInitialList)
			}
		// 处理 Deleted 资源变更状态
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

processDeltas 主要是调用 clientStatehandler 处理资源。分别看 clientStatehandler 做了什么。

1.1 clientState

以 Add 类型为例,进入 clientState.Add:

// 调用 clientState.Add 实际调用的是 cache.Add 方法
// cache 是 client-go 的缓存实现
func (c *cache) Add(obj interface{}) error {
        // 获取对象对应的 key 信息 
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	
	// 调用 c.cacheStorage.Add 方法
	c.cacheStorage.Add(key, obj)
	return nil
}

// c.cacheStorage.Add 实际调用的是 threadSafeMap.Add 方法
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.Update(key, obj)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// 获取缓存中 key 对应的资源
	oldObject := c.items[key]
	// 将新资源赋给 缓存中的 items
	c.items[key] = obj
	// 更新 indexer 中资源的信息,indexer 包括缓存 threadSafeMap
	c.index.updateIndices(oldObject, obj, key)
}

clientState 根据资源的状态类型到缓存中处理,资源存储在缓存 threadSafeMap 的 items 中,threadSafeMap 是一个并发 安全的 map,内部通过加锁实现并发(不像 Go 自带的并发安全 map,通过两个 map 实现)。

这里有个组件需要注意的是 indexer,我们先不过多介绍发散,在下一篇文章中会重点介绍 indexer 组件。

1.2 handler

继续查看 handler.OnAdd 实现:

func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
	...
	s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for listener, isSyncing := range p.listeners {
		switch {
		case !sync:
			// non-sync messages are delivered to every listener
			// 调用 listener.add 处理资源
			listener.add(obj)
		case isSyncing:
			// sync messages are delivered to every syncing listener
			listener.add(obj)
		default:
			// skipping a sync obj for a non-syncing listener
		}
	}
}

func (p *processorListener) add(notification interface{}) {
	if a, ok := notification.(addNotification); ok && a.isInInitialList {
		p.syncTracker.Start()
	}
	// 将包装资源的 notification 写到 processorListener.addCh 通道
	p.addCh <- notification
}

handler.OnAdd 将资源写入到 processorListener 的 addCh 通道。

这里不继续探究哪里接收 processorListener.addCh 通道的 notification,在后面的文章中会重点介绍。

3. 小结

本文重点介绍了 controller 作为消费者是如何消费 DeltaFIFO 中的资源的。同时也可以看出在缓存中是不带事件变更类型的, 缓存中存储的只是资源的 item 信息。

controller 根据 DeltaFIFO 中不同的资源类型做相应的处理,也侧面说明了 DeltaFIFO 为什么要 Delta 结构而不用普通队列。

从 controller 可以看出更新到缓存中的永远是最新的资源,保证了处理资源的一致性。

回头看,这里的逻辑是 client-go 架构图 的第四步和第六步。