Overview

In Kubernetes, kube-controller-manager, kube-scheduler, and Operators built on controller-runtime all support leader election for high availability. This article focuses on understanding how leader election in controller-runtime (whose underlying implementation is client-go) works in a Kubernetes controller.

Background

When running kube-controller-manager, several flags are provided to configure leader election; refer to the official documentation for a full description of these parameters.

--leader-elect                               Default: true
--leader-elect-renew-deadline duration       Default: 10s
--leader-elect-resource-lock string          Default: "leases"
--leader-elect-resource-name string          Default: "kube-controller-manager"
--leader-elect-resource-namespace string     Default: "kube-system"
--leader-elect-retry-period duration         Default: 2s
...

I had assumed the election of these components was done through etcd, but while learning about controller-runtime I noticed there is no etcd-related configuration at all, which made me curious about the election mechanism. Searching for the Kubernetes election, I found that the official site describes it in simple leader election with kubernetes.

From reading that article, we know that the Kubernetes API provides an election mechanism that any container running in the cluster can implement.

The Kubernetes API provides two properties to accomplish the election:

  • ResourceVersions: every API object has a unique ResourceVersion.
  • Annotations: every API object can be annotated with arbitrary key/value pairs.

Note: this kind of election increases the load on the APIServer, and it also has an impact on etcd.

With this information, let's look at who the kube-controller-manager leader is in a Kubernetes cluster (the cluster used here has only one node, so that node is the leader).

Every Kubernetes component with leader election enabled creates an Endpoints object, and the annotation mentioned above is stored on that object to identify who the current leader is.

$ kubectl get ep -n kube-system
NAME                      ENDPOINTS   AGE
kube-controller-manager   <none>      3d4h
kube-dns                              3d4h
kube-scheduler            <none>      3d4h

Using kube-controller-manager as an example, let's see what information this Endpoints object holds.

[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"master-machine_06730140-a503-487d-850b-1fe1619f1fe1","leaseDurationSeconds":15,"acquireTime":"2022-06-27T15:30:46Z","re...
Subsets:
Events:
  Type    Reason          Age    From                     Message
  ----    ------          ----   ----                     -------
  Normal  LeaderElection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader
  Normal  LeaderElection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader

Taking kube-controller-manager as the example, the annotation control-plane.alpha.kubernetes.io/leader identifies which node currently holds leadership.

election in controller-runtime

The leader election code in controller-runtime lives in pkg/leaderelection, about 100 lines in total; let's see what it does.

As you can see, it only provides a set of options for creating the resource lock:

type Options struct {
    // LeaderElection determines whether leader election is run when the manager starts
    LeaderElection bool
    // LeaderElectionResourceLock determines which kind of resource lock to use; defaults to a Lease
    LeaderElectionResourceLock string
    // LeaderElectionNamespace is the namespace in which the election takes place
    LeaderElectionNamespace string
    // LeaderElectionID determines the name of the resource that holds the leader lock
    LeaderElectionID string
}

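For context, here is a minimal sketch of how these options are typically wired into a controller-runtime manager. The field names come from the Options struct above; the ID, namespace, and overall manager setup are illustrative assumptions (and assume a recent controller-runtime version), not code from this article.

package main

import (
    "os"

    ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
    // Minimal sketch: enable leader election on the manager with the fields shown above.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        LeaderElection:             true,               // run an election before starting controllers
        LeaderElectionResourceLock: "leases",           // use a Lease object as the lock
        LeaderElectionNamespace:    "default",          // namespace that holds the lock object (example value)
        LeaderElectionID:           "example-operator", // name of the lock object (example value)
    })
    if err != nil {
        os.Exit(1)
    }

    // Controllers registered on mgr only start running once this instance wins the election.
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}
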
Following NewResourceLock leads down into client-go/tools/leaderelection, and that package also ships an example showing how to use it.

As the example shows, the entry point of the election is the RunOrDie() function.

// A Lease lock is used here; the upstream comment notes that Leases are preferred
// because relatively few components in the cluster watch them
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      leaseLockName,
        Namespace: leaseLockNamespace,
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: id,
    },
}

// start the election loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    // IMPORTANT: the lease you hold must be released before cancel() is called,
    // otherwise a loop will still be running
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,
    RenewDeadline:   15 * time.Second,
    RetryPeriod:     5 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // usually put your code here
            run(ctx)
        },
        OnStoppedLeading: func() {
            // clean up your lease here
            klog.Infof("leader lost: %s", id)
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            // we're notified when new leader elected
            if identity == id {
                // I just got the lock
                return
            }
            klog.Infof("new leader elected: %s", identity)
        },
    },
})

Now that we understand the concept of a lock and how to start one, let's look at which locks client-go provides.

tools/leaderelection/resourcelock/interface.go defines the lock abstraction: Interface provides a generic interface for the resources used as locks in a leader election.

type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error

    // Update will update an existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error

    // RecordEvent is used to record events
    RecordEvent(string)

    // Identity returns the identity of the lock
    Identity() string

    // Describe is used to convert details on current resource lock into a string
    Describe() string
}

The implementations of this abstract interface are the concrete resource locks; client-go provides four of them (a sketch of constructing one of these locks follows the list):

  • leaselock
  • configmaplock
  • multilock
  • endpointlock
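
For reference, client-go also offers a small factory, resourcelock.New, that selects one of these lock implementations by name. The sketch below shows one way it might be used; the helper function and its argument values are illustrative, and the exact signature may differ between client-go versions.

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// newLock is a hypothetical helper that builds a resource lock via client-go's factory;
// the lock-type string selects one of the four implementations listed above.
func newLock(clientset kubernetes.Interface, ns, name, id string) (resourcelock.Interface, error) {
    return resourcelock.New(
        resourcelock.LeasesResourceLock, // could also be an endpoints, configmaps, or multi lock
        ns,                              // namespace of the lock object
        name,                            // name of the lock object
        clientset.CoreV1(),              // used by the endpoints/configmaps locks
        clientset.CoordinationV1(),      // used by the lease lock
        resourcelock.ResourceLockConfig{Identity: id},
    )
}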

leaselock

Lease is a resource in the Kubernetes control plane (persisted, like every API object, in etcd) that provides a control mechanism for distributed leases. A description of this API can be found at: Lease.

In a Kubernetes cluster, we can use the following command to view the corresponding leases

$ kubectl get leases -A
NAMESPACE         NAME                      HOLDER                                                AGE
kube-node-lease   master-machine            master-machine                                        3d19h
kube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19h
kube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h

$ kubectl describe leases kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       <none>
Annotations:  <none>
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-24T11:01:51Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        f:acquireTime:
        f:holderIdentity:
        f:leaseDurationSeconds:
        f:leaseTransitions:
        f:renewTime:
    Manager:         kube-controller-manager
    Operation:       Update
    Time:            2022-06-24T11:01:51Z
  Resource Version:  56012
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager
  UID:               851a32d2-25dc-49b6-a3f7-7a76f152f071
Spec:
  Acquire Time:            2022-06-27T15:30:46.000000Z
  Holder Identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1
  Lease Duration Seconds:  15
  Lease Transitions:       2
  Renew Time:              2022-06-28T06:09:26.837773Z
Events:                    <none>

Now let's look at the implementation of LeaseLock, which implements the resource lock abstraction above.

type LeaseLock struct {
    // LeaseMeta carries metadata such as the name, namespace, and other attributes of the Lease,
    // just like any other resource type
    LeaseMeta  metav1.ObjectMeta
    Client     coordinationv1client.LeasesGetter // Client is the typed client used to read and write Leases
    // LockConfig contains the Identity seen in the describe output above, plus a recorder
    // used to record changes to the resource lock
    LockConfig ResourceLockConfig
    // lease is the Lease resource from the API; see the API reference linked above
    lease      *coordinationv1.Lease
}

Let's look at the methods that LeaseLock implements.

Get

Get (leaselock.go#L41-L53) returns the election record built from the Lease's spec.

func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
    var err error
    ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, nil, err
    }
    record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
    recordByte, err := json.Marshal(*record)
    if err != nil {
        return nil, nil, err
    }
    return record, recordByte, nil
}

// As you can see, it returns the values populated in the resource's spec
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
    var r LeaderElectionRecord
    if spec.HolderIdentity != nil {
        r.HolderIdentity = *spec.HolderIdentity
    }
    if spec.LeaseDurationSeconds != nil {
        r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
    }
    if spec.LeaseTransitions != nil {
        r.LeaderTransitions = int(*spec.LeaseTransitions)
    }
    if spec.AcquireTime != nil {
        r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
    }
    if spec.RenewTime != nil {
        r.RenewTime = metav1.Time{spec.RenewTime.Time}
    }
    return &r
}

Create

Create (leaselock.go#L56-L66) attempts to create a Lease in the Kubernetes cluster. As you can see, Client is the typed client for the corresponding resource, and the call creates this Lease object in the cluster.

func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
    var err error
    ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
        ObjectMeta: metav1.ObjectMeta{
            Name:      ll.LeaseMeta.Name,
            Namespace: ll.LeaseMeta.Namespace,
        },
        Spec: LeaderElectionRecordToLeaseSpec(&ler),
    }, metav1.CreateOptions{})
    return err
}

Update

Update updates the spec of an existing Lease.
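
Roughly, paraphrasing the client-go source (details may vary between versions), it refuses to update a Lease it has never read and then writes the new spec back:

func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
    if ll.lease == nil {
        return errors.New("lease not initialized, call get or create first")
    }
    // convert the election record back into a LeaseSpec and write it to the APIServer
    ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

    lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
    if err != nil {
        return err
    }

    ll.lease = lease
    return nil
}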

RecordEvent

RecordEvent records the events that occur during the election. Looking back at the earlier section, the Endpoints information in the cluster showed a "became leader" event; this is where that event is generated and attached to the resource.

func (ll *LeaseLock) RecordEvent(s string) {
   if ll.LockConfig.EventRecorder == nil {
      return
   }
   events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
   subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
   // Populate the type meta, so we don't have to get it from the schema
   subject.Kind = "Lease"
   subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
   ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
}

Here we have a general understanding of what a resource lock is. Other types of resource locks are implemented in the same way, so we won’t elaborate too much here; let’s look at the election process below.

election workflow

The entry point of the election code is in leaderelection.go, continuing from the example above.

We saw earlier that the election starts with the RunOrDie() function (leader-election/main.go#L122), so let's continue from there. Looking inside RunOrDie, there are really only a few lines: it uses the provided configuration to start an election client, then blocks until the ctx is cancelled or the client stops holding the leader lease.

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
    le, err := NewLeaderElector(lec)
    if err != nil {
        panic(err)
    }
    if lec.WatchDog != nil {
        lec.WatchDog.SetLeaderElection(le)
    }
    le.Run(ctx)
}

Now let's see what NewLeaderElector does. As you can see, LeaderElector is a struct, and this function simply validates the configuration and constructs it; the struct provides everything we need in an election (LeaderElector is the election client created by RunOrDie).

func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
    if lec.LeaseDuration <= lec.RenewDeadline {
        return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    }
    if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
        return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    }
    if lec.LeaseDuration < 1 {
        return nil, fmt.Errorf("leaseDuration must be greater than zero")
    }
    if lec.RenewDeadline < 1 {
        return nil, fmt.Errorf("renewDeadline must be greater than zero")
    }
    if lec.RetryPeriod < 1 {
        return nil, fmt.Errorf("retryPeriod must be greater than zero")
    }
    if lec.Callbacks.OnStartedLeading == nil {
        return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
    }
    if lec.Callbacks.OnStoppedLeading == nil {
        return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
    }

    if lec.Lock == nil {
        return nil, fmt.Errorf("Lock must not be nil.")
    }
    le := LeaderElector{
        config:  lec,
        clock:   clock.RealClock{},
        metrics: globalMetricsFactory.newLeaderMetrics(),
    }
    le.metrics.leaderOff(le.config.Name)
    return &le, nil
}

LeaderElector is the election client that has just been created.

type LeaderElector struct {
    config LeaderElectionConfig // the election configuration, including timing parameters and health checks
    // fields related to the observed record
    observedRecord    rl.LeaderElectionRecord
    observedRawRecord []byte
    observedTime      time.Time
    // used to implement OnNewLeader(), may lag slightly from the
    // value observedRecord.HolderIdentity if the transition has
    // not yet been reported.
    reportedLeader string
    // clock is wrapper around time to allow for less flaky testing
    clock clock.Clock
    // guards access to observedRecord
    observedRecordLock sync.Mutex
    metrics leaderMetricsAdapter
}

As you can see, the election logic implemented by Run is built around the three callbacks passed in when the client was initialized.

func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() { // on exit, invoke the OnStoppedLeading callback
        le.config.Callbacks.OnStoppedLeading()
    }()

    if !le.acquire(ctx) {
        return
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx) // once elected, run OnStartedLeading
    le.renew(ctx)
}

Run calls acquire, which loops calling tryAcquireOrRenew until the lease is acquired or ctx signals termination.

func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    // wait.JitterUntil runs the given func() periodically; the func() holds the logic of each attempt
    // RetryPeriod is the interval between attempts
    // JitterFactor is the jitter coefficient, similar to the factor in a delay queue (duration + maxFactor * duration)
    // sliding decides whether the func()'s execution time is counted inside the period
    // ctx.Done() propagates cancellation from the context
    wait.JitterUntil(func() {
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

The actual election work happens in tryAcquireOrRenew, so let's look at that next. tryAcquireOrRenew tries to acquire the leader lease, renewing it if it is already held; it returns true if the lease is held on return and false otherwise.

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now() // the current time
    leaderElectionRecord := rl.LeaderElectionRecord{ // build an election record
        HolderIdentity:       le.config.Lock.Identity(), // the candidate's identity; for the ep lock it is derived from the hostname
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // defaults to 15s
        RenewTime:            now, // renewal time
        AcquireTime:          now, // acquisition time
    }

    // 1. Get or create a record from the API. If one can be fetched, a lease already exists; otherwise create a new one.
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // Creating the lease means creating the corresponding resource; the lock is one of the four
        // lock types provided by leaderelection, depending on what was passed in via RunOrDie.
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // At this point the lease has been acquired or created, so remember its attributes as the observed LeaderElectionRecord.
        le.setObservedRecord(&leaderElectionRecord)

        return true
    }

    // 2. Obtain the record and check its identity and timing.
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        le.setObservedRecord(oldLeaderElectionRecord)

        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() { // someone else holds the lock and the lease has not yet expired, so bail out
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to its default here; correct it before updating.
    if le.IsLeader() { // we are already the leader, so keep the original acquire time and transition count
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else { // LeaderTransitions counts how many times leadership has changed hands;
        // since a new leader is taking over here, increment it by 1
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }
    // Finally, update the lock resource in the APIServer, i.e. write the new attributes back to the corresponding object.
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    // setObservedRecord replaces the observed record with the new record;
    // the operation is safe: a mutex guarantees that only one goroutine enters the critical section at a time.
    le.setObservedRecord(&leaderElectionRecord)
    return true
}

summary

At this point you should have a complete picture of how the election works on top of Kubernetes; here is a brief review of the steps of the leader election described above.

  • The instance that first acquires the resource lock becomes the leader; the lock can be held on resources such as a lease, an endpoint, etc.
  • The instance that is the leader keeps renewing its lease; the default lease duration is 15 seconds (leaseDuration), and the leader updates the renewal time (renewTime) before the lease runs out.
  • The other followers keep checking the resource lock. If a leader already exists, they check renewTime; if the lease duration has been exceeded, the leader is assumed to have a problem and a new election is run until one of the followers is promoted to leader.
  • To avoid races over the lock, the Kubernetes API relies on ResourceVersion to prevent conflicting modifications: if the version number in an update does not match the version currently stored, the object has already been modified and the APIServer returns an error. A minimal sketch of how such a conflict surfaces to a client follows.
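
The following sketch illustrates that last point using the Lease object seen earlier; the kubeconfig handling, Lease name, and namespace are example values, not code from client-go, and it is for illustration only rather than something to run against a real control plane.

package main

import (
    "context"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from the local kubeconfig (illustrative setup).
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    ctx := context.Background()
    leases := client.CoordinationV1().Leases("kube-system")

    // Get returns the object together with its current resourceVersion.
    lease, err := leases.Get(ctx, "kube-controller-manager", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }

    // The Update below sends that resourceVersion back; if another client modified
    // the Lease in the meantime, the APIServer rejects the write with a Conflict error.
    holder := "my-new-holder"
    lease.Spec.HolderIdentity = &holder
    if _, err := leases.Update(ctx, lease, metav1.UpdateOptions{}); err != nil {
        if apierrors.IsConflict(err) {
            fmt.Println("lease was modified concurrently; re-Get and retry")
        } else {
            panic(err)
        }
    }
}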