Download Kafka

Download the installation package of Kafka from the official website, “kafka_2.13-3.4.0.tgz” is used in this article. Just unarchive it to the installation directory.

Start with ZooKeeper or KRaft

Note: Java 8+ must be installed in your local environment.

Apache Kafka can be started using either ZooKeeper or KRaft. To get started with either configuration, follow the configuration below, but do not use both at the same time.

ZooKeeper

Run the following command to start all services in the correct order.

1
2
#Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Among other things, you can customize the directory where the data is stored in the zookeeper.properties file.

1
2
#dataDir=/tmp/zookeeper
dataDir=D:/data/zookeeper

Open another terminal session and run the following command.

1
2
#Start the Kafka service
$ bin/kafka-server-start.sh config/server.properties

Among other things, you can customize the directory where the logs are stored in the server.properties file.

1
2
#log.dirs=/tmp/kafka-logs
log.dirs=D:/data/kafka/kafka-logs

Once all services are successfully started, there will be a basic Kafka environment running and ready to use.

Note: The above command is executed under Linux, or in case of Windows environment, execute the executable file in the windows directory, e.g. bin/windows/*.bat. The command is as follows.

1
2
3
4
5
cd bin/windows

zookeeper-server-start.bat ../../config/zookeeper.properties

kafka-server-start.bat ../../config/server.properties

KRaft

Generate the cluster UUID.

1
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Set the log directory format.

1
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Start the Kafka server.

1
$ bin/kafka-server-start.sh config/kraft/server.properties

Once the Kafka server is successfully started, there will be a basic Kafka environment running and ready to use.

Create topics to store events

Kafka is a distributed event streaming platform that allows events (also known as records or messages) to be read, written, stored and processed on multiple machines.

Example events include payment transactions, geolocation updates from cell phones, shipping orders, sensor measurements from IoT devices or medical devices, and more. These events are organized and stored by topic.

A topic is similar to a folder in a file system, and events are files in that folder.

Therefore, before writing the first event, a topic must be created. Open another terminal session and run the following command.

1
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

All of Kafka’s command line tools have other options. For example, it can also display details such as partition counts for new topics.

1
2
3
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

Writing events to topics

Kafka clients communicate with the Kafka agent over the network to write (or read) events. Once received, the agent will store the events in a persistent and fault-tolerant manner.

Run the console producer client to write some events to the topic. By default, each line of input will result in a separate event being written to the topic.

1
2
3
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

The producer client can be stopped at any time using Ctrl-C.

Reading events from a topic

Open another terminal session and run the console user client to read the events just created.

1
2
3
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

The consumer client can be stopped at any time using Ctrl-C.

Error when enabling KRaft configuration under Windows

The error message is as follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java.io.UncheckedIOException: Error while writing the Quorum status from the file D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
        at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
        at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
        at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
        at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:240)
        at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:166)
        at kafka.server.SharedServer.start(SharedServer.scala:228)
        at kafka.server.SharedServer.startForController(SharedServer.scala:128)
        at kafka.server.ControllerServer.startup(ControllerServer.scala:188)
        at kafka.server.KafkaRaftServer.$anonfun$startup$1(KafkaRaftServer.scala:98)
        at kafka.server.KafkaRaftServer.$anonfun$startup$1$adapted(KafkaRaftServer.scala:98)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:98)
        at kafka.Kafka$.main(Kafka.scala:115)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.nio.file.FileSystemException: D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state.tmp -> D:\data\kafka\kraft-combined-logs\__cluster_metadata-0\quorum-state: 另一个程序正在使用此文件,进程无法访问。

        at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
        at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
        at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
        at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
        at java.nio.file.Files.move(Files.java:1395)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
        ... 15 more

The issue has not been resolved yet, but there has been feedback on GitHub at https://github.com/apache/kafka/pull/12763.

Ref

  • https://kafka.apache.org/quickstart