In previous articles we discussed how to design and implement an operator within a single Kubernetes cluster, but as our application scales, or for other reasons within the company (permissions, for example), we may have to adopt multiple Kubernetes clusters. How does our operator fit into this multi-cluster scenario?

Of course, there are many solutions for multi-cluster scenarios, such as Clusternet, Karmada, and so on. But sometimes we still need the operator to watch the resources of multiple clusters directly, perhaps because of permissions or because we don’t want such a heavyweight solution.

Multi-Cluster Operator Practices

Requirements

First, let’s set out the requirements and the environment:

  • We have two clusters, main and sub, where main is the main cluster and sub is the sub-cluster
  • We have a CRD in the main cluster whose function is to create a Job
  • In this multi-cluster environment, the operator in the main cluster watches for the creation of this custom resource and automatically creates a Job in both the main cluster and the sub-cluster

Creating an experimental environment

Build a cluster using kind.

# Create the main cluster
kind create cluster --name main

# Create the sub-cluster
kind create cluster --name sub

Code

The main logic is shown below; essentially, we add the sub-cluster clients to TestReconciler.

// TestReconciler reconciles a Test object
type TestReconciler struct {
    // Main cluster client
    client.Client

    // List of clients of all clusters
    Clients map[string]client.Client

    Scheme *runtime.Scheme
}

// NewTestReconciler ...
func NewTestReconciler(mgr ctrl.Manager, clusters map[string]cluster.Cluster) (*TestReconciler, error) {
    r := TestReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
        Clients: map[string]client.Client{
            "main": mgr.GetClient(),
        },
    }
    for name, cluster := range clusters {
        r.Clients[name] = cluster.GetClient()
    }

    err := r.SetupWithManager(mgr)
    return &r, err
}

func (r *TestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var test jobv1.Test
    var res ctrl.Result

    err := r.Get(ctx, req.NamespacedName, &test)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }

    job := test.Job()

    for _, c := range r.Clients {
        err := c.Create(ctx, job.DeepCopy())
        if err != nil {
            return res, err
        }
    }

    return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TestReconciler) SetupWithManager(mgr ctrl.Manager) error {
    builder := ctrl.NewControllerManagedBy(mgr).
        For(&jobv1.Test{})
    return builder.Complete(r)
}
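
The Test type’s Job() helper used above is not shown in the article. Assuming the CRD simply describes a container image to run (the Spec.Image field below is an assumption for illustration, not the article’s actual CRD), it might look roughly like this:

// Job builds the batch/v1 Job that the reconciler creates in every cluster.
// The Spec.Image field is assumed for illustration.
func (t *Test) Job() *batchv1.Job {
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{Name: t.Name, Namespace: t.Namespace},
        Spec: batchv1.JobSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    RestartPolicy: corev1.RestartPolicyNever,
                    Containers:    []corev1.Container{{Name: "main", Image: t.Spec.Image}},
                },
            },
        },
    }
}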

Note that when wiring things up in main.go, we need to call mgr.Add() to add each sub-cluster to the manager so that its cache is started along with the manager; this is what allows us to watch its resources later.

// NewSubClusters initializes the subclusters
// Each of these contexts must be present in the ~/.kube/config file
func NewSubClusters(mgr ctrl.Manager, clientContexts ...string) map[string]cluster.Cluster {
    clusters := map[string]cluster.Cluster{}

    for _, v := range clientContexts {
        conf, err := config.GetConfigWithContext(v)
        checkErr(err, "get client config fail", "context", v)

        c, err := cluster.New(conf)
        checkErr(err, "new cluster fail", "context", v)

        err = mgr.Add(c)
        checkErr(err, "add cluster in manager", "context", v)

        clusters[v] = c
    }
    return clusters
}
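
For reference, here is a minimal sketch of how main.go might wire these pieces together. Assumptions: scheme is the scaffolded runtime scheme, kind names the sub-cluster’s kubeconfig context kind-sub, and error handling is trimmed; this is a sketch rather than the full main function.

func main() {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        panic(err)
    }

    // Sub-clusters are added to the manager inside NewSubClusters via mgr.Add().
    clusters := NewSubClusters(mgr, "kind-sub")

    if _, err := NewTestReconciler(mgr, clusters); err != nil {
        panic(err)
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}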

How to listen to resource changes in multiple clusters at the same time?

Above we showed how to create resources in multiple clusters. That part is quite simple and could even be done without controller-runtime. But as in the example above, creating the resources alone often does not solve the problem; we also need to follow the status of the resources we created.

Suppose we have a requirement: as soon as any Job fails, the status of the CRD should become Error. How do we implement that?

Official example

The design document Move cluster-specific code out of the manager contains a simple example, but I don’t think it is very good, because its approach is a bit too simple and crude:

  • When watching for resource changes, it watches the Secret resource in both clusters directly.
  • Inside the Reconcile method, since it cannot tell which cluster the resource came from, it has to try the first cluster and then fall back to the second.

So can we tell at Reconcile time which event is from which cluster?

type secretMirrorReconciler struct {
    referenceClusterClient, mirrorClusterClient client.Client
}

func (r *secretMirrorReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
    s := &corev1.Secret{}
    if err := r.referenceClusterClient.Get(context.TODO(), req.NamespacedName, s); err != nil {
        if kerrors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }

    if err := r.mirrorClusterClient.Get(context.TODO(), req.NamespacedName, &corev1.Secret{}); err != nil {
        if !kerrors.IsNotFound(err) {
            return reconcile.Result{}, err
        }

        mirrorSecret := &corev1.Secret{
            ObjectMeta: metav1.ObjectMeta{Namespace: s.Namespace, Name: s.Name},
            Data:       s.Data,
        }
        return reconcile.Result{}, r.mirrorClusterClient.Create(context.TODO(), mirrorSecret)
    }

    return reconcile.Result{}, nil
}

func NewSecretMirrorReconciler(mgr manager.Manager, mirrorCluster cluster.Cluster) error {
    return ctrl.NewControllerManagedBy(mgr).
        // Watch Secrets in the reference cluster
        For(&corev1.Secret{}).
        // Watch Secrets in the mirror cluster
        Watches(
            source.NewKindWithCache(&corev1.Secret{}, mirrorCluster.GetCache()),
            &handler.EnqueueRequestForObject{},
        ).
        Complete(&secretMirrorReconciler{
            referenceClusterClient: mgr.GetClient(),
            mirrorClusterClient:    mirrorCluster.GetClient(),
        })
}

// ... Omit the main function

Code

The hard part of the implementation is telling which cluster an event comes from. The Reconcile parameter ctrl.Request has only two fields, namespace and name, so these two fields are the only place we can carry the cluster information.

Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)

Obviously namespace is a better fit than name, so we add a convention to it: the actual value of the namespace field becomes ${cluster}/${namespace}. We attach this cluster marker when the event is enqueued, and then in Reconcile we use the client of the corresponding cluster to perform the operation.

First, when setting up the watch, we can write a custom event handler and inject the cluster name into it.

// MultiClustersEnqueue
// Append the cluster name to the Namespace
func MultiClustersEnqueue(clusterName string) handler.EventHandler {
    return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
        return []reconcile.Request{
            {
                NamespacedName: types.NamespacedName{
                    Name:      o.GetName(),
                    Namespace: clusterName + "/" + o.GetNamespace(),
                },
            },
        }
    })
}

// SetupWithManager sets up the controller with the Manager.
func (r *TestJobReconciler) SetupWithManager(mgr ctrl.Manager, cs map[string]cluster.Cluster) error {
    build := ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.Job{})

    // Watch Jobs in every sub-cluster as well
    for name, cluster := range cs {
        build = build.Watches(
            source.NewKindWithCache(&batchv1.Job{}, cluster.GetCache()),
            MultiClustersEnqueue(name),
        )
    }
    return build.Complete(r)
}

Then, inside Reconcile, we just pick the correct cluster client and operate with it (for example, a Job in the default namespace of the cluster registered as sub arrives with req.Namespace set to sub/default).

func (r *TestJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var res ctrl.Result

    logger := log.FromContext(ctx)

    var job batchv1.Job
    cluster, ns := GetClusterNameNs(req.Namespace)
    req.Namespace = ns

    logger.Info("get job", "cluster", cluster)

    err := r.GetClient(cluster).Get(ctx, req.NamespacedName, &job)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }

    if job.Status.CompletionTime.IsZero() {
        return res, nil
    }
    logger.Info("job complete", "cluster", cluster)

    var test jobv1.Test
    err = r.Get(ctx, clusterx.GetOwnerNameNs(&job), &test)
    if err != nil {
        return res, client.IgnoreNotFound(err)
    }

    test.Status.Phase = "finished"
    err = r.Client.Status().Update(ctx, &test)
    return ctrl.Result{}, err
}
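
The helpers GetClusterNameNs, GetClient, and clusterx.GetOwnerNameNs are not shown in the article. Assuming TestJobReconciler holds the same Clients map as TestReconciler above, a minimal sketch of the first two might look like the following; note that events from the main cluster (registered via For) carry no prefix, so the split has to handle a plain namespace as well:

// GetClusterNameNs splits "cluster/namespace" back into its parts.
// Events from the main cluster have no prefix, so fall back to "main".
func GetClusterNameNs(ns string) (cluster, namespace string) {
    parts := strings.SplitN(ns, "/", 2)
    if len(parts) == 2 {
        return parts[0], parts[1]
    }
    return "main", ns
}

// GetClient returns the client for the given cluster,
// falling back to the main-cluster client if the name is unknown.
func (r *TestJobReconciler) GetClient(cluster string) client.Client {
    if c, ok := r.Clients[cluster]; ok {
        return c
    }
    return r.Client
}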

Summary

The final implementation above is only a simple demo. In a real project it is best to add another layer of abstraction and group the multi-cluster operations together, which makes the code easier to maintain.

Here’s a simple example, which I won’t go into in this article for reasons of space.
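
Purely as an illustration of the kind of abstraction meant here (this is not the example referenced above), the multi-cluster operations could be grouped behind a small hypothetical helper type:

// ClusterSet is a hypothetical wrapper that keeps all multi-cluster operations in one place.
type ClusterSet struct {
    clients map[string]client.Client
}

// Client returns the client of a named cluster (nil if unknown).
func (s *ClusterSet) Client(name string) client.Client {
    return s.clients[name]
}

// CreateAll creates a copy of obj in every cluster, stopping at the first error.
func (s *ClusterSet) CreateAll(ctx context.Context, obj client.Object) error {
    for name, c := range s.clients {
        if err := c.Create(ctx, obj.DeepCopyObject().(client.Object)); err != nil {
            return fmt.Errorf("create in cluster %s: %w", name, err)
        }
    }
    return nil
}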