Recently, when I was testing log collection, I found that Elasticsearch was a bit overwhelmed by the volume of log data, and the optimization of ES might not be completed overnight, so I planned to add an intermediate layer to export the logs to Kafka, and then consume the logs from Kafka via Logstash and deposit them into Elasticsearch. There is no Kafka cluster in the test environment, so let’s build a Kafka cluster in the test environment first.

The relevant environment versions used in this article are as follows.

1
2
3
4
5
6
$ kubectl version
Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.2", GitCommit:"66049e3b21efe110454d67df4fa62b08ea79a19b", GitTreeState:"clean", BuildDate:"2019-05-16T18:55:03Z", GoVersion:"go1.12.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.2", GitCommit:"c97fe5036ef3df2967d086711e6c0c405941e14b", GitTreeState:"clean", BuildDate:"2019-10-15T19:09:08Z", GoVersion:"go1.12.10", Compiler:"gc", Platform:"linux/amd64"}
$ helm version
version.BuildInfo{Version:"v3.0.1", GitCommit:"7c22ef9ce89e0ebeb7125ba2ebf7d421f3e82ffa", GitTreeState:"clean", GoVersion:"go1.13.4"}
$ # kafka helm chart 包版本为:kafka-0.20.8.tgz

Again, for simplicity, we use Helm3 to install Kafka, and first we need to add an incubator repository address, since the stable repository does not have a suitable Chart package for Kafka.

1
2
3
4
5
6
$ helm repo add incubator http://mirror.azure.cn/kubernetes/charts-incubator/
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈ Happy Helming!⎈ 

Download Kafka’s Helm Chart package locally, which will help us understand how to use the Chart package, or you can skip this step.

1
2
3
$ helm fetch incubator/kafka
$ ls kafka-0.20.8.tgz 
$ tar -xvf kafka-0.20.8.tgz

Then create a new file called kafka-test.yaml with the following contents.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
resources:
  limits:
    cpu: 200m
    memory: 1536Mi
  requests:
    cpu: 100m
    memory: 1024Mi

livenessProbe:
  initialDelaySeconds: 60

persistence:
  storageClass: "rook-ceph-block"

Since kafka is slow when it first starts, try to set the initialization time of the health check longer, we set it to livenessProbe.initialDelaySeconds=60, the resource declaration can be declared according to the actual situation of our cluster, and finally if you need to persist kafka data, you also need to provide A StorageClass, we also know that kafka’s IO requirements for disk itself is also very high, so it is best to use Local PV, we use here is a ceph rbd StorageClass resource object: (storageclass.yaml)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
   name: rook-ceph-block
provisioner: rook-ceph.rbd.csi.ceph.com
parameters:
    # clusterID 是 rook 集群运行的命名空间
    clusterID: rook-ceph
    # 指定存储池
    pool: k8s-test-pool
    # RBD image (实际的存储介质) 格式. 默认为 "2".
    imageFormat: "2"
    # RBD image 特性. CSI RBD 现在只支持 `layering` .
    imageFeatures: layering
    # Ceph 管理员认证信息,这些都是在 clusterID 命名空间下面自动生成的
    csi.storage.k8s.io/provisioner-secret-name: rook-csi-rbd-provisioner
    csi.storage.k8s.io/provisioner-secret-namespace: rook-ceph
    csi.storage.k8s.io/node-stage-secret-name: rook-csi-rbd-node
    csi.storage.k8s.io/node-stage-secret-namespace: rook-ceph
    # 指定 volume 的文件系统格式,如果不指定, csi-provisioner 会默认设置为 `ext4`
    csi.storage.k8s.io/fstype: ext4
reclaimPolicy: Retain

The specific storage solution needs to be selected according to our own actual situation, I use the Rook built Ceph here, relatively simple to use.

Once the custom values file is ready, you can use Helm to install it directly.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
$ kubectl create ns kafka
$ helm install -f kafka.yaml kfk incubator/kafka --namespace kafka
NAME: kfk
LAST DEPLOYED: Tue Mar 17 11:49:51 2020
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
NOTES:
### Connecting to Kafka from inside Kubernetes

You can connect to Kafka by running a simple pod in the K8s cluster like this with a configuration like this:

  apiVersion: v1
  kind: Pod
  metadata:
    name: testclient
    namespace: kafka
  spec:
    containers:
    - name: kafka
      image: confluentinc/cp-kafka:5.0.1
      command:
        - sh
        - -c
        - "exec tail -f /dev/null"

Once you have the testclient pod above running, you can list all kafka
topics with:

  kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --list

To create a new topic:

  kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1

To listen for messages on a topic:

  kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning

To stop the listener session above press: Ctrl+C

To start an interactive message producer session:
  kubectl -n kafka exec -ti testclient -- kafka-console-producer --broker-list kfk-kafka-headless:9092 --topic test1

To create a message in the above session, simply type the message and press "enter"
To end the producer session try: Ctrl+C

If you specify "zookeeper.connect" in configurationOverrides, please replace "kfk-zookeeper:2181" with the value of "zookeeper.connect", or you will get error.

After successful installation, you can check the status of the Release.

1
2
3
$ helm ls -n kafka
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
kfk     kafka           1               2020-03-17 14:50:41.595746 +0800 CST    deployed        kafka-0.20.8    5.0.1  

A cluster of 3 instances of kafka and zookeeper is normally deployed every now and then.

1
2
3
4
5
6
7
8
$ kubectl get pods -n kafka
NAME              READY   STATUS    RESTARTS   AGE
kfk-kafka-0       1/1     Running   0          3h52m
kfk-kafka-1       1/1     Running   0          3h50m
kfk-kafka-2       1/1     Running   0          3h48m
kfk-zookeeper-0   1/1     Running   0          3h55m
kfk-zookeeper-1   1/1     Running   0          3h54m
kfk-zookeeper-2   1/1     Running   0          3h54m

After deployment, create a test client to test if the kafka cluster is working: (testclient.yaml)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
apiVersion: v1
  kind: Pod
  metadata:
    name: testclient
    namespace: kafka
  spec:
    containers:
    - name: kafka
      image: confluentinc/cp-kafka:5.0.1
      command:
        - sh
        - -c
        - "exec tail -f /dev/null"

It is also straightforward to deploy the resource objects above.

1
2
3
4
5
$ kubectl apply -f testclient.yaml
$ kubectl get pods -n kafka
NAME              READY   STATUS    RESTARTS   AGE
testclient        1/1     Running   0          3h44m
......

After the test client is created, create a new topic with the following command:

1
2
$ kubectl -n kafka exec testclient -- kafka-topics --zookeeper kfk-zookeeper:2181 --topic test1 --create --partitions 1 --replication-factor 1
Created topic "test1".

You can see that the topic test1 was created successfully. Then you can run the following command to listen for messages from the topic test1.

1
$ kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning

Then open a new command line terminal to generate a message.

1
2
3
$ kubectl -n kafka exec -ti testclient -- kafka-console-producer --broker-list kfk-kafka-headless:9092 --topic test1
>Hello kafka on k8s
>

At this time, you can see the corresponding message log in the listener of the topic test1.

1
2
$ kubectl -n kafka exec -ti testclient -- kafka-console-consumer --bootstrap-server kfk-kafka:9092 --topic test1 --from-beginning
Hello kafka on k8s

This shows that our kafka deployment has successfully run on top of a Kubernetes cluster. Of course, we are only using it in a test environment, and there are many things to consider about whether you can deploy kafka on a Kubernetes cluster in a production environment, and it is more recommended to use an Operator for stateful applications, such as Confluent’s Kafka Operator, in short, it doesn’t matter if you can hold it, just use it 🤣