Apache Kafka is one of the most popular distributed publish-subscribe messaging systems available today. While Kafka is very powerful, it is equally complex and requires a highly available, robust platform to run on. In a world where microservices are prevalent and most companies have adopted distributed computing, Kafka is still a very attractive choice as a core messaging system.

If you run your microservices in a Kubernetes cluster, it also makes sense to run your Kafka cluster there, taking advantage of Kubernetes' built-in resiliency and high availability, and letting you interact with the Kafka Pods easily through Kubernetes' built-in service discovery.

This article shows how to build a distributed Kafka cluster on Kubernetes using a Helm chart and StatefulSets. If you want persistent data volumes to be provisioned dynamically, you need to configure a StorageClass resource in advance, for example one backed by Ceph RBD; if your cluster does not have dynamic provisioning configured, you need to create 3 unbound PVs for data persistence ahead of time.
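As a reference, a minimal sketch of one of those three pre-created PVs is shown below. The name, capacity, access mode, and reclaim policy match the volumes that appear later in this article, but the hostPath backend is only a placeholder; in a real cluster you would point the volume at your actual storage, for example a Ceph RBD image.

```yaml
# Sketch of one statically created PV (kfk0); repeat for kfk1 and kfk2.
# The hostPath below is a placeholder -- substitute your real storage backend.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kfk0
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  hostPath:
    path: /data/kfk0
```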

The Kafka deployment here is based on the chart incubator/kafka from Helm's official repository and uses the image confluentinc/cp-kafka:5.0.1, i.e. the Kafka distribution provided by Confluent. Confluent Platform Kafka (CP Kafka for short) provides some advanced features that Apache Kafka does not, such as cross-datacenter replication, the Schema Registry, and cluster monitoring tools.

Installation

Installing with a Helm chart of course requires Helm itself to be installed; the latest version of Helm v3 can be set up directly as follows.

```bash
> wget https://get.helm.sh/helm-v3.4.0-linux-amd64.tar.gz
> tar -zxvf helm-v3.4.0-linux-amd64.tar.gz
> sudo cp -a linux-amd64/helm /usr/local/bin/helm
> sudo chmod +x /usr/local/bin/helm
```
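You can then verify that the client is working; the exact version string will of course depend on the release you downloaded.

```bash
> helm version
version.BuildInfo{Version:"v3.4.0", ...}
```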

```bash
> helm repo add incubator http://mirror.azure.cn/kubernetes/charts-incubator/
> helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈
```

Next we can prepare the Values file for the installation. We can start from the chart's default values.yaml, download it, and then customize it as needed, for example by specifying our own StorageClass (a sample override is sketched after the download command below).

```bash
> curl https://raw.githubusercontent.com/helm/charts/master/incubator/kafka/values.yaml > kfk-values.yaml
```
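For instance, if you want the chart to provision volumes from your own StorageClass, a minimal override could look like the snippet below. The persistence keys are assumed from the incubator/kafka chart's values.yaml, so double-check them against the chart version you downloaded; the ceph-rbd class name is just a placeholder.

```yaml
# kfk-values.yaml (excerpt) -- assumed keys from incubator/kafka's values.yaml
persistence:
  enabled: true
  size: 1Gi
  storageClass: ceph-rbd   # placeholder: use your own StorageClass name
```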

Here I simply use the default values for the installation.

```bash
> helm install kafka incubator/kafka -f kfk-values.yaml
NAME: kafka
LAST DEPLOYED: Sun Nov 1 09:36:44 2020
NAMESPACE: default
STATUS: deployed
REVISION: 1
NOTES:
### Connecting to Kafka from inside Kubernetes
You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:

apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: default
spec:
  containers:
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka topics with:

  kubectl -n default exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --list

To create a new topic:

  kubectl -n default exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1

To listen for messages on a topic:

  kubectl -n default exec -ti testclient -- ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning

To stop the listener session above press: Ctrl+C

To start an interactive message producer session:

  kubectl -n default exec -ti testclient -- ./bin/kafka-console-producer.sh --broker-list kafka-headless:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C

If you specify "zookeeper.connect" in configurationOverrides, please replace "kafka-zookeeper:2181" with the value of "zookeeper.connect", or you will get error.
```

If you have neither a StorageClass nor available PVs configured, the Kafka Pods will be stuck in the Pending state after installation, so be sure to set up the data volumes in advance.
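If the Pods do get stuck in Pending, the usual way to confirm that it is a volume problem is to look at the Pod's events and the unbound claim, for example:

```bash
# Check the scheduling events (look for messages about unbound PersistentVolumeClaims)
> kubectl describe pod kafka-0
# Check whether the claim created by the StatefulSet is still Pending
> kubectl get pvc datadir-kafka-0
```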

Normally, Kafka will be installed successfully in a few moments.

```bash
> kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
kafka-0             1/1     Running   0          25m
kafka-1             1/1     Running   0          11m
kafka-2             1/1     Running   0          2m
kafka-zookeeper-0   1/1     Running   0          25m
kafka-zookeeper-1   1/1     Running   0          22m
kafka-zookeeper-2   1/1     Running   0          18m
```

By default, 3 ZooKeeper Pods and 3 Kafka Pods are installed, which ensures high availability of the application. We can also look at the information about the persistent volumes I configured.

```bash
> kubectl get pvc
NAME              STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE
datadir-kafka-0   Bound    kfk0     1Gi        RWO                           28m
datadir-kafka-1   Bound    kfk1     1Gi        RWO                           13m
datadir-kafka-2   Bound    kfk2     1Gi        RWO                           4m9s
> kubectl get pv
NAME   CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                     STORAGECLASS   REASON   AGE
kfk0   1Gi        RWO            Retain           Bound    default/datadir-kafka-0                           23m
kfk1   1Gi        RWO            Retain           Bound    default/datadir-kafka-1                           22m
kfk2   1Gi        RWO            Retain           Bound    default/datadir-kafka-2                           10m
```

If a default StorageClass is configured, persistent volumes will be requested dynamically. If your cluster does not have dynamic provisioning enabled, you can modify values.yaml to use statically created volumes instead.

Then look at the corresponding Service objects.

```bash
> kubectl get svc
NAME                       TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kafka                      ClusterIP   10.100.205.187   <none>        9092/TCP                     31m
kafka-headless             ClusterIP   None             <none>        9092/TCP                     31m
kafka-zookeeper            ClusterIP   10.100.230.255   <none>        2181/TCP                     31m
kafka-zookeeper-headless   ClusterIP   None             <none>        2181/TCP,3888/TCP,2888/TCP   31m
kubernetes                 ClusterIP   10.96.0.1        <none>        443/TCP                      14d
```

You can see that there is a ZooKeeper service named kafka-zookeeper and a Kafka service named kafka. Cluster management operations go through the kafka-zookeeper service, while sending and receiving messages goes through the kafka service.
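Both are normal ClusterIP services, so any Pod in the cluster can reach them through Kubernetes DNS. Within the default namespace the short names kafka:9092 and kafka-zookeeper:2181 are enough; from another namespace you would use the fully qualified service names, roughly as sketched below (the other-ns namespace and the testclient2 Pod are hypothetical).

```bash
# Hypothetical: a client Pod in another namespace must use the full DNS names
> kubectl -n other-ns exec -ti testclient2 -- /usr/bin/kafka-console-producer \
    --broker-list kafka.default.svc.cluster.local:9092 --topic test1
```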

Client testing

Now that the Kafka cluster is set up, let's create a Kafka client Pod that will help us produce and consume topic messages.

Create the client directly with the following command.

```bash
> cat <<EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: default
spec:
  containers:
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
EOF
> kubectl get pod testclient
NAME         READY   STATUS    RESTARTS   AGE
testclient   1/1     Running   0          23s
```

Once the client Pod is created successfully we can start doing some simple tests. First let's create a topic named test1 with 1 partition and a replication factor of 1.

```bash
> kubectl exec -it testclient -- /usr/bin/kafka-topics --zookeeper kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
Created topic "test1".
```
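You can also verify the partition count and replica assignment of the new topic with the --describe flag:

```bash
> kubectl exec -it testclient -- /usr/bin/kafka-topics --zookeeper kafka-zookeeper:2181 --topic test1 --describe
```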

Then create a producer that will post messages to this topic.

```bash
> kubectl exec -ti testclient -- /usr/bin/kafka-console-producer --broker-list kafka:9092 --topic test1
>
```

Then, in another terminal, open a consumer session so we can see the messages we send.

```bash
> kubectl exec -ti testclient -- /usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic test1
```

Now we can send messages in the producer window and see the corresponding messages appear in the consumer session window.
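For example, typing a couple of messages in the producer session should make them appear in the consumer session almost immediately, roughly like this (the two messages are just examples):

```bash
# producer window
>hello kafka
>message from kubernetes

# consumer window
hello kafka
message from kubernetes
```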

This shows that the Kafka cluster is working properly. Note, however, that we did not persist the ZooKeeper cluster here; for a production environment you must remember to enable data persistence and customize values.yaml according to your needs (a sketch follows below). Of course, for production environments it is generally recommended to build Kafka clusters with an Operator, such as strimzi-kafka-operator.
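As a starting point, enabling persistence for the bundled ZooKeeper sub-chart is normally just another values.yaml override. The keys below follow the conventions assumed from the incubator zookeeper chart, so verify them against the chart version you actually use; the StorageClass name is again a placeholder.

```yaml
# kfk-values.yaml (excerpt) -- assumed keys of the zookeeper sub-chart
zookeeper:
  persistence:
    enabled: true
    size: 5Gi
    storageClass: ceph-rbd   # placeholder: your own StorageClass
```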