This article introduces the idea of implementing HA for Flink’s JobManager on top of Kubernetes. Flink 1.12 has not been released yet, but Kubernetes-based HA is already on its development plan; this post, however, is mainly about our internal implementation.

HA stands for High Availability and is used in many distributed systems to eliminate the SPOF (single point of failure) problem. For example, the NameNode of HDFS, the distributed file system of the big-data ecosystem (which can simply be thought of as the master in a master-slave architecture), must run with HA enabled in production. More generally, HA usually targets the master role of a master-slave architecture.

So how does HA solve the single-point problem? In a non-HA system, once the master goes down, all of the work it is responsible for stops. For example, Flink’s JobManager is responsible for task scheduling; without HA, when it crashes we have to restart the job and recompute from scratch. With HA, when a JobManager goes down, a new JobManager takes over so that the job can keep running.

HA requires several identical service processes to exist at the same time, but only one of them, the leader, actually serves requests. Choosing which one of these processes serves is the leader election.

1. Leader Election

In a typical election, multiple candidates compete for the position of leader (normally there is only one), a process called leader election. The leader keeps sending heartbeats to assert its role to the other followers. Heartbeats alone have a drawback, though: when the network is unstable they can lead to multiple leaders. To solve this problem, the concept of a lease was introduced. With the lease mechanism, a dedicated role issues a lease to the nodes in the system, and the node that holds the lease is the leader. A lease carries an expiration timeout; before it runs out, the leader must renew the lease, otherwise a new round of leader election is required.
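To make the lease idea concrete, here is a minimal, purely illustrative Java sketch; the LeaseStore interface is hypothetical and stands in for whatever coordination service actually grants the leases.

import java.time.Duration;
import java.time.Instant;

// Hypothetical store that grants time-bounded leases; not a real API.
interface LeaseStore {
    // Tries to acquire or renew the lease for the given holder; returns the new expiry, or null on failure.
    Instant tryAcquireOrRenew(String holderId, Duration leaseDuration);
}

class LeaderLeaseLoop {
    private final LeaseStore store;
    private final String id;

    LeaderLeaseLoop(LeaseStore store, String id) {
        this.store = store;
        this.id = id;
    }

    void run() throws InterruptedException {
        Duration leaseDuration = Duration.ofSeconds(60);
        Duration renewPeriod = Duration.ofSeconds(15); // renew well before the lease expires
        while (store.tryAcquireOrRenew(id, leaseDuration) != null) {
            Thread.sleep(renewPeriod.toMillis());
        }
        // Renewal failed: this node must stop acting as leader; a new election round follows.
    }
}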

Besides implementing a consensus algorithm ourselves, we can also use a distributed coordination system (a coordinator) such as ZooKeeper or etcd to implement the leader election.

Currently, Flink’s HA service is abstracted behind the interface HighAvailabilityServices, so we can plug in our own HA implementation. HighAvailabilityServices is responsible for everything that needs to be made highly available, and its main tasks include:

  • ResourceManager leader election and leader retrieval (the ResourceManager is a component of the JobManager process).
  • JobManager leader election and leader retrieval.
  • Metadata storage for checkpoints. A checkpoint is a distributed snapshot of the computation, and a new JobManager can read a checkpoint to restore the previous job state. The checkpoint data itself is usually stored in a distributed file system such as HDFS, so the HA service only needs to store the checkpoint’s file path.
  • Registering the latest checkpoint. Checkpoints are taken at a fixed interval, and recovery generally starts from the most recent one.
  • A registry that tracks the status of jobs.

The interface definition follows, with some of the deprecated methods removed.

public interface HighAvailabilityServices extends ClientHighAvailabilityServices {
  // Gets the leader retriever for the resource manager
	LeaderRetrievalService getResourceManagerLeaderRetriever();

  // Gets the leader retriever for the dispatcher
	LeaderRetrievalService getDispatcherLeaderRetriever();

	/**
	 * Gets the leader retriever for the job JobMaster which is responsible for the given job.
	 *
	 * @param jobID The identifier of the job.
	 * @param defaultJobManagerAddress JobManager address which will be returned by
	 *                              a static leader retrieval service.
	 * @return Leader retrieval service to retrieve the job manager for the given job
	 */
	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

	/**
	 * Gets the leader election service for the cluster's resource manager.
	 *
	 * @return Leader election service for the resource manager leader election
	 */
	LeaderElectionService getResourceManagerLeaderElectionService();

	/**
	 * Gets the leader election service for the cluster's dispatcher.
	 *
	 * @return Leader election service for the dispatcher leader election
	 */
	LeaderElectionService getDispatcherLeaderElectionService();

	/**
	 * Gets the leader election service for the given job.
	 *
	 * @param jobID The identifier of the job running the election.
	 * @return Leader election service for the job manager leader election
	 */
	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

	/**
	 * Gets the checkpoint recovery factory for the job manager.
	 *
	 * @return Checkpoint recovery factory
	 */
	CheckpointRecoveryFactory getCheckpointRecoveryFactory();

	/**
	 * Gets the submitted job graph store for the job manager.
	 *
	 * @return Submitted job graph store
	 * @throws Exception if the submitted job graph store could not be created
	 */
	JobGraphStore getJobGraphStore() throws Exception;

	/**
	 * Gets the registry that holds information about whether jobs are currently running.
	 *
	 * @return Running job registry to retrieve running jobs
	 */
	RunningJobsRegistry getRunningJobsRegistry() throws Exception;

	/**
	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
	 *
	 * @return Blob store
	 * @throws IOException if the blob store could not be created
	 */
	BlobStore createBlobStore() throws IOException;

	/**
	 * Gets the leader election service for the cluster's rest endpoint.
	 *
	 * @return the leader election service used by the cluster's rest endpoint
	 */
	default LeaderElectionService getClusterRestEndpointLeaderElectionService() {
		// for backwards compatibility we delegate to getWebMonitorLeaderElectionService
		// all implementations of this interface should override getClusterRestEndpointLeaderElectionService, though
		return getWebMonitorLeaderElectionService();
	}

	
	default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
		// for backwards compatibility we delegate to getWebMonitorLeaderRetriever
		// all implementations of this interface should override getClusterRestEndpointLeaderRetriever, though
		return getWebMonitorLeaderRetriever();
	}

	// ------------------------------------------------------------------------
	//  Shutdown and Cleanup
	// ------------------------------------------------------------------------

	/**
	 * Closes the high availability services, releasing all resources.
	 *
	 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
	 * (file systems, ZooKeeper, etc). Another instance of the high availability
	 * services will be able to recover the job.
	 *
	 * <p>If an exception occurs during closing services, this method will attempt to
	 * continue closing other services and report exceptions only after all services
	 * have been attempted to be closed.
	 *
	 * @throws Exception Thrown, if an exception occurred while closing these services.
	 */
	
	void close() throws Exception;

	/**
	 * Closes the high availability services (releasing all resources) and deletes
	 * all data stored by these services in external stores.
	 *
	 * <p>After this method was called, any job or session that was managed by
	 * these high availability services will be unrecoverable.
	 *
	 * <p>If an exception occurs during cleanup, this method will attempt to
	 * continue the cleanup and report exceptions only after all cleanup steps have
	 * been attempted.
	 *
	 * @throws Exception Thrown, if an exception occurred while closing these services
	 *                   or cleaning up data stored by them.
	 */
	void closeAndCleanupAllData() throws Exception;
}

HighAvailabilityServices involves several other related interfaces.

LeaderElectionService helps a contender (LeaderContender) take part in the leader election. Every service that participates in leader election needs to implement the interface LeaderContender, which is implemented by four services in Flink:

  • Dispatcher
  • JobManager
  • ResourceManager
  • WebMonitorEndpoint
public interface LeaderElectionService {
  // Starts the leader election service; when an election round finishes, the corresponding
  // callback on the LeaderContender is invoked depending on success or failure.
  // The contender is also registered as a listener so it can be used in later election rounds.
	void start(LeaderContender contender) throws Exception;

  // Stops the leader election service
	void stop() throws Exception;

  // Confirms that the contender has accepted the leadership and publishes its leader address
	void confirmLeadership(UUID leaderSessionID, String leaderAddress);

  // Returns whether the contender bound to this service is the leader for the given leaderSessionId
	boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

// Every service that takes part in the leader election needs to implement this interface
public interface LeaderContender {
  // Callback invoked when this contender is elected leader
	void grantLeadership(UUID leaderSessionID);

  // Callback invoked when the former leader is stripped of its leadership
	void revokeLeadership();

  // Error handling
	void handleError(Exception exception);

	default String getDescription() {
		return "LeaderContender: " + getClass().getSimpleName();
	}
}

LeaderRetrievalService is used to obtain the current leader of an HA-backed service and to notify a listener whenever the leader changes. The listener is a LeaderRetrievalListener, which must be implemented by every service that wants to be notified about new leaders.

public interface LeaderRetrievalService {
  // Starts the service and registers the listener as a callback, invoked whenever a new leader appears
	void start(LeaderRetrievalListener listener) throws Exception;

	void stop() throws Exception;
}

public interface LeaderRetrievalListener {
  // Called back to notify the listener when a new leader has been elected
	void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

	void handleError(Exception exception);
}

CheckpointRecoveryFactory is the checkpoint-related interface. The HA service only stores checkpoint metadata such as the checkpoint’s storage path (e.g. on HDFS); it does not store the checkpoint data itself. This part is mostly checkpoint logic, so I won’t go over it here.

RunningJobsRegistry is used to track the status of jobs. For example, when a leader switch occurs, the new JobManager recovers jobs with the help of the RunningJobsRegistry.

public interface RunningJobsRegistry {
	enum JobSchedulingStatus {
		PENDING,
		RUNNING,
		DONE;
	}

  // Sets the job status to RUNNING
	void setJobRunning(JobID jobID) throws IOException;

  // Sets the job status to DONE
	void setJobFinished(JobID jobID) throws IOException;

  // Gets the job's scheduling status
	JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;

	void clearJob(JobID jobID) throws IOException;
}

1. HA scheme implementation based on ZooKeeper

1. ZooKeeper master selection

It is well known that ZooKeeper can be used for leader election, and the principle is simple: every candidate tries to create a specific znode (znode creation is atomic), which is really just a path, and the one that succeeds becomes the leader. Znodes come in two kinds, persistent and ephemeral. A persistent znode and the data stored in it stay around until they are deleted manually; an ephemeral znode’s lifecycle is tied to the client that created it, or strictly speaking to the session between that client and the server. Once the client disconnects from the server, the ephemeral znode is deleted. In addition, ZooKeeper provides a watch API, so the other followers can notice that the leader has gone away by watching the leader’s znode.

In short, by creating an ephemeral znode we can implement both leader election and leader detection.
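As a minimal illustration (not Flink or Curator code), doing this with the plain ZooKeeper client looks roughly as follows; the path /election/leader is just an example and its parent node is assumed to already exist.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeElection {
    private static final String LEADER_PATH = "/election/leader"; // example path, parent must exist

    // Returns true if this process won the election.
    public static boolean tryBecomeLeader(ZooKeeper zk, byte[] myAddress) throws Exception {
        try {
            // Ephemeral: the znode disappears when this client's session ends, releasing leadership.
            zk.create(LEADER_PATH, myAddress, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is the leader: watch the znode so we notice when it disappears.
            zk.exists(LEADER_PATH, event -> {
                // On NodeDeleted, the leader is gone; call tryBecomeLeader again.
            });
            return false;
        }
    }
}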

Curator LeaderLatch

Although implementing leader election directly against ZooKeeper is relatively simple, we generally do not write this logic by hand. Since the common uses of ZooKeeper are fairly fixed (leader election, distributed locks, and so on), Netflix open-sourced a ZooKeeper client framework, Curator (now an Apache project), and nowadays most systems, such as Spark and Flink, operate ZooKeeper through Curator.

Using LeaderLatch to elect a leader is very simple; the basic procedure is shown below.

leaderLatch = new LeaderLatch(client, latchPath);	// create the LeaderLatch
leaderLatch.addListener(this);						// register the callback object
leaderLatch.start();								// start the election

The second line registers the callback object, which implements the following interface.

public interface LeaderLatchListener {
    void isLeader();	// called back when elected leader

    void notLeader();	// called back when leadership is lost or not obtained
}

The process that is successfully elected leader receives the isLeader() callback; otherwise it receives notLeader().

There are four services in Flink’s HighAvailabilityServices that need to elect a leader: ResourceManager, Dispatcher, JobManager, and ClusterRestEndpoint.

 /**
 * Gets the leader election service for the cluster's resource manager.
 */
LeaderElectionService getResourceManagerLeaderElectionService();
 /**
 * Gets the leader election service for the cluster's dispatcher.
 */
LeaderElectionService getDispatcherLeaderElectionService();
 /**
 * Gets the leader election service for the given job.
 */
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 /**
 * Gets the leader election service for the cluster's rest endpoint.
 */
default LeaderElectionService getClusterRestEndpointLeaderElectionService()

From the above analysis of leader election with ZooKeeper, we would expect these four methods to return the same implementation, differing only in the znode path each of them uses. That is indeed the case: they all return a ZooKeeperLeaderElectionService.

public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
  // ZooKeeper-related fields
  /** Client to the ZooKeeper quorum. */
	private final CuratorFramework client;

	/** Curator recipe for leader election. */
	private final LeaderLatch leaderLatch;

	/** Curator recipe to watch a given ZooKeeper node for changes. */
	private final NodeCache cache;
}

A ZooKeeperLeaderElectionService holds Curator’s LeaderLatch for the election itself and a NodeCache for watching changes of the leader node. It implements Flink’s LeaderElectionService interface as well as Curator’s LeaderLatchListener and NodeCacheListener interfaces, so the object itself can be registered as a ZooKeeper callback: LeaderLatchListener is called back when an election completes (won or lost), and NodeCacheListener is called back when the watched node changes.
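The bridging between the Curator callbacks and the Flink contender can be pictured with the simplified sketch below; this is my own illustration, not the actual ZooKeeperLeaderElectionService source, which additionally handles leadership confirmation, writing the leader information to the znode, and error cases.

import java.util.UUID;

import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.flink.runtime.leaderelection.LeaderContender;

// Simplified bridge: Curator tells us whether we won the latch, and we translate
// that into grantLeadership/revokeLeadership calls on the registered contender.
class LatchToContenderBridge implements LeaderLatchListener {

    private final Object lock = new Object();
    private final LeaderContender contender;
    private UUID issuedLeaderSessionID;

    LatchToContenderBridge(LeaderContender contender) {
        this.contender = contender;
    }

    @Override
    public void isLeader() {
        synchronized (lock) {
            // A fresh session ID identifies this particular leadership epoch.
            issuedLeaderSessionID = UUID.randomUUID();
            contender.grantLeadership(issuedLeaderSessionID);
        }
    }

    @Override
    public void notLeader() {
        synchronized (lock) {
            issuedLeaderSessionID = null;
            contender.revokeLeadership();
        }
    }
}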

Leader retrieval corresponds to the interface LeaderRetrievalService in Flink’s HA services. As mentioned earlier, its main job is to notify clients, such as Flink’s TaskManagers, when the leader changes. It is natural to guess that the ZooKeeper-based LeaderRetrievalService builds on Curator’s NodeCacheListener; let’s check whether that is the case.

The ZooKeeper-based implementation of LeaderRetrievalService is ZooKeeperLeaderRetrievalService. Three of its methods are listed below.

  • start: the interface LeaderRetrievalService describes this method as “starts the leader retrieval service with the given listener to listen for new leaders”. The core is registering the listener as a callback to be notified when the leader changes. In the ZooKeeper-based implementation the callback comes from ZooKeeper, but the LeaderRetrievalListener passed into start is not itself a ZooKeeper callback object (it does not implement NodeCacheListener). So ZooKeeper actually calls back the ZooKeeperLeaderRetrievalService, which in turn notifies the real listener.
  • stop: not much to say; it simply stops the service and cleans up.
  • nodeChanged: the callback invoked by ZooKeeper when the leader changes.
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener, UnhandledErrorListener {
	/** Connection to the used ZooKeeper quorum. */
	private final CuratorFramework client;

	/** Curator recipe to watch changes of a specific ZooKeeper node. */
	private final NodeCache cache;
  
	
	public void start(LeaderRetrievalListener listener) throws Exception {
		synchronized (lock) {
			leaderListener = listener;

			client.getUnhandledErrorListenable().addListener(this);
			cache.getListenable().addListener(this);
			cache.start();

			client.getConnectionStateListenable().addListener(connectionStateListener);

			running = true;
		}
	}

  
	public void stop() throws Exception {
		synchronized (lock) {
			if (!running) {
				return;
			}

			running = false;
		}

		client.getUnhandledErrorListenable().removeListener(this);
		client.getConnectionStateListenable().removeListener(connectionStateListener);

		try {
			cache.close();
		} catch (IOException e) {
			throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
		}
	}

	
	public void nodeChanged() throws Exception {
		synchronized (lock) {
			if (running) {
				try {
					LOG.debug("Leader node has changed.");

					ChildData childData = cache.getCurrentData();

					String leaderAddress;
					UUID leaderSessionID;

					if (childData == null) {
						leaderAddress = null;
						leaderSessionID = null;
					} else {
						byte[] data = childData.getData();

						if (data == null || data.length == 0) {
							leaderAddress = null;
							leaderSessionID = null;
						} else {
							ByteArrayInputStream bais = new ByteArrayInputStream(data);
							ObjectInputStream ois = new ObjectInputStream(bais);

							leaderAddress = ois.readUTF();
							leaderSessionID = (UUID) ois.readObject();
						}
					}

					if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
						Objects.equals(leaderSessionID, lastLeaderSessionID))) {
						LOG.debug(
							"New leader information: Leader={}, session ID={}.",
							leaderAddress,
							leaderSessionID);

						lastLeaderAddress = leaderAddress;
						lastLeaderSessionID = leaderSessionID;
						leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
					}
				} catch (Exception e) {
					leaderListener.handleError(new Exception("Could not handle node changed event.", e));
					throw e;
				}
			} else {
				LOG.debug("Ignoring node change notification since the service has already been stopped.");
			}
		}
	}
}

4. RunningJobsRegistry

Another piece of the HA service is RunningJobsRegistry, which records which jobs are running. The ZooKeeper-based implementation simply reads and writes the job status under a fixed znode path, which is straightforward, so I won’t go into it further.

/**
 * A zookeeper based registry for running jobs, highly available.
 */
public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {

	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperRunningJobsRegistry.class);

	private static final Charset ENCODING = Charset.forName("utf-8");

	/** The ZooKeeper client to use. */
	private final CuratorFramework client;

	private final String runningJobPath;

	public ZooKeeperRunningJobsRegistry(final CuratorFramework client, final Configuration configuration) {
		this.client = checkNotNull(client, "client");
		this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
	}

	
	public void setJobRunning(JobID jobID) throws IOException {
		checkNotNull(jobID);

		try {
			writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
		}
		catch (Exception e) {
			throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
		}
	}

	
	public void setJobFinished(JobID jobID) throws IOException {
		checkNotNull(jobID);

		try {
			writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
		}
		catch (Exception e) {
			throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
		}
	}

	
	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
		checkNotNull(jobID);

		try {
			final String zkPath = createZkPath(jobID);
			final Stat stat = client.checkExists().forPath(zkPath);
			if (stat != null) {
				// found some data, try to parse it
				final byte[] data = client.getData().forPath(zkPath);
				if (data != null) {
					try {
						final String name = new String(data, ENCODING);
						return JobSchedulingStatus.valueOf(name);
					}
					catch (IllegalArgumentException e) {
						throw new IOException("Found corrupt data in ZooKeeper: " +
								Arrays.toString(data) + " is no valid job status");
					}
				}
			}

			// nothing found, yet, must be in status 'PENDING'
			return JobSchedulingStatus.PENDING;
		}
		catch (Exception e) {
			throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
		}
	}

	
	public void clearJob(JobID jobID) throws IOException {
		checkNotNull(jobID);

		try {
			final String zkPath = createZkPath(jobID);
			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
			this.client.delete().forPath(zkPath);
		}
		catch (Exception e) {
			throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
		}
	}

	private String createZkPath(JobID jobID) {
		return runningJobPath + jobID.toString();
	}

	private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
		LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
		final String zkPath = createZkPath(jobID);
		this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
		this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
	}
}

2. Why you need a Kubernetes-based HA solution

To some extent, Kubernetes has become the de facto standard of the cloud-native world (a de facto standard being, roughly, whatever most people use). Many big-data systems are gradually embracing the Kubernetes ecosystem, including distributed computing frameworks such as Flink and Spark. However, Kubernetes uses etcd, not ZooKeeper, for its own coordination, so the ZooKeeper-based HA solution described above does not fit naturally. Could we deploy a ZooKeeper cluster just to provide HA for Flink? Of course, but for users already running Kubernetes, maintaining a separate ZooKeeper cluster solely for Flink’s HA is clearly not an attractive option.

In this situation, a Kubernetes-based Flink HA solution becomes important. Some people may ask: why not base it directly on etcd? Simply put, Kubernetes uses etcd as its own backend metadata store but does not expose the etcd service to users; by going through the Kubernetes API we are still, indirectly, using etcd.

3. Concrete implementation of the solution

1. k8s-based leader election

With ZooKeeper we implement leader election via the atomic znode-creation API, and the same idea applies to Kubernetes. A 2016 post on the official Kubernetes blog describes in detail how to run leader election with Kubernetes: https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/. Simply put, the atomicity of the Kubernetes API is wrapped into a distributed lock; all candidates in the election compete for the lock, and the one that gets it is the leader.

In client-go, this lock is called a resourcelock; client-go currently supports four kinds of resource locks:

  • configMapLock: a distributed lock built on operations on the ConfigMap resource
  • endpointLock: a distributed lock built on operations on the Endpoints resource
  • leaseLock: built on the Lease resource, a comparatively lightweight lock
  • multiLock: a combination of several locks

A resourceLock essentially locks a Kubernetes resource and exposes the methods the election code needs. Those methods are defined by the Lock interface, which mainly includes the following.
// Interface offers a common interface for locking on arbitrary
// resources used in leader election.  The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time.  This interface is strictly for use
// by the leaderelection code.
type Interface interface {
	// Get returns the LeaderElectionRecord
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

	// Create attempts to create a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error

	// Update will update and existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error

	// RecordEvent is used to record events
	RecordEvent(string)

	// Identity will return the locks Identity
	Identity() string

	// Describe is used to convert details on current resource lock
	// into a string
	Describe() string
}

A resourceLock implementation for a particular resource manipulates that resource (ConfigMap, Endpoints, Lease, etc.) by calling the Kubernetes API. For example, the Create implementation of configMapLock below tries to create a ConfigMap with a specific name under a specific namespace and writes the leader information into its annotations (annotations on Kubernetes API objects can hold essentially arbitrary strings). When multiple candidates try to create a ConfigMap with the same name in the same namespace at the same time, only one of them succeeds, and that one is the leader.

// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
	recordBytes, err := json.Marshal(ler)
	if err != nil {
		return err
	}
	cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cml.ConfigMapMeta.Name,
			Namespace: cml.ConfigMapMeta.Namespace,
			Annotations: map[string]string{
				LeaderElectionRecordAnnotationKey: string(recordBytes),
			},
		},
	}, metav1.CreateOptions{})
	return err
}

With the resourcelock and leaderelection packages provided by client-go, we can easily implement our own leader-election logic. Below is an example using a LeaseLock; the full code is available at: https://github.com/kubernetes/client-go/blob/master/examples/leader-election/main.go

 // we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
	LeaseMeta: metav1.ObjectMeta{
		Name:      leaseLockName,
		Namespace: leaseLockNamespace,
	},
	Client: client.CoordinationV1(),
	LockConfig: resourcelock.ResourceLockConfig{
		Identity: id,
	},
}

// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock: lock,
	// IMPORTANT: you MUST ensure that any code you have that
	// is protected by the lease must terminate **before**
	// you call cancel. Otherwise, you could have a background
	// loop still running and another process could
	// get elected before your background loop finished, violating
	// the stated goal of the lease.
	ReleaseOnCancel: true,
	LeaseDuration:   60 * time.Second,
	RenewDeadline:   15 * time.Second,
	RetryPeriod:     5 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			// we're notified when we start - this is where you would
			// usually put your code
			run(ctx)
		},
		OnStoppedLeading: func() {
			// we can do cleanup here
			klog.Infof("leader lost: %s", id)
			os.Exit(0)
		},
		OnNewLeader: func(identity string) {
			// we're notified when new leader elected
			if identity == id {
				// I just got the lock
				return
			}
			klog.Infof("new leader elected: %s", identity)
		},
	},
})

The above describes leader election in Kubernetes using client-go. From Java we can use the corresponding client SDK, io.kubernetes:client-java-extended, whose code provides three kinds of resourceLock: ConfigMapLock, EndpointsLock and LeaseLock. Link: https://github.com/kubernetes-client/java/tree/master/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock

We use the ConfigMapLock internally, which means we store the leader information in a ConfigMap’s annotations.
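For reference, acquiring leadership with a ConfigMapLock through the Java client looks roughly like the snippet below. It is adapted from the client’s leader-election example; the namespace and lock name are placeholders, and the package names (io.kubernetes.client.openapi.*) correspond to recent client versions, so adjust them for the version you use.

import java.time.Duration;
import java.util.UUID;

import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.Config;

public class ConfigMapLeaderElectionExample {
    public static void main(String[] args) throws Exception {
        ApiClient apiClient = Config.defaultClient();
        Configuration.setDefaultApiClient(apiClient);

        // Placeholder namespace / ConfigMap name; the leader record is written into its annotation.
        String identity = UUID.randomUUID().toString();
        ConfigMapLock lock = new ConfigMapLock("default", "flink-jobmanager-leader", identity);

        LeaderElectionConfig config = new LeaderElectionConfig(
                lock,
                Duration.ofSeconds(60),  // lease duration
                Duration.ofSeconds(15),  // renew deadline
                Duration.ofSeconds(5));  // retry period

        LeaderElector elector = new LeaderElector(config);
        elector.run(
                () -> System.out.println("became leader: " + identity),
                () -> System.out.println("lost leadership: " + identity));
    }
}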

1. LeaderElectionService

To implement a Kubernetes-based LeaderElectionService in Java, we can start from the LeaderElector class provided in the SDK. LeaderElector offers a run method that runs one round of leader election.

/**
   * Runs the leader election in foreground.
   */
  public void run(Runnable startLeadingHook, Runnable stopLeadingHook) {
    run(startLeadingHook, stopLeadingHook, null);
  }

  /**
   * Runs the leader election in foreground.
   */
  public void run(
      Runnable startLeadingHook, Runnable stopLeadingHook, Consumer<String> onNewLeaderHook) {
    this.onNewLeaderHook = onNewLeaderHook;
    log.info("Start leader election with lock {}", config.getLock().describe());
    try {
      if (!acquire()) {
        // Fail to acquire leadership
        return;
      }
      log.info("Successfully acquired lease, became leader");
      // Hook on start leading
      hookWorkers.submit(startLeadingHook);
      renewLoop();
      log.info("Failed to renew lease, lose leadership");
      // Hook on stop leading
      stopLeadingHook.run();
    } catch (Throwable t) {
      stopLeadingHook.run();
    }
  }

But this run method is a one-off: it returns once leadership is lost. To implement Flink’s LeaderElectionService on top of it, we need to write a class that does some auxiliary work (a rough sketch follows the list), such as:

  • Encapsulating LeaderElector to run the leader election
  • Running the leader election in a loop on a daemon thread
  • Defining a listener interface for callbacks
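A minimal sketch of such a wrapper is shown below. The class name, the single-threaded retry loop, and the stubbed-out confirmLeadership are my own illustration under those assumptions, not our exact internal code.

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.kubernetes.client.extended.leaderelection.LeaderElector;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

// Illustrative wrapper: re-runs the one-shot LeaderElector.run(...) in a loop and
// translates its hooks into LeaderContender callbacks.
public class KubernetesLeaderElectionService implements LeaderElectionService {

    private final LeaderElector leaderElector;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "k8s-leader-election");
        t.setDaemon(true);
        return t;
    });

    private volatile LeaderContender contender;
    private volatile UUID currentLeaderSessionId;
    private volatile boolean running;

    public KubernetesLeaderElectionService(LeaderElector leaderElector) {
        this.leaderElector = leaderElector;
    }

    @Override
    public void start(LeaderContender contender) {
        this.contender = contender;
        this.running = true;
        executor.execute(() -> {
            // LeaderElector.run is one-shot: it returns once leadership is lost,
            // so keep re-entering the election until the service is stopped.
            while (running) {
                leaderElector.run(
                        () -> {
                            currentLeaderSessionId = UUID.randomUUID();
                            contender.grantLeadership(currentLeaderSessionId);
                        },
                        () -> {
                            currentLeaderSessionId = null;
                            contender.revokeLeadership();
                        });
            }
        });
    }

    @Override
    public void stop() {
        running = false;
        executor.shutdownNow();
    }

    @Override
    public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        // In a real implementation the leader address would additionally be published,
        // e.g. written into the lock ConfigMap's annotation, so retrieval services can read it.
    }

    @Override
    public boolean hasLeadership(UUID leaderSessionId) {
        return running && leaderSessionId.equals(currentLeaderSessionId);
    }
}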

2. LeaderRetrievalService

Implementing LeaderRetrievalService on Kubernetes is relatively simple: since the leader information is stored in the annotations of a specific Kubernetes resource (ConfigMap, Endpoints or Lease), we can read the annotation of that resource directly. We still need a long-running watcher plus a listener interface so that listeners are notified when the leader information changes; a sketch follows.
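The sketch below is again illustrative only: it polls for the leader annotation, whereas a production implementation would rather use a watch or informer, and readLeaderAddress()/readLeaderSessionId() are hypothetical helpers standing in for the actual ConfigMap read.

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

// Illustrative only: periodically reads the leader information and notifies the listener on change.
public class KubernetesLeaderRetrievalService implements LeaderRetrievalService {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    private volatile String lastLeaderAddress;
    private volatile UUID lastLeaderSessionId;

    @Override
    public void start(LeaderRetrievalListener listener) {
        scheduler.scheduleWithFixedDelay(() -> {
            String leaderAddress = readLeaderAddress();   // hypothetical helper
            UUID leaderSessionId = readLeaderSessionId(); // hypothetical helper
            if (!Objects.equals(leaderAddress, lastLeaderAddress)
                    || !Objects.equals(leaderSessionId, lastLeaderSessionId)) {
                lastLeaderAddress = leaderAddress;
                lastLeaderSessionId = leaderSessionId;
                listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

    @Override
    public void stop() {
        scheduler.shutdownNow();
    }

    private String readLeaderAddress() { return null; }  // placeholder for the ConfigMap annotation read
    private UUID readLeaderSessionId() { return null; }  // placeholder for the ConfigMap annotation read
}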

The remaining two pieces, CheckpointRecoveryFactory and RunningJobsRegistry, follow the same pattern.
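As one example of what "similar" means, a ConfigMap-backed RunningJobsRegistry could simply keep one entry per job in a ConfigMap. The sketch below is illustrative; readEntry/writeEntry/removeEntry are hypothetical helpers wrapping the Kubernetes API calls.

import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;

// Illustrative ConfigMap-backed registry: the job status is stored under the key "<jobID>".
public class KubernetesRunningJobsRegistry implements RunningJobsRegistry {

    @Override
    public void setJobRunning(JobID jobID) throws IOException {
        writeEntry(jobID.toString(), JobSchedulingStatus.RUNNING.name());
    }

    @Override
    public void setJobFinished(JobID jobID) throws IOException {
        writeEntry(jobID.toString(), JobSchedulingStatus.DONE.name());
    }

    @Override
    public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
        String value = readEntry(jobID.toString());
        return value == null ? JobSchedulingStatus.PENDING : JobSchedulingStatus.valueOf(value);
    }

    @Override
    public void clearJob(JobID jobID) throws IOException {
        removeEntry(jobID.toString());
    }

    private void writeEntry(String key, String value) throws IOException { /* hypothetical ConfigMap update */ }
    private String readEntry(String key) throws IOException { return null; /* hypothetical ConfigMap read */ }
    private void removeEntry(String key) throws IOException { /* hypothetical ConfigMap update */ }
}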