This article is based on reading the source code of Kubernetes v1.16. It contains some source code, but I will try to keep the explanation clear with accompanying diagrams.

In the Kubernetes Master node there are three important components: ApiServer, ControllerManager, and Scheduler, which together are responsible for managing the whole cluster. In this article, we will sort out the workflow and principles of the ControllerManager.

(Figure: k8s ControllerManager)

What is Controller Manager

According to the official documentation: kube-controller-manager runs controllers, which are background threads that handle regular tasks in the cluster.

For example, when a Pod created through a Deployment exits abnormally, the ReplicaSet Controller will notice the exit, handle it, and create a new Pod to maintain the expected number of replicas.

Almost every specific resource is managed by a specific Controller to maintain the expected state, and it is the Controller Manager’s responsibility to aggregate all the Controllers:

  1. provide infrastructure to reduce the complexity of Controller implementation
  2. start and maintain the Controller’s uptime

In this way, Controller ensures that the resources in the cluster remain in the expected state, and Controller Manager ensures that the Controller remains in the expected state.

Controller Workflow

Before we explain how the Controller Manager provides the infrastructure and runtime environment for the Controller, let’s understand what the Controller workflow looks like.

From a high-level perspective, the Controller Manager mainly provides the ability to distribute events, while each Controller only needs to register the corresponding Handlers to wait for and process the events it receives.

(Figure: Controller workflow)

Take the Deployment Controller as an example. The NewDeploymentController method in pkg/controller/deployment/deployment_controller.go registers the Event Handlers. The Deployment Controller only needs to implement different processing logic for different events to manage the corresponding resources.

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
	DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	DeleteFunc: dc.deletePod,
})
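To see the same pattern outside the Kubernetes source tree, here is a minimal hedged sketch of a custom "controller" built directly on client-go's shared informer machinery. It is an illustrative example, not Kubernetes source code: the kubeconfig path is hypothetical, and the exact return values of AddEventHandler differ between client-go versions.

package main

import (
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical kubeconfig path; in-cluster config would work just as well.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The shared informer factory plays the role the Controller Manager plays
	// for the built-in controllers.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	deployInformer := factory.Apps().V1().Deployments().Informer()

	// The "controller" itself is nothing more than a set of event handlers.
	deployInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			d := obj.(*appsv1.Deployment)
			fmt.Printf("deployment added: %s/%s\n", d.Namespace, d.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			d := newObj.(*appsv1.Deployment)
			fmt.Printf("deployment updated: %s/%s\n", d.Namespace, d.Name)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("deployment deleted")
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh) // run every informer that has been requested
	cache.WaitForCacheSync(stopCh, deployInformer.HasSynced)
	<-stopCh
}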

As you can see, with the help of the Controller Manager, a Controller's logic can be kept very pure: it only needs to implement the corresponding EventHandlers. So what specific work does the Controller Manager actually do?

Controller Manager Architecture

The key package that helps the Controller Manager distribute events is client-go, and within it the most critical module is the informer.

kubernetes provides an architecture diagram of client-go on GitHub. In it, the Controller corresponds to the bottom half (CustomController), while the Controller Manager is mainly responsible for completing the top half.

(Figure: client-go architecture)

Informer Factory

As you can see from the above diagram, Informer is a very critical “bridge”, so the management of Informer is the first thing that Controller Manager does.

Each Informer maintains a long-lived watch connection to the Api Server, so this singleton factory ensures that each type of Informer is instantiated only once by providing the unique entry point through which all Controllers obtain Informers.

The initialization logic of this singleton factory is as follows:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

As you can see from the initialization logic above, the most important part of sharedInformerFactory is the map named informers, where the key is a resource type and the value is the Informer that watches that resource type. Each type of Informer is instantiated only once and stored in the map, so different Controllers that need the same resource get the same Informer instance.
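A quick way to see this singleton behaviour is to request the same Informer twice. The sketch below uses the fake clientset only to stay self-contained; the comparison holds because the factory returns the instance stored in its informers map.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset() // no real cluster needed for this check
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	a := factory.Apps().V1().Deployments().Informer()
	b := factory.Apps().V1().Deployments().Informer()

	// Both calls return the Informer stored under the Deployment type
	// in the factory's informers map.
	fmt.Println(a == b) // true
}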

For the Controller Manager, keeping all the Informers running properly is the basic precondition for all Controllers to work properly. The sharedInformerFactory maintains all Informer instances through this map, so it also takes on the responsibility of providing a unified start entry point:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

When the Controller Manager starts, the most important thing is to run all the Informers through the Start method of this factory.

Informer creation

Now let's look at how these Informers are created. The Controller Manager initializes all of its controllers in the NewControllerInitializers function in cmd/kube-controller-manager/app/controllermanager.go. Since the code is lengthy, we will take only the Deployment Controller as an example.

The logic for initializing the Deployment Controller is in the startDeploymentController function in cmd/kube-controller-manager/app/apps.go.

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}

The most critical logic is in deployment.NewDeploymentController, which actually creates the Deployment Controller. The first three parameters of the creation function are the Informers for Deployment, ReplicaSet, and Pod. As you can see, the Informer singleton factory provides entry points for creating Informers for different resources, using the API group as the access path.

However, it is important to note that .Apps().V1().Deployments() returns an instance of type deploymentInformer, and deploymentInformer is not really an Informer (despite its name). It is just a template class whose main function is to provide a template for creating Informers that focus on Deployment as a specific resource.

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

The real logic for creating an Informer is in deploymentInformer.Informer() (client-go/informers/apps/v1/deployment.go). f.defaultInformer is the default template method for creating a Deployment Informer; by passing the resource instance and this template method into the InformerFor method of the Informer factory, an Informer that focuses only on Deployment resources is created.

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
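For reference, the defaultInformer template method passed in above looks roughly like this in client-go (paraphrased from memory; check the exact code in your version):

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	// Creates a SharedIndexInformer that lists/watches only Deployments,
	// with the default namespace index.
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}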

To briefly summarize:

  1. you can get a template class for a specific resource type from the Informer factory (deploymentInformer in this case)
  2. it is the Informer() method of this template class that actually creates the Informer for that particular resource
  3. the Informer() method creates the real Informer through the InformerFor method of the Informer factory

The template method design pattern is used here; although it is a bit convoluted, you can refer to the following diagram to sort it out. The key to understanding it is that the differentiated creation logic of each Informer is delegated to the template class.

(Figure: Informer creation logic is delegated to the template class)

Finally, the structure named sharedIndexInformer will be instantiated and will actually take on the responsibilities of the Informer. It is also the instance that is registered to the Informer factory map.
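The InformerFor method that ties the template method and the singleton map together is essentially the following (abridged from client-go/informers/factory.go; details may differ slightly between versions):

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	// One Informer per resource type: return the existing instance if present.
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	// Otherwise delegate the actual creation to the template method.
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}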

Informer operation

Since the real Informer instance is an object of type sharedIndexInformer, when the Informer factory is started (by executing the Start method), it is the sharedIndexInformer that is actually run.

The sharedIndexInformer is a component in client-go, and its Run method is a few dozen lines long, but it does a lot of work. This is where we get to the most interesting part of the Controller Manager.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

The startup logic of sharedIndexInformer does several things.

  1. creates a queue named fifo
  2. creates and runs an instance called controller
  3. starts the cacheMutationDetector
  4. starts the processor

These terms (or components) have not been mentioned so far, but these four things are the core of the Controller Manager's work, so I will cover each of them below.

sharedIndexInformer

sharedIndexInformer is a shared Informer framework where different Controllers only need to provide a template class (like the deploymentInformer mentioned above) to create an Informer specific to their needs.

The sharedIndexInformer contains a set of tools to do the Informer's job; the main code is in client-go/tools/cache/shared_informer.go, including its creation logic:

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

In the creation logic, there are several things to look out for:

  1. processor: provides EventHandler registration and event distribution
  2. indexer: provides resource caching
  3. listerWatcher: provided by the template class, containing the List and Watch methods for the specific resource
  4. objectType: marks which specific resource type this Informer focuses on
  5. cacheMutationDetector: monitors the Informer's cache

In addition, it also contains the DeltaFIFO queue and controller mentioned in the startup logic above, which are described below.

sharedProcessor

processor is a very interesting component in sharedIndexInformer. Controller Manager ensures that different Controllers share the same Informer through an Informer singleton factory, but different Controllers have different Handlers registered to the shared Informer.

The processor is the component that manages the registered Handlers and distributes events to different Handlers.

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

The core of sharedProcessor's work revolves around the listeners slice.

When we register a Handler to the Informer, it is eventually converted into an instance of a structure called processorListener.

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

This instance mainly contains two channels (addCh and nextCh) and the externally registered Handler methods. The processorListener object instantiated here is eventually added to the sharedProcessor.listeners list.

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

As shown in the diagram below, the Handler methods of a Controller are eventually wrapped into a listener, which is then appended to the listeners slice of the sharedProcessor.

(Figure: Handler methods in the Controller are wrapped into listeners managed by sharedProcessor)

As mentioned before, sharedIndexInformer will run sharedProcessor when it starts, and the logic for starting sharedProcessor is related to these listeners.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

As you can see, when sharedProcessor starts, it runs the run and pop methods of each listener, so let's look at these two methods now.

Starting the listener

Since the listener contains the Handler methods registered by the Controller, its most important job is to trigger those methods when an event arrives. listener.run keeps taking events from the nextCh channel and executing the corresponding handler:

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

You can see that listener.run keeps getting events from the nextCh channel, but where do the events in the nextCh channel come from? It is the responsibility of listener.pop to put the events in nextCh.

listener.pop is a very clever and interesting piece of logic.

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

The reason the listener contains two channels (addCh and nextCh) is that the Informer cannot predict whether listener.handler consumes events faster than they are produced, so it adds a ring buffer called pendingNotifications to hold the events that have not yet been consumed.

(Figure: pendingNotifications buffers events between addCh and nextCh)

On one hand, the pop method keeps receiving the latest events from addCh so that the producer never blocks. If there is already a notification waiting to be dispatched, the new event is written into the pendingNotifications buffer; otherwise it is handed to nextCh directly.

On the other hand, whenever a notification is dispatched to nextCh, pop reads the next event out of the buffer (if any) and keeps feeding it to nextCh.
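The trick of disabling a select case by setting the channel variable to nil is worth seeing in isolation. The following toy sketch (hypothetical names, with a plain slice standing in for the ring buffer) reproduces the addCh → buffer → nextCh behaviour:

// pump forwards values from addCh to nextCh without ever blocking the producer.
func pump(addCh <-chan interface{}, nextCh chan<- interface{}) {
	defer close(nextCh)
	var pending []interface{}  // stand-in for pendingNotifications
	var out chan<- interface{} // nil disables the send case below
	var next interface{}
	for {
		select {
		case out <- next: // only possible when out == nextCh
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				out = nil // nothing left to send; disable this case
			}
		case v, ok := <-addCh:
			if !ok {
				return // producer is done
			}
			if out == nil { // nothing in flight: send v directly
				next, out = v, nextCh
			} else { // something already waiting: buffer v
				pending = append(pending, v)
			}
		}
	}
}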

The pop method thus implements a buffered distribution mechanism that allows events to flow continuously from addCh to nextCh. But then the question arises: where do the events on addCh come from?

The source is very simple: the listener has an add method that takes an event as input and pushes it into addCh. The add method is called by the sharedProcessor that manages all the listeners.
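The add method itself is essentially a one-liner (from client-go's shared_informer.go):

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}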

As mentioned above, the sharedProcessor is responsible for managing all the Handlers and distributing events, but it is the distribute method that does the real distributing.

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

So far, we have a clearer picture of one part:

  1. the Controller registers Handlers with the Informer
  2. the Informer maintains all Handlers (listeners) through the sharedProcessor
  3. the Informer receives events and distributes them through sharedProcessor.distribute
  4. the corresponding Handler is triggered and the Controller handles its own logic

So the remaining question is where do the Informer events come from?

DeltaFIFO

Before analyzing how the Informer obtains events, there is a very interesting little component that needs to be introduced first: the fifo queue created in sharedIndexInformer.Run.

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO is a very interesting queue; its code is defined in client-go/tools/cache/delta_fifo.go. For a queue, the most important methods are certainly Add and Pop. DeltaFIFO provides several Add methods, and although they are distinguished by event type (add/update/delete/sync), they all end up executing queueActionLocked:

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}

The first parameter of the queueActionLocked method, actionType, is the event type.

const (
	Added   DeltaType = "Added"   // creation event obtained from the watch API
	Updated DeltaType = "Updated" // update event obtained from the watch API
	Deleted DeltaType = "Deleted" // deletion event obtained from the watch API
	Sync    DeltaType = "Sync"    // the List API was triggered and the cache needs to be refreshed
)

The event types and the enqueue method show that this is a queue with business logic, not just a "first in, first out" queue. There are two very clever designs in the enqueue method:

  1. when an event is enqueued, the queue first checks whether there are unconsumed events for the same resource and handles them accordingly
  2. if a Sync event finds that the resource has already been deleted, the event is not processed

The second point is easier to understand: if a list request is triggered and the resource to be processed has already been deleted, there is no need to enqueue it again. The first point needs to be understood together with the dequeue method.

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

DeltaFIFO's Pop method takes one parameter: a handler function. When dequeuing, DeltaFIFO first fetches all the pending events of the resource by its id and then hands them over to the handler function.

The workflow is shown in the figure.

(Figure: DeltaFIFO enqueue and dequeue workflow)

In general, DeltaFIFO's enqueue method first checks whether the resource is already in items. If it is, the resource has not been consumed yet (it is still queued), so the new event is simply appended to items[resource_id]. If it is not in items, then items[resource_id] is created and the resource id is appended to queue.

The DeltaFIFO dequeue method takes the resource id at the head of queue, fetches all the events for that resource from items, and finally calls the PopProcessFunc handler passed into the Pop method.

So the key feature of DeltaFIFO is that individual events (of a resource) are enqueued, but on dequeue all the pending events of the first resource in the queue are handed over together. This design ensures that a resource generating events like crazy cannot starve other resources of the chance to be processed.
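A small hedged example of this behaviour, using client-go of roughly the same era as v1.16 (NewDeltaFIFO with this signature is deprecated in newer releases): two events for the same Pod are enqueued separately, but a single Pop hands both deltas to the handler.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	fifo.Add(pod)    // enqueues Delta{Added, pod} under key "default/demo"
	fifo.Update(pod) // appends Delta{Updated, pod} to the same item

	// Pop hands over all accumulated deltas of the first resource in the queue.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type) // Added, then Updated
		}
		return nil
	})
}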

controller

DeltaFIFO is a very important component, but the thing that really makes it valuable is the Informer's controller.

While the K8s source code does use the word controller, this controller is not a resource controller like a Deployment Controller. Rather, it is a top-down event controller (taking events from the API Server and sending them down to the Informer for processing).

The responsibilities of the controller are twofold.

  1. get events from the Api Server via List-Watch and push the events into DeltaFIFO
  2. call the Pop method of DeltaFIFO with the HandleDeltas method of the sharedIndexInformer as an argument

The definition of controller is very simple and its core is Reflector.

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

The Reflector's code is rather tedious but conceptually simple: it performs list-watch through the listerWatcher defined in sharedIndexInformer and pushes the events it obtains into the DeltaFIFO.

After the controller starts, it starts the Reflector and then runs processLoop, an infinite loop that keeps reading resource events from the DeltaFIFO and handing them to the HandleDeltas method of the sharedIndexInformer (assigned to config.Process).

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

If we now look at the HandleDeltas method of the sharedIndexInformer, the whole event consumption flow becomes clear:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

We learned earlier that the processor.distribute method distributes events to all listeners. The controller uses the Reflector to get events from the ApiServer and push them into the DeltaFIFO; the processLoop then takes all the pending events of a resource out of the queue and finally calls processor.distribute through the HandleDeltas method of the sharedIndexInformer.

Thus, we can organize the entire flow of events as follows.

(Figure: the complete flow of events)

Indexer

Above, we have sorted out the whole path from receiving events to distributing them, but there is one more interesting piece of logic in the HandleDeltas method of sharedIndexInformer: every event is first written to s.indexer and only then distributed.

As mentioned earlier, the Indexer is a thread-safe store, used as a cache in order to relieve the pressure on the ApiServer when the resource controller (Controller) queries the resource.

Whenever there is an event, the cache in the Indexer is refreshed first, and only then is the event distributed to the resource controllers, which fetch the resource details from the Indexer first, thus reducing unnecessary query requests to the ApiServer.
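A hedged sketch of what that looks like from a Controller's point of view, reusing the factory pattern from the earlier example: reads go through a Lister, which is backed by the Informer's Indexer rather than by the ApiServer.

// Assumes client and factory are built as in the earlier sketch;
// labels comes from k8s.io/apimachinery/pkg/labels.
deployments := factory.Apps().V1().Deployments()
deployLister := deployments.Lister()

stopCh := make(chan struct{})
factory.Start(stopCh)
cache.WaitForCacheSync(stopCh, deployments.Informer().HasSynced)

// Served entirely from the local cache; no request reaches the ApiServer here.
cached, err := deployLister.Deployments("kube-system").List(labels.Everything())
if err == nil {
	for _, d := range cached {
		fmt.Println(d.Name)
	}
}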

The specific implementation of Indexer storage is in client-go/tools/cache/thread_safe_store.go, and the data is stored in threadSafeMap.

type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

In essence, threadSafeMap is a map protected by a read/write lock. In addition, it allows indexes to be defined, and the indexing is implemented by two interesting fields:

  1. Indexers is a map that defines several index functions: the key is the indexName and the value is the index function (which computes the index values of a resource).
  2. Indices stores the mapping between index values and data keys. Indices is a two-level map: the first-level key is the indexName, which corresponds to Indexers and determines which function computes the index value, and the value is a map that stores the "index value → resource keys" associations.

The relevant logic is relatively simple and can be found in the following diagram.

(Figure: how Indexers and Indices relate inside the Indexer)
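A small hedged example of the two fields in action, using client-go's exported helpers ("byNamespace" is an arbitrary indexName chosen for this example): the index function computes index values for each object, and ByIndex answers "which objects have this index value".

indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
	"byNamespace": cache.MetaNamespaceIndexFunc, // indexName -> IndexFunc
})

indexer.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "default"}})
indexer.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "b", Namespace: "kube-system"}})

// Internally, Indices now maps
//   "byNamespace" -> {"default": {"default/a"}, "kube-system": {"kube-system/b"}}
pods, _ := indexer.ByIndex("byNamespace", "default")
for _, obj := range pods {
	fmt.Println(obj.(*v1.Pod).Name) // prints: a
}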

MutationDetector

In addition to updating the data in s.indexer, the HandleDeltas method of the sharedIndexInformer also feeds it to s.cacheMutationDetector.

As mentioned at the beginning, when sharedIndexInformer starts, it also starts a cacheMutationDetector to monitor the indexer’s cache.

Because the indexer cache is actually a pointer, multiple Controllers accessing the indexer’s cached resources actually get the same resource instance. If one Controller does not play nice and modifies the properties of a resource, it will definitely affect the correctness of other Controllers.

When the Informer receives a new event, the MutationDetector saves a pointer to the resource (as does the indexer) and a deep copy of the resource. By periodically checking whether the resource pointed to by the pointer matches the deep copy stored at the beginning, we know whether the cached resource has been modified.
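The idea can be sketched in a few lines (a simplified illustration, not the real defaultCacheMutationDetector; runtime is k8s.io/apimachinery/pkg/runtime): keep the pointer that is handed out alongside a private deep copy, and periodically verify that they still match.

type cachedEntry struct {
	cached runtime.Object // the pointer shared with controllers via the indexer
	copied runtime.Object // a deep copy taken when the object entered the cache
}

func (e cachedEntry) verify() {
	// If a controller mutated the shared object, it no longer equals the copy.
	if !reflect.DeepEqual(e.cached, e.copied) {
		panic(fmt.Sprintf("cached object was mutated: %#v", e.cached))
	}
}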

However, whether monitoring is enabled or not is affected by the environment variable KUBE_CACHE_MUTATION_DETECTOR. If this environment variable is not set, sharedIndexInformer instantiates dummyMutationDetector and does nothing after startup.

If KUBE_CACHE_MUTATION_DETECTOR is true, sharedIndexInformer instantiates defaultCacheMutationDetector, which performs periodic checks of the cache at 1s intervals, triggering a failure handler function if it finds the cache modified, or a panic if the function is not defined.

Summary

This article has explained the Controller Manager in a narrow sense: it has not covered any specific resource controller, but only how the Controller Manager "manages the Controllers".

As you can see, the Controller Manager does a lot of work to ensure that each Controller can focus only on the events it cares about, and the core of this work is the Informer. Once you understand how the Informer cooperates with the other components, it becomes clear how the Controller Manager paves the way for the resource controllers.