golang Kafka

I. Background

As we all know, Kafka is a star open source project of the Apache Software Foundation. As an open source distributed event streaming platform, it is used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. In China, companies large and small, whether they run their own clusters or use Kafka cloud services such as those provided by AliCloud, have Internet applications that are already inseparable from Kafka.

The Internet is not bound to any one programming language, but many people dislike the fact that Kafka is developed in Scala/Java. Especially among programmers with a "religious" devotion to a particular language, the kind who hold a hammer and see nails everywhere, there is always an urge to rewrite Kafka. But just as many fans of newer languages want to rewrite Kubernetes, Kafka already enjoys a huge head start and ecosystem advantage, and it is difficult to build, in the short term, a mega-project of the same caliber with a comparable ecosystem (Apache Pulsar, a Kafka-like project that has also been hot over the last couple of years, was created at roughly the same time as Kafka, but was handed over to the Apache Foundation for hosting much later).

The Kafka ecosystem is also robust, with Kafka clients available for virtually every programming language, and confluent.inc, the company behind Kafka, maintains clients for all the major languages.

Kafka clients for all programming languages

Developers in other major languages just need to pick up one of these clients and connect properly to the Kafka cluster. With that much groundwork laid, let me explain why I am writing this article.

The current logging scheme for the production environment of the business line is as follows.

logging scheme for the production environment of the business line

From the diagram we can see that the business systems write logs to Kafka, and Logstash then consumes those logs and aggregates them into the Elasticsearch cluster behind it for later queries. The business systems are mainly implemented in Java, and to keep logging from blocking the business flow, they write logs with logback, which supports a fallback appender: when writes to Kafka fail, the logs can be written to an alternate file so that, as far as possible, no logs are lost.

Considering the reuse of existing IT facilities and solutions, our new systems implemented in Go also converge on this no-loss log aggregation scheme, which requires that our logger support writing to Kafka as well as the fallback mechanism.

Our log package is based on uber's zap package, which is currently one of the most widely used, high-performance log packages in the Go community; Issue 25 of the ThoughtWorks Technology Radar also lists zap as a recommended tool in the experimental stage, and the ThoughtWorks team is already using it at scale.

uber zap

However, zap does not natively support writing to Kafka. zap is extensible, though, so we need to add an extension to it for writing to Kafka, and to write to Kafka we cannot do without a Kafka client package. Currently the mainstream Kafka clients in the Go community are sarama from Shopify, confluent-kafka-go maintained by confluent.inc (the company behind Kafka), and segmentio/kafka-go.

In this post, I’ll talk about my experience with each of these three clients based on my usage history.

Here, let’s start with Shopify/sarama, which has the most stars.

II. Shopify/sarama: more stars don’t necessarily mean better

The Kafka client package with the most stars and the widest use in the Go community is sarama from Shopify, the e-commerce platform.

Here I will demonstrate how to extend zap to support writing to Kafka based on sarama. In a previous article, I introduced how zap is built on top of zapcore, which consists of an Encoder, a WriteSyncer, and a LevelEnabler. For our requirement of writing to Kafka, we just need to define a type that implements the WriteSyncer interface and assemble it into a logger that supports writing to Kafka.

Let’s start by looking at the function that creates the logger from the top down.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/log.go

type Logger struct {
    l     *zap.Logger // zap.Logger is safe for concurrent use
    cfg   zap.Config
    level zap.AtomicLevel
}

func (l *Logger) Info(msg string, fields ...zap.Field) {
    l.l.Info(msg, fields...)
}

func New(writer io.Writer, level int8, opts ...zap.Option) *Logger {
    if writer == nil {
        panic("the writer is nil")
    }
    atomicLevel := zap.NewAtomicLevelAt(zapcore.Level(level))

    logger := &Logger{
        cfg:   zap.NewProductionConfig(),
        level: atomicLevel,
    }

    logger.cfg.EncoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
        enc.AppendString(t.Format(time.RFC3339)) // e.g. 2021-11-19T10:11:30+08:00
    }
    logger.cfg.EncoderConfig.TimeKey = "logtime"

    core := zapcore.NewCore(
        zapcore.NewJSONEncoder(logger.cfg.EncoderConfig),
        zapcore.AddSync(writer),
        atomicLevel,
    )
    logger.l = zap.New(core, opts...)
    return logger
}

// SetLevel alters the logging level on runtime
// it is concurrent-safe
func (l *Logger) SetLevel(level int8) error {
    l.level.SetLevel(zapcore.Level(level))
    return nil
}

There is nothing related to the Kafka client in this code. The New function is used to create a *Logger instance; it takes an io.Writer as its first argument. Note that we use the zap.AtomicLevel type to store the logger's level information; since zap.AtomicLevel supports hot updates, we can dynamically modify the logger's log level at runtime.
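
As a quick illustration, here is a minimal usage sketch of the logger above (the import path and the package alias log are assumptions based on the repository layout); it writes to os.Stderr and uses SetLevel to change the log level at runtime.

package main

import (
    "os"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"

    log "github.com/bigwhite/experiments/kafka-clients/zapkafka" // assumed import path and alias
)

func main() {
    lg := log.New(os.Stderr, int8(zapcore.InfoLevel)) // info-level logger writing to stderr
    lg.Info("service started", zap.String("addr", ":8080"))

    lg.SetLevel(int8(zapcore.WarnLevel)) // hot update: only Warn and above from now on
    lg.Info("this entry is dropped")     // filtered out after the level change
}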

Next, we will implement a type that satisfies the zapcore.WriteSyncer interface based on sarama’s AsyncProducer.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/kafka_syncer.go

type kafkaWriteSyncer struct {
    topic          string
    producer       sarama.AsyncProducer
    fallbackSyncer zapcore.WriteSyncer
}

func NewKafkaAsyncProducer(addrs []string) (sarama.AsyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true
    return sarama.NewAsyncProducer(addrs, config)
}

func NewKafkaSyncer(producer sarama.AsyncProducer, topic string, fallbackWs zapcore.WriteSyncer) zapcore.WriteSyncer {
    w := &kafkaWriteSyncer{
        producer:       producer,
        topic:          topic,
        fallbackSyncer: zapcore.AddSync(fallbackWs),
    }

    go func() {
        for e := range producer.Errors() {
            val, err := e.Msg.Value.Encode()
            if err != nil {
                continue
            }

            fallbackWs.Write(val)
        }
    }()

    return w
}

NewKafkaSyncer is the function that creates the zapcore.WriteSyncer. It takes the sarama.AsyncProducer interface type as its first argument so that we can take advantage of the mock test package provided by sarama. The last parameter is the WriteSyncer used for fallback.

The NewKafkaAsyncProducer function exists so that users can quickly create a sarama.AsyncProducer, using the default config values. Among those defaults, Return.Successes is false, which means the client does not care about the success status of messages written to Kafka, and we do not need a separate goroutine to consume AsyncProducer.Successes(). But we do need to care about write failures, so we set Return.Errors to true and start a goroutine in NewKafkaSyncer that handles the log data of failed writes and writes it to the fallback syncer.
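
By contrast, if a user also wanted to observe successful deliveries, a variant of the constructor could enable Return.Successes and drain the Successes() channel. The sketch below is a hypothetical helper, not part of the package above.

func NewKafkaAsyncProducerWithSuccesses(addrs []string) (sarama.AsyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true
    config.Producer.Return.Successes = true // default is false
    p, err := sarama.NewAsyncProducer(addrs, config)
    if err != nil {
        return nil, err
    }
    // when Return.Successes is true, the Successes() channel must be drained,
    // otherwise the producer will eventually block
    go func() {
        for range p.Successes() {
            // e.g. increment a "delivered" metric here
        }
    }()
    return p, nil
}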

Next, let’s look at the Write and Sync methods of the kafkaWriteSyncer.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/kafka_syncer.go

func (ws *kafkaWriteSyncer) Write(b []byte) (n int, err error) {
    b1 := make([]byte, len(b))
    copy(b1, b) // b is reused, we must pass its copy b1 to sarama
    msg := &sarama.ProducerMessage{
        Topic: ws.topic,
        Value: sarama.ByteEncoder(b1),
    }

    select {
    case ws.producer.Input() <- msg:
    default:
        // if producer block on input channel, write log entry to default fallbackSyncer
        return ws.fallbackSyncer.Write(b1)
    }
    return len(b1), nil
}

func (ws *kafkaWriteSyncer) Sync() error {
    ws.producer.AsyncClose()
    return ws.fallbackSyncer.Sync()
}

Note: b in the code above will be reused by zap, so we must make a copy of it and hand the copy b1 to sarama before sending the message into sarama's channel.

In the code above, we wrap the data to be written into a sarama.ProducerMessage and send it to the producer's input channel through a select with a default branch rather than a plain blocking send. Why? Mainly because a Kafka logger based on sarama v1.30.0 once hung in our validation environment; the network may have fluctuated at the time and broken the connection between the logger and Kafka, and we initially suspected that a blocking send at this point was what blocked the business. There is a fix in sarama v1.32.0 for an issue that looks very similar to our hang.

But doing it this way has a serious problem: in our stress tests we found that a large number of logs could not be written to Kafka and ended up in the fallback syncer instead. The reason can be seen in sarama's async_producer.go: the input channel is an unbuffered channel, and only a single dispatcher goroutine reads messages from it, so once goroutine scheduling is taken into account, it is not surprising that many logs get diverted to the fallback syncer.

// github.com/Shopify/sarama@v1.32.0/async_producer.go
func newAsyncProducer(client Client) (AsyncProducer, error) {
    // Check that we are not dealing with a closed Client before processing any other arguments
    if client.Closed() {
        return nil, ErrClosedClient
    }

    txnmgr, err := newTransactionManager(client.Config(), client)
    if err != nil {
        return nil, err
    }

    p := &asyncProducer{
        client:     client,
        conf:       client.Config(),
        errors:     make(chan *ProducerError),
        input:      make(chan *ProducerMessage), // author's note: this is an unbuffered channel
        successes:  make(chan *ProducerMessage),
        retries:    make(chan *ProducerMessage),
        brokers:    make(map[*Broker]*brokerProducer),
        brokerRefs: make(map[*brokerProducer]int),
        txnmgr:     txnmgr,
    }
    ... ...
}

Some might suggest adding a Timer here to implement a timeout, but keep in mind that logging sits on the critical path of program execution, and starting a Timer for every log write feels too expensive (even if the Timer is reused via Reset). If sarama never blocked on the input channel, we would not need a trick like select-default in the Write method at all.
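
For completeness, here is a sketch of the Timer-based alternative we decided against; the method name writeWithTimeout and the 10ms timeout are illustrative assumptions only.

func (ws *kafkaWriteSyncer) writeWithTimeout(b []byte) (n int, err error) {
    b1 := make([]byte, len(b))
    copy(b1, b) // b is reused by zap, so pass a copy to sarama
    msg := &sarama.ProducerMessage{
        Topic: ws.topic,
        Value: sarama.ByteEncoder(b1),
    }

    t := time.NewTimer(10 * time.Millisecond) // a per-write Timer: the cost we want to avoid
    defer t.Stop()

    select {
    case ws.producer.Input() <- msg:
        return len(b1), nil
    case <-t.C:
        // give up waiting for the producer and fall back
        return ws.fallbackSyncer.Write(b1)
    }
}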

A nice thing about sarama is that it provides a mocks test package, which is used both for sarama's own tests and for the tests of Go packages that depend on sarama. Taking the implementation above as an example, we can write some tests based on the mocks package.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/log_test.go

func TestWriteFailWithKafkaSyncer(t *testing.T) {
    config := sarama.NewConfig()
    p := mocks.NewAsyncProducer(t, config)

    var buf = make([]byte, 0, 256)
    w := bytes.NewBuffer(buf)
    w.Write([]byte("hello"))
    logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)

    p.ExpectInputAndFail(errors.New("produce error"))
    p.ExpectInputAndFail(errors.New("produce error"))

    // all below will be written to the fallback syncer
    logger.Info("demo1", zap.String("status", "ok")) // write to the kafka syncer
    logger.Info("demo2", zap.String("status", "ok")) // write to the kafka syncer

    // make sure the goroutine which handles the error writes the log to the fallback syncer
    time.Sleep(2 * time.Second)

    s := string(w.Bytes())
    if !strings.Contains(s, "demo1") {
        t.Errorf("want true, actual false")
    }
    if !strings.Contains(s, "demo2") {
        t.Errorf("want true, actual false")
    }

    if err := p.Close(); err != nil {
        t.Error(err)
    }
}

mocks.NewAsyncProducer returns an implementation that satisfies the sarama.AsyncProducer interface. We then set an expectation per message; since two log entries are written here, we set it twice. Note: because we handle the Errors channel in a separate goroutine, there is a race here, which is why the test sleeps before checking the buffer. In concurrent programs, the fallback syncer must also support concurrent writes; zapcore provides zapcore.Lock, which wraps an ordinary zapcore.WriteSyncer into a concurrency-safe WriteSyncer.
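
For example, a minimal sketch of wrapping the fallback WriteSyncer with zapcore.Lock so that concurrent writes to it are serialized (the file name is an assumption for illustration, and producer is assumed to be a sarama.AsyncProducer created earlier):

f, err := os.OpenFile("fallback.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
    panic(err)
}
fallbackWs := zapcore.Lock(zapcore.AddSync(f)) // concurrency-safe WriteSyncer
logger := New(NewKafkaSyncer(producer, "test", fallbackWs), 0)
logger.Info("fallback path is now safe for concurrent writes")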

However, we then hit another "serious" problem with sarama. To investigate, we removed the select-default operation on the input channel and wrote a small concurrent_write program that writes logs to Kafka concurrently.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/cmd/concurrent_write/main.go

func SaramaProducer() {
    p, err := log.NewKafkaAsyncProducer([]string{"localhost:29092"})
    if err != nil {
        panic(err)
    }
    logger := log.New(log.NewKafkaSyncer(p, "test", zapcore.AddSync(os.Stderr)), int8(0))
    var wg sync.WaitGroup
    var cnt int64

    for j := 0; j < 10; j++ {
        wg.Add(1)
        go func(j int) {
            var value string
            for i := 0; i < 10000; i++ {
                now := time.Now()
                value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
                logger.Info("log message:", zap.String("value", value))
                atomic.AddInt64(&cnt, 1)
            }
            wg.Done()
        }(j)
    }

    wg.Wait()
    logger.Sync()
    println("cnt =", atomic.LoadInt64(&cnt))
    time.Sleep(10 * time.Second)
}

func main() {
    SaramaProducer()
}

We start a Kafka service locally using the docker-compose.yml officially provided by Kafka.

$cd benchmark
$docker-compose up -d

Then we use the consumer tool that comes with the kafka container to consume data from the topic named test, and the consumed data is redirected to 1.log.

$docker exec benchmark_kafka_1 /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning > 1.log 2>&1

Then we run concurrent_write.

$ make
$./concurrent_write > 1.log 2>&1

The concurrent_write program starts 10 goroutines, each of which writes 10,000 logs to Kafka, so most of the time you will see 100,000 logs in 1.log in the benchmark directory. With sarama v1.30.0, however, I sometimes saw fewer than 100,000 logs and had no idea where the "missing" ones went. With sarama v1.32.0, this has not happened so far.

Well, it’s time to look at the next kafka client package!

III. confluent-kafka-go: the package that requires cgo to be enabled is still a bit annoying

The confluent-kafka-go package is a Go client maintained by confluent.inc, the technology company behind Kafka, and can be considered the official Go client for Kafka. The only "problem" with this package is that it is built on the Kafka C/C++ library librdkafka, which means that once your Go application depends on confluent-kafka-go, you will have a hard time achieving static compilation or cross-platform compilation of the Go application. Since all of our business systems depend on the log package, a confluent-kafka-go dependency that can only be dynamically linked would force changes throughout our build toolchain, which is a non-trivial cost.

That said, confluent-kafka-go is easy to use, has good write performance, and does not exhibit the "lost messages" issue seen above with sarama. Here is an example of a confluent-kafka-go based producer.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/confluent-kafka-go-static-build/producer.go

func ReadConfig(configFile string) kafka.ConfigMap {
    m := make(map[string]kafka.ConfigValue)
    file, err := os.Open(configFile)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to open file: %s", err)
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := strings.TrimSpace(scanner.Text())
        if !strings.HasPrefix(line, "#") && len(line) != 0 {
            kv := strings.Split(line, "=")
            parameter := strings.TrimSpace(kv[0])
            value := strings.TrimSpace(kv[1])
            m[parameter] = value
        }
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Failed to read file: %s", err)
        os.Exit(1)
    }
    return m
}

func main() {
    conf := ReadConfig("./producer.conf")

    topic := "test"
    p, err := kafka.NewProducer(&conf)
    var mu sync.Mutex

    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }
    var wg sync.WaitGroup
    var cnt int64

    // Go-routine to handle message delivery reports and
    // possibly other event types (errors, stats, etc)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
                        *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
                }
            }
        }
    }()

    for j := 0; j < 10; j++ {
        wg.Add(1)
        go func(j int) {
            var value string
            for i := 0; i < 10000; i++ {
                key := ""
                now := time.Now()
                value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
                mu.Lock()
                p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                    Key:            []byte(key),
                    Value:          []byte(value),
                }, nil)
                mu.Unlock()
                atomic.AddInt64(&cnt, 1)
            }
            wg.Done()
        }(j)
    }

    wg.Wait()
    // Wait for all messages to be delivered
    time.Sleep(10 * time.Second)
    p.Close()
}

Here we again use 10 goroutines, each writing 10,000 messages to Kafka. Note: Producer instances created with kafka.NewProducer are by default not concurrency-safe, so a sync.Mutex is used to serialize their Produce calls. We can verify that confluent-kafka-go works by starting a Kafka service locally, just as in the sarama example.

Since the confluent-kafka-go package is implemented on top of the Kafka C library, we cannot turn off CGO; if we do, we will run into the following compilation errors.

$CGO_ENABLED=0 go build
# producer
./producer.go:15:42: undefined: kafka.ConfigMap
./producer.go:17:29: undefined: kafka.ConfigValue
./producer.go:50:18: undefined: kafka.NewProducer
./producer.go:85:22: undefined: kafka.Message
./producer.go:86:28: undefined: kafka.TopicPartition
./producer.go:86:75: undefined: kafka.PartitionAny

Therefore, by default, Go programs that depend on the confluent-kafka-go package are dynamically linked; inspecting the compiled binary with ldd on CentOS gives the following.

$make build
$ldd producer
    linux-vdso.so.1 =>  (0x00007ffcf87ec000)
    libm.so.6 => /lib64/libm.so.6 (0x00007f473d014000)
    libdl.so.2 => /lib64/libdl.so.2 (0x00007f473ce10000)
    libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f473cbf4000)
    librt.so.1 => /lib64/librt.so.1 (0x00007f473c9ec000)
    libc.so.6 => /lib64/libc.so.6 (0x00007f473c61e000)
    /lib64/ld-linux-x86-64.so.2 (0x00007f473d316000)

So is it possible to compile statically with CGO enabled? In theory, yes.

However, the confluent-kafka-go project has officially confirmed that static compilation is not supported yet. Let's try statically compiling with CGO enabled anyway.

// on CentOS
$ go build -buildvcs=false -o producer-static -ldflags '-linkmode "external" -extldflags "-static"'
# producer
/root/.bin/go1.18beta2/pkg/tool/linux_amd64/link: running gcc failed: exit status 1
/usr/bin/ld: cannot find -lm
/usr/bin/ld: cannot find -ldl
/usr/bin/ld: cannot find -lpthread
/usr/bin/ld: cannot find -lrt
/usr/bin/ld: cannot find -lpthread
/usr/bin/ld: cannot find -lc
collect2: error: ld returned 1 exit status

Static linking tries to statically link the symbols needed by the C part of confluent-kafka-go, which live in C runtime libraries such as libc and libpthread or in other system libraries, but by default CentOS does not have the .a (static archive) versions of these libraries installed. We need to install them manually.

$yum install glibc-static

After installation, we then execute the static compile command above.

$go build -buildvcs=false -o producer-static -ldflags '-linkmode "external" -extldflags "-static"'
# producer
/root/go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.8.2/kafka/librdkafka_vendor/librdkafka_glibc_linux.a(rddl.o): In function 'rd_dl_open':
(.text+0x1d): warning: Using 'dlopen' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
/root/go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.8.2/kafka/librdkafka_vendor/librdkafka_glibc_linux.a(rdaddr.o): In function 'rd_getaddrinfo':
(.text+0x440): warning: Using 'getaddrinfo' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking

This time our static compilation worked!

$ ldd producer-static
    not a dynamic executable

But there are some warnings! Let's ignore them for now and see whether the compiled producer-static actually works. Starting the local Kafka service with docker-compose and running producer-static, we find that the program writes 100,000 messages to Kafka normally, with no errors in between. At least in this producer scenario, the program does not execute the code paths involving dlopen or getaddrinfo.

However, this does not mean that the static compilation approach above is problem-free in other scenarios, so let's wait for the official solution to be released, or use a builder container to build your confluent-kafka-go based applications.

Let’s move on to segmentio/kafka-go.

IV. segmentio/kafka-go: sync is slow, async is fast

Like sarama, segmentio/kafka-go is a pure Go implementation of the Kafka client and has been tested in many companies' production environments. segmentio/kafka-go provides both a low-level conn API and high-level APIs (Reader and Writer). Take Writer as an example: compared with the low-level API it is concurrency-safe and also handles connection maintenance and retries, so developers do not have to implement these themselves; in addition, Writer supports both sync and async writes, as well as context-based write timeouts.

But Writer's sync mode is very slow, only a few dozen messages per second, while async mode is fast!
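
As a hedged illustration of the difference, here is a minimal Writer sketch (the broker address and topic are assumptions, and the segmentio/kafka-go package is imported here as kafka). With Async left at its default false value, WriteMessages blocks until the write completes, which is the slow synchronous mode; setting Async to true makes WriteMessages return immediately, with errors delivered to the optional Completion callback instead.

w := &kafka.Writer{
    Addr:     kafka.TCP("localhost:29092"),
    Topic:    "test",
    Balancer: &kafka.LeastBytes{},
    Async:    false, // flip to true for the fast asynchronous mode
}
defer w.Close()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // write timeout via context
defer cancel()
if err := w.WriteMessages(ctx, kafka.Message{Value: []byte("hello kafka-go")}); err != nil {
    fmt.Println("write failed:", err)
}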

However, like confluent-kafka-go, segmentio/kafka-go does not provide a mock test package the way sarama does, so we need to build our own environment to test against. The official kafka-go advice is to start a Kafka service locally and run the tests against it. In the age of lightweight containers, it is worth asking whether mocks are really needed.
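
Following that advice, a hedged sketch of an integration-style test against a locally running Kafka might look like this (the broker address localhost:29092, the topic test, and the consumer group id are assumptions for illustration):

func TestWriteAndRead(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // write one message to the local broker
    w := &kafka.Writer{Addr: kafka.TCP("localhost:29092"), Topic: "test"}
    defer w.Close()
    if err := w.WriteMessages(ctx, kafka.Message{Value: []byte("ping")}); err != nil {
        t.Fatalf("write failed: %v", err)
    }

    // read it back through a Reader in a test-only consumer group
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "test",
        GroupID: "integration-test",
    })
    defer r.Close()

    msg, err := r.ReadMessage(ctx)
    if err != nil {
        t.Fatalf("read failed: %v", err)
    }
    if len(msg.Value) == 0 {
        t.Errorf("got an empty message value")
    }
}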

The experience with segmentio/kafka-go has been great, and I haven’t encountered any major problems so far, so I won’t give examples here. See the benchmark section below for examples.

V. Write performance

Even a brief comparison cannot be done without benchmarks, so sequential and concurrent benchmark cases are created here for each of the three packages above.

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/benchmark/kafka_clients_test.go

var m = []byte("this is benchmark for three mainstream kafka client")

func BenchmarkSaramaAsync(b *testing.B) {
    b.ReportAllocs()
    config := sarama.NewConfig()
    producer, err := sarama.NewAsyncProducer([]string{"localhost:29092"}, config)
    if err != nil {
        panic(err)
    }

    message := &sarama.ProducerMessage{Topic: "test", Value: sarama.ByteEncoder(m)}

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        producer.Input() <- message
    }
}

func BenchmarkSaramaAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    config := sarama.NewConfig()
    producer, err := sarama.NewAsyncProducer([]string{"localhost:29092"}, config)
    if err != nil {
        panic(err)
    }

    message := &sarama.ProducerMessage{Topic: "test", Value: sarama.ByteEncoder(m)}

    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            producer.Input() <- message
        }
    })
}

func BenchmarkKafkaGoAsync(b *testing.B) {
    b.ReportAllocs()
    w := &kafkago.Writer{
        Addr:     kafkago.TCP("localhost:29092"),
        Topic:    "test",
        Balancer: &kafkago.LeastBytes{},
        Async:    true,
    }

    c := context.Background()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        w.WriteMessages(c, kafkago.Message{Value: []byte(m)})
    }
}

func BenchmarkKafkaGoAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    w := &kafkago.Writer{
        Addr:     kafkago.TCP("localhost:29092"),
        Topic:    "test",
        Balancer: &kafkago.LeastBytes{},
        Async:    true,
    }

    c := context.Background()
    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            w.WriteMessages(c, kafkago.Message{Value: []byte(m)})
        }
    })
}

func ReadConfig(configFile string) ckafkago.ConfigMap {
    m := make(map[string]ckafkago.ConfigValue)

    file, err := os.Open(configFile)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to open file: %s", err)
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := strings.TrimSpace(scanner.Text())
        if !strings.HasPrefix(line, "#") && len(line) != 0 {
            kv := strings.Split(line, "=")
            parameter := strings.TrimSpace(kv[0])
            value := strings.TrimSpace(kv[1])
            m[parameter] = value
        }
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Failed to read file: %s", err)
        os.Exit(1)
    }

    return m

}

func BenchmarkConfluentKafkaGoAsync(b *testing.B) {
    b.ReportAllocs()
    conf := ReadConfig("./confluent-kafka-go.conf")

    topic := "test"
    p, _ := ckafkago.NewProducer(&conf)

    go func() {
        for range p.Events() {
        }
    }()

    key := []byte("")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        p.Produce(&ckafkago.Message{
            TopicPartition: ckafkago.TopicPartition{Topic: &topic, Partition: ckafkago.PartitionAny},
            Key:            key,
            Value:          m,
        }, nil)
    }
}

func BenchmarkConfluentKafkaGoAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    conf := ReadConfig("./confluent-kafka-go.conf")

    topic := "test"
    p, _ := ckafkago.NewProducer(&conf)

    go func() {
        for range p.Events() {
        }
    }()

    var mu sync.Mutex
    key := []byte("")
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            p.Produce(&ckafkago.Message{
                TopicPartition: ckafkago.TopicPartition{Topic: &topic, Partition: ckafkago.PartitionAny},
                Key:            key,
                Value:          m,
            }, nil)
            mu.Unlock()
        }
    })
}

Start a kafka service locally and run the benchmark.

$go test -bench .
goos: linux
goarch: amd64
pkg: kafka_clients
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkSaramaAsync-4                            802070          2267 ns/op         294 B/op          1 allocs/op
BenchmarkSaramaAsyncInParalell-4                 1000000          1913 ns/op         294 B/op          1 allocs/op
BenchmarkKafkaGoAsync-4                          1000000          1208 ns/op         376 B/op          5 allocs/op
BenchmarkKafkaGoAsyncInParalell-4                1768538          703.4 ns/op        368 B/op          5 allocs/op
BenchmarkConfluentKafkaGoAsync-4                 1000000          3154 ns/op         389 B/op         10 allocs/op
BenchmarkConfluentKafkaGoAsyncInParalell-4        742476          1863 ns/op         390 B/op         10 allocs/op

We can see that although sarama has an advantage in memory allocations, segmentio/kafka-go delivers the best overall performance.

VI. Summary

This article briefly compared three mainstream Kafka client packages from the Go community: Shopify/sarama, confluent-kafka-go, and segmentio/kafka-go. sarama is the most widely used and the one I have spent the longest time studying, but it also has the most pitfalls, so it was abandoned; confluent-kafka-go is official but based on cgo, so I reluctantly gave it up as well; in the end we chose segmentio/kafka-go, which has been running in production for some time now with no major problems found so far.

However, the comparison in this article is limited to the producer scenario, so it is an "incomplete" introduction. I will add more hands-on experience from other scenarios later.

The source code in this article can be downloaded from here.