As the cloud-native ecosystem continues to evolve, most Kubernetes-based cloud-native technologies adopt the CRD + Controller model. Even without a custom CRD, there is usually a need for a controller that watches resources of interest and carries out the required business logic when their state changes.

controller-runtime is a tool provided by the Kubernetes community for quickly building controllers that watch the ApiServer. This article gives a brief introduction to how controller-runtime works and how it is used in different scenarios.

Architecture

The architecture of controller-runtime can be summarized in the following diagram. Note: Webhook is not in the scope of this article, so it is omitted from the diagram.

controller-runtime Architecture

The main components are the Manager and Reconciler, which are created by the user, and the Cache and Controller, which are started by controller-runtime itself. On the user side, the Manager is created during initialization and is used to start the controller-runtime components; the Reconciler is the component the user provides to handle their own business logic.

On the controller-runtime side, the Cache, as the name implies, is a cache: it establishes the Informer’s connection to the ApiServer to watch resources and pushes the watched objects into the queue. The Controller registers an eventHandler with the Informer on the one hand, and on the other hand takes data from the queue and executes the user-side Reconciler.

The entire workflow on the controller-runtime side is as follows.

controller-runtime workflow

First, the Controller registers a resource-specific eventHandler with the Informer; then the Cache starts the Informer, which sends a request to the ApiServer to establish a connection; when the Informer detects a resource change, it uses the eventHandler registered by the Controller to decide whether the object should be pushed into the queue; when an element is pushed into the queue, the Controller takes it out and executes the user-side Reconciler.

Usage

The following describes several different scenarios of usage.

General Usage

Most of us are already familiar with the basic usage of controller-runtime; the simplest form can be expressed in the following code.

import (
    "context"
    "log"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)
    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
    })
    // 2. init Reconciler(Controller)
    _ = ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(&ApplicationReconciler{})

    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatal(err) // exit if the Manager fails to start or stops with an error
    }
}

type ApplicationReconciler struct {
}

func (a ApplicationReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    return reconcile.Result{}, nil
}

The first step is to initialize the Manager, which also generates a Cache with the default configuration.

The second step is to initialize the Controller.

  • ctrl.NewControllerManagedBy: creates the Controller and injects some of the configuration of the Manager generated in the first step into it.
  • For: a shortcut method provided by controller-runtime to specify the resource type to watch.
  • Owns: indicates that a resource is a secondary resource owned by a resource we care about, so that its events also go into the Controller’s queue (see the sketch after this list).
  • Complete: another shortcut method that generates the Controller, registers the user’s Reconciler with it, generates the default eventHandler for the watched resource, and executes the Controller’s watch function.
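
For example, a controller for Deployments that also wants events for the ReplicaSets they own could be wired up as follows. This is a minimal sketch: appsv1 (k8s.io/api/apps/v1) is assumed to be registered in the scheme, and DeploymentReconciler is a hypothetical reconciler implementing reconcile.Reconciler.

// Sketch: Deployments are the primary resource; events for the ReplicaSets
// they own are mapped back to the owning Deployment and enqueued.
_ = ctrl.NewControllerManagedBy(mgr).
    For(&appsv1.Deployment{}).  // primary watched resource
    Owns(&appsv1.ReplicaSet{}). // secondary (owned) resource
    Complete(&DeploymentReconciler{})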

The user’s Reconciler just needs to implement the reconcile.Reconciler interface.
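
For reference, this interface (from sigs.k8s.io/controller-runtime/pkg/reconcile, as of v0.11) is just a single method:

// The one method a user-side Reconciler must implement.
type Reconciler interface {
    Reconcile(ctx context.Context, req Request) (Result, error)
}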

The last step is to start the Manager, which starts the Cache (i.e. the Informers) and the Controller at the same time.

Setting up EventHandler

In the whole architecture, the Informer performs the ListWatch against the ApiServer; when it detects a change to a resource it is interested in, it processes the change according to the registered eventHandler and determines whether the object needs to be pushed into the queue.

So, when creating the Controller, we can register our own eventHandler functions with the Informer, as follows.

func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)
    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
    })
    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        // controller.New requires a Reconciler; reuse the one defined earlier.
        Reconciler: &ApplicationReconciler{},
    })
    _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            ...
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            ...
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            ...
        },
    })
    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatal(err)
    }
}

Adding resource-filtering logic in the predicate, before objects are added to the queue, effectively prevents the queue from being flooded with irrelevant resources. If our Reconciler needs to watch multiple resources, the Controller can call Watch for each resource type and register a different eventHandler each time, as sketched below.
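
As a sketch of the multi-watch case, the same Controller could additionally watch Nodes with its own handler and predicate. Mapping every node event to a single fixed request is an illustrative assumption, and the extra imports (client, types, labels from controller-runtime and apimachinery) are implied.

// Sketch: a second Watch on the same Controller, with its own eventHandler
// and predicate. Node events are mapped to one synthetic request.
_ = c.Watch(&source.Kind{Type: &corev1.Node{}},
    handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
        return []reconcile.Request{
            {NamespacedName: types.NamespacedName{Namespace: "default", Name: "app"}},
        }
    }),
    predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            // For example, only enqueue when the node's labels changed.
            return !labels.Equals(labels.Set(e.ObjectOld.GetLabels()), labels.Set(e.ObjectNew.GetLabels()))
        },
    },
)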

Setting the Cache Selector

In addition, we can add a LabelSelector or FieldSelector to the Informer’s ListWatch requests to further filter out irrelevant resources; with a large number of cluster resources this also reduces the pressure on the ApiServer. The details are as follows.

func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme)
    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
        NewCache: cache.BuilderWithOptions(cache.Options{
            Scheme: scheme,
            SelectorsByObject: cache.SelectorsByObject{
                &corev1.Pod{}: {
                    Label: labels.SelectorFromSet(labels.Set{}),
                },
                &corev1.Node{}: {
                    Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
                },
            },
        }),
    })
    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        // controller.New requires a Reconciler; reuse the one defined earlier.
        Reconciler: &ApplicationReconciler{},
    })
    _ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            ...
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            ...
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            ...
        },
    })
    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatal(err)
    }
}

Note that setting the cache selector is only available in controller-runtime v0.11.0 and later.

The way to do this is to use the cache.BuilderWithOptions function to register the LabelSelector or FieldSelector when initializing the Manager, and to register the scheme as well, so that the Informer generated by the Cache knows the resource types involved when it makes requests to the ApiServer.

You can see from the source code that the Cache generates three types of Informer: structured, unstructured and metadata. The following shows how all three are created together and started at the same time.

func NewInformersMap(config *rest.Config,
    scheme *runtime.Scheme,
    mapper meta.RESTMapper,
    resync time.Duration,
    namespace string,
    selectors SelectorsByGVK,
    disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
    return &InformersMap{
        structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
        unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
        metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

        Scheme: scheme,
    }
}

// Start calls Run on each of the informers and sets started to true.  Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
    go m.structured.Start(ctx)
    go m.unstructured.Start(ctx)
    go m.metadata.Start(ctx)
    <-ctx.Done()
    return nil
}

Among them, structured is for typed resources, whose types must be registered in the scheme; unstructured is for unstructured resources; and metadata requests only the object metadata from the ApiServer, in protobuf form.

Take structured as an example.

func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
    if err != nil {
        return nil, err
    }
    listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
    listObj, err := ip.Scheme.New(listGVK)
    if err != nil {
        return nil, err
    }

    // TODO: the functions that make use of this ListWatch should be adapted to
    //  pass in their own contexts instead of relying on this fixed one here.
    ctx := context.TODO()
    // Create a new ListWatch for the obj
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            res := listObj.DeepCopyObject()
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
            return res, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            // Watch needs to be set to true separately
            opts.Watch = true
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
            return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
        },
    }, nil
}

You can see that in the Informer’s ListWatch functions, ip.selectors(gvk).ApplyToList(&opts) adds the selectors we registered at the beginning to the subsequent list/watch requests.
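
In isolation, the effect on the outgoing request is easy to see: the registered selectors are rendered to strings and placed into the standard metav1.ListOptions fields, which become query parameters on the list/watch call. A simplified illustration (not the internal code itself):

// Simplified illustration of what applying the registered selectors does
// to the list/watch options.
opts := metav1.ListOptions{}
labelSel := labels.SelectorFromSet(labels.Set{"app": "web"})
fieldSel := fields.SelectorFromSet(fields.Set{"metadata.name": "node01"})
opts.LabelSelector = labelSel.String() // "app=web"
opts.FieldSelector = fieldSel.String() // "metadata.name=node01"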

Using Metadata

In the example above, we mentioned that metadata requests the ApiServer using protobuf serialization, which is more efficient than the default JSON serialization and performs better in large-scale environments. However, not all resource types support the protobuf format; for example, custom resources (CRDs) do not.

Another point to note is that with the metadata Informer, only the object’s metadata is watched, not its spec and status.

func start() {
    scheme := runtime.NewScheme()
    _ = corev1.AddToScheme(scheme) // register core/v1 so the typed Pod watch below works
    // 1. init Manager
    mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme: scheme,
        Port:   9443,
        NewCache: cache.BuilderWithOptions(cache.Options{
            Scheme: scheme,
            SelectorsByObject: cache.SelectorsByObject{
                &corev1.Pod{}: {
                    Label: labels.SelectorFromSet(labels.Set{}),
                },
                &corev1.Node{}: {
                    Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
                },
            },
        }),
    })
    // 2. init Reconciler(Controller)
    c, _ := controller.New("app", mgr, controller.Options{
        // controller.New requires a Reconciler; reuse the one defined earlier.
        Reconciler: &ApplicationReconciler{},
    })

    _ = ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(&ApplicationReconciler{})

    u := &metav1.PartialObjectMetadata{}
    u.SetGroupVersionKind(schema.GroupVersionKind{
        Kind:    "Pod",
        Group:   "",
        Version: "v1",
    })
    _ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
        CreateFunc: func(event event.CreateEvent) bool {
            return true
        },
        UpdateFunc: func(updateEvent event.UpdateEvent) bool {
            return true
        },
        DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
            return true
        },
    })
    // 3. start Manager
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        log.Fatal(err)
    }
}

With the Cache’s metadata Informer, the data format used is metav1.PartialObjectMetadata. The premise is that the user only cares about a resource’s metadata, not its spec and status, so the ListWatch requests to the ApiServer fetch only the metadata. The source code is as follows.

// PartialObjectMetadata is a generic representation of any object with ObjectMeta. It allows clients
// to get access to a particular ObjectMeta schema without knowing the details of the version.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PartialObjectMetadata struct {
    TypeMeta `json:",inline"`
    // Standard object's metadata.
    // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
    // +optional
    ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}

func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
    // groupVersionKind to the Resource API we will use.
    mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    if err != nil {
        return nil, err
    }

    // Always clear the negotiated serializer and use the one
    // set from the metadata client.
    cfg := rest.CopyConfig(ip.config)
    cfg.NegotiatedSerializer = nil

    // grab the metadata client
    client, err := metadata.NewForConfig(cfg)
    if err != nil {
        return nil, err
    }
    ctx := context.TODO()
    // create the relevant listwatch
    return &cache.ListWatch{
        ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
            ip.selectors(gvk).ApplyToList(&opts)

            var (
                list *metav1.PartialObjectMetadataList
                err  error
            )
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
            } else {
                list, err = client.Resource(mapping.Resource).List(ctx, opts)
            }
            if list != nil {
                for i := range list.Items {
                    list.Items[i].SetGroupVersionKind(gvk)
                }
            }
            return list, err
        },
        // Setup the watch function
        WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
            ip.selectors(gvk).ApplyToList(&opts)
            // Watch needs to be set to true separately
            opts.Watch = true

            var (
                watcher watch.Interface
                err     error
            )
            namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
            if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
                watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
            } else {
                watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
            }
            if watcher != nil {
                watcher = newGVKFixupWatcher(gvk, watcher)
            }
            return watcher, err
        },
    }, nil
}

As you can see, controller-runtime uses client-go’s metadata.Client, and the data format returned by this client’s interface is PartialObjectMetadata.
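
On the user side, this also means that with a metadata-only watch the object should be read back as a PartialObjectMetadata. Below is a minimal sketch of such a reconciler; MetaReconciler is a hypothetical name, and its Client field is an assumption (e.g. populated with mgr.GetClient() when the reconciler is constructed).

// Sketch: a reconciler that reads a metadata-only Pod. Only ObjectMeta
// (name, labels, annotations, ownerReferences, ...) is populated; spec and
// status are not available through the metadata cache.
type MetaReconciler struct {
    Client client.Client
}

func (r *MetaReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
    pom := &metav1.PartialObjectMetadata{}
    pom.SetGroupVersionKind(schema.GroupVersionKind{Version: "v1", Kind: "Pod"})
    if err := r.Client.Get(ctx, req.NamespacedName, pom); err != nil {
        // NotFound is expected after a deletion; ignore it.
        return reconcile.Result{}, client.IgnoreNotFound(err)
    }
    _ = pom.GetLabels() // only metadata fields are usable here
    return reconcile.Result{}, nil
}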

Summary

controller-runtime is a very useful tool for building resource controllers, letting us quickly generate the controllers we need in everyday development. At the same time, it provides many ways not only to build controllers quickly, but also to configure them flexibly for different business needs.