Apache Kafka is one of the most widely used distributed publish-subscribe messaging systems available today. Kafka is powerful, but it is also complex and needs a highly available, robust platform to run on. Now that microservices are prevalent and most companies are adopting distributed computing, Kafka remains a strong choice as a core messaging system.

If you run your microservices in a Kubernetes cluster, it makes sense to run the Kafka cluster in Kubernetes as well. Doing so takes advantage of the built-in resiliency and high availability of Kubernetes, and clients can easily reach the Kafka Pods within the cluster through the built-in Kubernetes service discovery.

This article shows how to build a distributed Kafka cluster on Kubernetes, deployed as a StatefulSet through a Helm Chart. If you want persistent volumes to be provisioned dynamically, you need to configure a StorageClass resource in advance, for example one based on Ceph RBD. If your cluster has no dynamic volumes configured, you need to create 3 unbound PVs for data persistence in advance.

The deployment is based on the incubator/kafka chart from Helm's official chart repository and uses the image confluentinc/cp-kafka:5.0.1. In other words, what gets deployed is the Kafka distribution provided by Confluent. Confluent Platform Kafka (CP Kafka for short) provides some advanced features that Apache Kafka does not, such as cross-data-center backup, Schema Registry, and cluster monitoring tools.

Installation

Installing from a Helm Chart requires Helm itself to be installed. A Helm v3 release can be installed directly as follows.

> wget https://get.helm.sh/helm-v3.4.0-linux-amd64.tar.gz
> tar -zxvf helm-v3.4.0-linux-amd64.tar.gz
> sudo cp -a linux-amd64/helm /usr/local/bin/helm
> chmod +x /usr/local/bin/helm
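
The tarball name above is specific to Linux on amd64. As a rough sketch, the download URL for another platform could be derived from uname. The function name helm_tarball_url is hypothetical, and the architecture mapping only covers the two most common cases.

```shell
# Build the get.helm.sh download URL for a given Helm version.
# Usage: helm_tarball_url <version> [os] [arch]
# helm_tarball_url is an illustrative helper, not part of Helm.
helm_tarball_url() {
  local version="$1"
  local os="${2:-$(uname -s | tr '[:upper:]' '[:lower:]')}"
  local arch="${3:-$(uname -m)}"

  # Map kernel architecture names to the names used in Helm release files.
  case "$arch" in
    x86_64) arch="amd64" ;;
    aarch64) arch="arm64" ;;
  esac

  echo "https://get.helm.sh/helm-${version}-${os}-${arch}.tar.gz"
}

# Print the URL for the current machine.
helm_tarball_url v3.4.0
```

On a Linux amd64 machine this prints the same URL that the wget command above downloads.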

Then add Kafka’s Chart repository.

> helm repo add incubator http://mirror.azure.cn/kubernetes/charts-incubator/
> helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈

Next, prepare the values file for the installation. You can use the chart's default values.yaml as it is, or download it and customize it, for example by specifying your own StorageClass.

> curl https://raw.githubusercontent.com/helm/charts/master/incubator/kafka/values.yaml > kfk-values.yaml
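
If you do want to set your own StorageClass, one option is to keep the downloaded file untouched and put the changes in a small override file. The sketch below assumes the chart exposes the persistence.enabled, persistence.size and persistence.storageClass keys (check the downloaded kfk-values.yaml to confirm). The file name kfk-overrides.yaml and the class name my-storage-class are placeholders.

```shell
# ASSUMPTION: "my-storage-class" is a placeholder; use a StorageClass
# that actually exists in your cluster (see `kubectl get storageclass`).
STORAGE_CLASS="my-storage-class"

# Write only the values that differ from the chart defaults.
# ASSUMPTION: key names follow the persistence block of the chart's values.yaml.
cat > kfk-overrides.yaml <<EOF
persistence:
  enabled: true
  size: "1Gi"
  storageClass: "${STORAGE_CLASS}"
EOF

# Show the result for review before installing.
cat kfk-overrides.yaml
```

Helm accepts several -f flags and lets later files take precedence, so the override can be added to the install command as -f kfk-values.yaml -f kfk-overrides.yaml.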

Here I install with the default values unchanged.

> helm install kafka incubator/kafka -f kfk-values.yaml
NAME: kafka
LAST DEPLOYED: Sun Nov  1 09:36:44 2020
NAMESPACE: default
STATUS: deployed
REVISION: 1
NOTES:
### Connecting to Kafka from inside Kubernetes

You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:

  apiVersion: v1
  kind: Pod
  metadata:
    name: testclient
    namespace: default
  spec:
    containers:
    - name: kafka
      image: confluentinc/cp-kafka:5.0.1
      command:
        - sh
        - -c
        - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka
topics with:

  kubectl -n default exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --list

To create a new topic:

  kubectl -n default exec testclient -- ./bin/kafka-topics.sh --zookeeper kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1

To listen for messages on a topic:

  kubectl -n default exec -ti testclient -- ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test1 --from-beginning

To stop the listener session above press: Ctrl+C

To start an interactive message producer session:
  kubectl -n default exec -ti testclient -- ./bin/kafka-console-producer.sh --broker-list kafka-headless:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C

If you specify "zookeeper.connect" in configurationOverrides, please replace "kafka-zookeeper:2181" with the value of "zookeeper.connect", or you will get error.

If neither a StorageClass nor usable PVs are available, the Kafka Pods will stay in the Pending state after installation, so be sure to configure the data volumes in advance.

Normally, Kafka will be installed successfully in a few moments.

> kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
kafka-0             1/1     Running   0          25m
kafka-1             1/1     Running   0          11m
kafka-2             1/1     Running   0          2m
kafka-zookeeper-0   1/1     Running   0          25m
kafka-zookeeper-1   1/1     Running   0          22m
kafka-zookeeper-2   1/1     Running   0          18m
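
As the AGE column shows, the Pods come up one after another, so it can take a while before all of them are ready. Rather than re-running kubectl get pods by hand, a small polling loop can wait for the StatefulSets. This is only a sketch. The function name, attempt count and delay are arbitrary choices, and it assumes the StatefulSets are named kafka and kafka-zookeeper, as the Pod names suggest.

```shell
# Poll a StatefulSet until the expected number of replicas report Ready.
# Usage: wait_for_statefulset <name> <replicas> [max_attempts] [sleep_seconds]
wait_for_statefulset() {
  local name="$1"
  local want="$2"
  local attempts="${3:-60}"
  local delay="${4:-10}"
  local ready
  local i=1

  while [ "$i" -le "$attempts" ]; do
    # readyReplicas is empty until at least one Pod is ready; treat as 0.
    ready="$(kubectl get statefulset "$name" -o jsonpath='{.status.readyReplicas}')"
    ready="${ready:-0}"
    if [ "$ready" -ge "$want" ]; then
      echo "${name}: ${ready}/${want} replicas ready"
      return 0
    fi
    echo "${name}: ${ready}/${want} replicas ready, retrying in ${delay}s"
    sleep "$delay"
    i=$((i + 1))
  done

  echo "${name}: timed out waiting for ${want} ready replicas" >&2
  return 1
}

# Example (needs access to the cluster):
#   wait_for_statefulset kafka-zookeeper 3 && wait_for_statefulset kafka 3
```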

By default, 3 ZooKeeper Pods and 3 Kafka Pods are installed, which gives the application high availability. You can also see the information about the persistent volumes I configured.

> kubectl get pvc
NAME              STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE
datadir-kafka-0   Bound    kfk0     1Gi        RWO                           28m
datadir-kafka-1   Bound    kfk1     1Gi        RWO                           13m
datadir-kafka-2   Bound    kfk2     1Gi        RWO                           4m9s
> kubectl get pv
NAME   CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                     STORAGECLASS   REASON   AGE
kfk0   1Gi        RWO            Retain           Bound    default/datadir-kafka-0                           23m
kfk1   1Gi        RWO            Retain           Bound    default/datadir-kafka-1                           22m
kfk2   1Gi        RWO            Retain           Bound    default/datadir-kafka-2                           10m

If a default StorageClass is configured, the persistent volumes are requested dynamically. If your cluster does not have dynamic volumes enabled, you can modify values.yaml to use static volumes.
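
For reference, static volumes like kfk0, kfk1 and kfk2 in the listing above could be generated along these lines. This is a sketch rather than the manifests actually used here. The hostPath backend and the /data/kafka path are assumptions made purely for illustration, and the function name is hypothetical.

```shell
# Emit manifests for N static PersistentVolumes named kfk0..kfk(N-1).
# ASSUMPTION: hostPath under /data/kafka is a placeholder backend for
# illustration; a real cluster would use its own storage (NFS, Ceph RBD, ...).
# Usage: gen_kafka_pvs <count> [size]
gen_kafka_pvs() {
  local count="$1"
  local size="${2:-1Gi}"
  local i=0

  while [ "$i" -lt "$count" ]; do
    cat <<EOF
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kfk${i}
spec:
  capacity:
    storage: ${size}
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  hostPath:
    path: /data/kafka/kfk${i}
EOF
    i=$((i + 1))
  done
}

# Print the manifests; pipe into `kubectl apply -f -` to create the volumes.
gen_kafka_pvs 3
```

The volumes carry no storageClassName, which matches the empty STORAGECLASS column in the output above.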

Then look at the corresponding Service object.

> kubectl get svc
NAME                       TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kafka                      ClusterIP   10.100.205.187   <none>        9092/TCP                     31m
kafka-headless             ClusterIP   None             <none>        9092/TCP                     31m
kafka-zookeeper            ClusterIP   10.100.230.255   <none>        2181/TCP                     31m
kafka-zookeeper-headless   ClusterIP   None             <none>        2181/TCP,3888/TCP,2888/TCP   31m
kubernetes                 ClusterIP   10.96.0.1        <none>        443/TCP                      14d

You can see a ZooKeeper service called kafka-zookeeper and a Kafka service called kafka. To manage the Kafka cluster we interact with the kafka-zookeeper service, and to send and receive messages we use the kafka service.
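
The headless services matter as well: for a StatefulSet, Kubernetes gives each Pod a stable DNS name of the form pod.service.namespace.svc.cluster-domain. The sketch below builds a broker list from those names. It assumes that kafka-headless is the governing service of the kafka StatefulSet and that the cluster domain is the default cluster.local. The function name is hypothetical.

```shell
# Build a comma-separated list of per-Pod broker addresses.
# ASSUMPTIONS: the governing service is <release>-headless and the
# cluster DNS domain is the default "cluster.local".
# Usage: broker_list <release> <namespace> <replicas> [port]
broker_list() {
  local release="$1"
  local namespace="$2"
  local replicas="$3"
  local port="${4:-9092}"
  local list=""
  local host
  local i=0

  while [ "$i" -lt "$replicas" ]; do
    host="${release}-${i}.${release}-headless.${namespace}.svc.cluster.local:${port}"
    # Prepend a comma only when the list already has an entry.
    list="${list:+${list},}${host}"
    i=$((i + 1))
  done

  echo "$list"
}

broker_list kafka default 3
```

Clients in the same namespace can simply use kafka:9092, as in the tests below. The longer names are mainly useful when a specific broker has to be addressed.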

Client testing

Now that the Kafka cluster is set up, let’s install a Kafka client that will help us produce and consume topic messages.

Create the client directly with the following command.

> cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: testclient
  namespace: default
spec:
  containers:
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"
EOF
> kubectl get pod testclient
NAME         READY   STATUS    RESTARTS   AGE
testclient   1/1     Running   0          23s

Once the client Pod is running, we can start doing some simple tests. First, let’s create a topic named test1 with one partition and a replication factor of 1.

> kubectl exec -it testclient -- /usr/bin/kafka-topics --zookeeper kafka-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
Created topic "test1".
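
Since the cluster has three brokers, a topic can also be spread across all of them. The following is a sketch under a few assumptions: the testclient Pod above is running, the function name create_topic is hypothetical, and the topic name test3 in the usage comment is just an example.

```shell
# Create a topic (if it does not exist yet) and print its partition layout.
# Usage: create_topic <topic> <partitions> <replication_factor>
create_topic() {
  local topic="$1"
  local partitions="$2"
  local replication="$3"
  local zk="kafka-zookeeper:2181"

  # --if-not-exists makes the call safe to repeat.
  kubectl exec testclient -- /usr/bin/kafka-topics --zookeeper "$zk" \
    --create --if-not-exists --topic "$topic" \
    --partitions "$partitions" --replication-factor "$replication" || return 1

  # --describe lists the leader and replicas of every partition.
  kubectl exec testclient -- /usr/bin/kafka-topics --zookeeper "$zk" \
    --describe --topic "$topic"
}

# Example (needs access to the cluster):
#   create_topic test3 3 3
```

The replication factor cannot exceed the number of brokers, so 3 is the upper limit for this cluster.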

Then create a producer that will post messages to this topic.

> kubectl exec -ti testclient -- /usr/bin/kafka-console-producer --broker-list kafka:9092 --topic test1
>

Then open another terminal window and start a consumer session there so we can see the messages we send.

> kubectl exec -ti testclient -- /usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic test1

Now we type a message in the producer’s window and see the same message appear in the consumer session window.
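
The same round trip can be checked without two interactive terminals. The sketch below pipes one message into the console producer and then reads the topic from the beginning. It assumes the testclient Pod and the kafka service shown earlier; the function name and the 10 second timeout are arbitrary choices.

```shell
# Send one message to a topic and check that it can be read back.
# Usage: kafka_smoke_test <topic> <message>
kafka_smoke_test() {
  local topic="$1"
  local message="$2"
  local received

  # Producer: -i forwards stdin into the Pod so the message can be piped in.
  printf '%s\n' "$message" | kubectl exec -i testclient -- \
    /usr/bin/kafka-console-producer --broker-list kafka:9092 \
    --topic "$topic" >/dev/null || return 1

  # Consumer: read from the start of the topic and exit once no new
  # message has arrived for 10 seconds.
  received="$(kubectl exec testclient -- \
    /usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 \
    --topic "$topic" --from-beginning --timeout-ms 10000 2>/dev/null)"

  # Look for an exact, whole-line match of the message.
  if printf '%s\n' "$received" | grep -qxF -- "$message"; then
    echo "OK: message received on ${topic}"
  else
    echo "FAIL: message not seen on ${topic}" >&2
    return 1
  fi
}

# Example (needs access to the cluster):
#   kafka_smoke_test test1 "hello kafka"
```

Unlike the interactive consumer above, this one passes --from-beginning, so it also sees messages that were sent before it started.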

This shows that the Kafka cluster works properly. Note that we did not persist the ZooKeeper data, so in a production environment you must remember to enable data persistence and customize values.yaml according to your needs. For production environments it is also recommended to build Kafka clusters with an Operator, such as strimzi-kafka-operator.