The data in our databases is constantly changing, and sometimes we want to listen for those changes and react to them, such as updating the cache that corresponds to the changed data, incrementally synchronizing it to other data sources, or detecting and auditing the data. This technique is called Change Data Capture (CDC). You may already know Canal, a well-known CDC framework from China, and it is very good, but one limitation of Canal is that it only supports change data capture for MySQL. Today I will introduce another, more powerful distributed CDC framework: Debezium.

Debezium

Most casual developers are probably unfamiliar with the Debezium framework, but the company behind it will be no stranger to you.

(Figure: Red Hat logo)

That’s right: Red Hat, one of the most successful companies in the open source world. Debezium is an open-source, free stream-processing framework for capturing data changes. Debezium monitors row-level data changes in a database in near real time and can react to them. Only committed changes are visible, so you do not need to worry about transactions or about changes that are rolled back. Debezium provides a unified model for change events across all databases, so there is no need to deal with the intricacies of each database system. Debezium supports databases such as MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, and DB2.

In addition, combined with Kafka Connect it can be used to build an event-stream-based change capture platform with high fault tolerance and excellent scalability.

(Figure: Debezium change data capture pipeline based on Kafka Connect)

As shown in the figure, Debezium Kafka connectors for MySQL and PostgreSQL are deployed to capture change events from these two databases and then deliver the changes, via downstream Kafka connectors, to other systems and databases (e.g. Elasticsearch, data warehouses, analytics systems) or caches.

Another way to use it is to embed Debezium in the application itself, creating a message-bus-like facility that delivers data change events to subscribed downstream systems.

(Figure: Debezium embedded in an application as a message bus)

Debezium also does a lot of work on data integrity and availability. Debezium records the history of data changes in persistent, replicated logs, so your application can be stopped and restarted at any time without missing the events that occurred while it was down, ensuring that all events are processed correctly and completely.

Later I will demonstrate a Spring Boot data capture system integrated with Debezium.

Spring Boot Integration with Debezium

A theoretical introduction doesn’t give you a visual sense of what Debezium can do, so next I’ll use the Embedded Debezium engine to demonstrate it.

(Figure: The embedded Debezium engine capturing MySQL binlog changes)

As shown above, when we change a row of data in the MySQL database, Debezium captures the change event in real time by listening to binlog changes, produces a change event model, and lets us react to (consume) it. Next, let’s set up the environment.

MySQL with binlog logging turned on

For convenience, MySQL is run in a Docker container here; the corresponding script is:

# Run the MySQL container
docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00"
# Enable binlog and set its location
docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# Configure the MySQL server-id
docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
# Restart the container so the appended settings take effect
docker restart mysql-service

The above script runs a MySQL container with user root and password 123456, mounts its data to the local path d:/mysql/data, enables binlog logging, and sets server-id to 123454, which will be used later in the configuration. Note that the appended binlog and server-id settings only take effect after the container is restarted, which is why the script restarts it at the end.

Note that if you don’t use the root user, you need to make sure the user has the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges.

Spring Boot integration with embedded Debezium

Debezium dependencies

The following dependencies are added to the Spring Boot application.

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${debezium.version}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${debezium.version}</version>
    </dependency>

Declare the configuration

Then declare the required configuration.

    /**
     * Debezium configuration.
     *
     * @return configuration
     */
    @Bean
    io.debezium.config.Configuration debeziumConfig() {
        return io.debezium.config.Configuration.create()
                // Java class name of the connector
                .with("connector.class", MySqlConnector.class.getName())
                // Offset persistence, used for fault tolerance (default value)
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                // Offset persistence file path, default /tmp/offsets.dat.
                // If the path is misconfigured the offsets cannot be stored, which may lead to re-consuming changes.
                // When the connector restarts, it uses the last recorded offset to know where to resume reading the source.
                .with("offset.storage.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat")
                // How often offsets are flushed
                .with("offset.flush.interval.ms", "6000")
                // Unique name of the connector
                .with("name", "mysql-connector")
                // Database hostname
                .with("database.hostname", "localhost")
                // Port
                .with("database.port", "3306")
                // Username
                .with("database.user", "root")
                // Password
                .with("database.password", "123456")
                // List of databases to include
                .with("database.include.list", "etl")
                // Whether to include schema-level changes; the default value true is recommended
                .with("include.schema.changes", "false")
                // The server-id configured in mysql.cnf
                .with("database.server.id", "123454")
                // Logical name of the MySQL server or cluster
                .with("database.server.name", "customer-mysql-db-server")
                // Database history implementation
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                // Where the database history is stored
                .with("database.history.file.filename", "C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat")
                .build();
    }

The configuration falls into two parts: the engine-level settings (the connector class, offset storage, offset file location, and flush interval) and the MySQL connector settings (connection information, server id, the databases to include, and the database history storage).

Instantiating the Debezium Engine

The application needs to start a Debezium engine for the MySQL connector; the engine runs as an asynchronous task that wraps the entire lifecycle of the MySQL connector. Declaring an engine requires the following steps.

  • Declare the format of the received data change capture messages; JSON, Avro, Protobuf, Connect, CloudEvents, etc. are provided.
  • Load the configuration defined above.
  • Declare the function method for consuming data change events.

Pseudocode for the declaration.

DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(configuration.asProperties())
        .notifying(this::handlePayload)
        .build();
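
The declaration above is only pseudocode. As a minimal sketch, the engine could be exposed as a Spring bean as shown below, assuming recent Debezium 1.x package names and assuming that the debeziumConfig() bean and the handlePayload method shown next live in the same class; the class name DebeziumConfiguration is purely illustrative.

import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DebeziumConfiguration {

    // ... debeziumConfig() bean as declared above ...

    /**
     * Builds the embedded engine from the Debezium configuration bean.
     * The engine is only built here; starting and stopping it is handled
     * by the DebeziumServerBootstrap lifecycle bean shown later.
     */
    @Bean
    DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine(io.debezium.config.Configuration configuration) {
        return DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(this::handlePayload)
                .build();
    }

    // ... handlePayload(...) as shown below ...
}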

The handlePayload method is:

// OPERATION, BEFORE and AFTER are static imports of io.debezium.data.Envelope.FieldName.*;
// toMap comes from java.util.stream.Collectors, and Pair from Apache Commons Lang3.
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                           DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) throws InterruptedException {
    for (RecordChangeEvent<SourceRecord> recordChangeEvent : recordChangeEvents) {
        SourceRecord sourceRecord = recordChangeEvent.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            // Determine the operation type and filter out reads, handling only inserts, updates and deletes.
            // This could also be done in the connector configuration.
            Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            if (operation != Envelope.Operation.READ) {
                // For deletes the row data is in "before"; for inserts and updates it is in "after"
                String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                // Get the struct holding the changed row
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                // Wrap the changed row into a Map
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                // Simply print it here
                System.out.println("payload = " + payload);
            }
        }
        // Mark the record as processed so that its offset can be committed
        recordCommitter.markProcessed(recordChangeEvent);
    }
    // Mark the whole batch as finished
    recordCommitter.markBatchFinished();
}

The engine is started and closed to fit the Spring bean lifecycle.

@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> debeziumEngine;
    // Track the running state so that Spring calls stop() on shutdown
    private volatile boolean running = false;

    @Override
    public void start() {
        // DebeziumEngine implements Runnable, so it can be handed to an Executor
        executor.execute(debeziumEngine);
        running = true;
    }

    @SneakyThrows
    @Override
    public void stop() {
        // Close the engine gracefully when the application context shuts down
        debeziumEngine.close();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }
}
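
The class above receives its DebeziumEngine from the outside. As a minimal sketch (the configuration class and bean method names are purely illustrative), it could be registered and wired to the engine bean declared earlier like this:

import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DebeziumBootstrapConfiguration {

    /**
     * Registers the lifecycle bean and hands it the engine built earlier,
     * so Spring starts the engine on context refresh and closes it on shutdown.
     */
    @Bean
    DebeziumServerBootstrap debeziumServerBootstrap(DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine) {
        DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();
        // setter generated by Lombok's @Data on DebeziumServerBootstrap
        debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);
        return debeziumServerBootstrap;
    }
}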

Start

Start the Spring Boot project, then add, delete, or update data in the database by any means, and you will see it print something like the following.

payload = {user_id=1123213, username=felord.cn, age=11, gender=0, enabled=1}

This means Debezium has captured the changes to the database. You can now think about the scenarios in which this technique would be useful.