Apache Pulsar is a multi-tenant, high-performance inter-service messaging solution that supports low latency, read/write separation, geo-replication, fast scaling, flexible fault tolerance, and other features. This article briefly introduces the concepts and principles behind Pulsar transactions.

## Preface

Before transactional messages were available, the strongest delivery guarantee Pulsar supported was that a Producer’s messages were saved exactly once on a single partition, enforced by the Broker’s message de-duplication mechanism. When a Producer fails to send a message and retries, the Broker ensures that the message is persisted only once. However, with a Partitioned Topic, the Producer has no way to guarantee atomicity across multiple partitions.

When a Broker goes down, the Producer may fail to send a message, and if the Producer does not retry or has exhausted its retries, the message is never written to Pulsar. On the consumer side, message acknowledgement is currently a best-effort operation with no guarantee of success. If an acknowledgement fails, the message is redelivered and the consumer receives it again, so Pulsar can only guarantee that the consumer consumes each message at least once.

Similarly, Pulsar Functions only guarantee that a single message is processed once when the function is idempotent; that is, the business logic has to provide idempotency itself. Pulsar Functions cannot guarantee that processing multiple messages or emitting multiple results happens exactly once.

For example, suppose a Function executes the following steps: consume messages from Topic-A1 and Topic-A2, aggregate them in the Function (e.g., a time-window aggregation), store the result in Topic-B, and finally acknowledge (ACK) the messages in Topic-A1 and Topic-A2 respectively. The Function may fail between “outputting the result to Topic-B” and “acknowledging the message”, or even while acknowledging a single message. In that case all (or some) of the messages in Topic-A1 and Topic-A2 are redelivered and reprocessed, and new results are generated, which makes the result for the entire time window incorrect.

Therefore, Pulsar needs a transaction mechanism to guarantee exactly-once semantics, where both production and consumption happen exactly once, with no duplication and no data loss, even when a Broker goes down or a Function fails during processing.

## Introduction to Transactions

Pulsar transactional messages are designed to give Pulsar Functions exactly-once semantics. When a Producer sends multiple messages to different partitions, they either all succeed or all fail. Likewise, when a Consumer acknowledges multiple messages, the acknowledgements either all succeed or all fail. Production and consumption can also be included in the same transaction, so that everything succeeds or fails together.

Let’s take the Function scenario at the beginning of this section as an example to demonstrate the scenario of production and consumption in the same transaction.

First, we need to enable transactions in broker.conf.

```properties
transactionCoordinatorEnabled=true
```

Then, we create the PulsarClient and the transaction object separately. Both the producer and consumer APIs need to take this transaction object with them to ensure that they are in the same transaction.

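```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;

// Create the client with transactions enabled
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .enableTransaction(true)
        .build();

// Create a transaction
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();

String sourceTopic = "public/default/source-topic";
String sinkTopic = "public/default/sink-topic";

// Create the consumer and the producer
Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
        .topic(sourceTopic)
        .subscriptionName("my-sub")
        .subscribe();
Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
        .topic(sinkTopic)
        // Transactional sends require the send timeout to be disabled
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();

// Consume one message from the source topic and send one to the sink topic,
// both inside the same transaction
Message<String> message = sourceConsumer.receive();
sinkProducer.newMessage(txn).value("sink data").sendAsync();
sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);

// Commit the transaction
txn.commit().get();
```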

Let’s return to the Function example from the preface.

Without transactions, suppose the Function writes the result to SinkTopic first and the message acknowledgement then fails (Step-4 below fails). The message is redelivered (Step-1 below), and the Function computes the result again and sends it to SinkTopic a second time, so one piece of data is computed and delivered twice.

Alternatively, still without transactions, the Function may acknowledge the message first and then write the data to SinkTopic (Step-4, then Step-3). If the write to SinkTopic fails after the message from SourceTopic has already been acknowledged, the data is lost and the final result is inaccurate.

With transactions enabled, nothing takes effect until the final commit. If the commit never happens, both the produced messages and the acknowledged messages are rolled back, so the whole process can start again without double calculation or data loss. The entire timing diagram is shown below.

We just need to follow the steps above and understand exactly what each step does to get a clear picture of how the whole transaction is implemented. In the following subsections, we will go through it step by step.
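
The three cases discussed above can be made concrete with a small simulation. This is only a toy model, assuming a single source message and a crash injected on the first attempt; the class and method names are hypothetical and nothing here calls Pulsar.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Pulsar code): one source message, one sink topic, one injected crash. */
class FunctionFailureDemo {
    final List<String> sink = new ArrayList<>();   // contents of SinkTopic visible to readers
    boolean sourceAcked = false;                   // whether the source message is acknowledged

    /** No transaction: write first, then ack. The first attempt crashes before the ack. */
    void writeThenAck() {
        for (int attempt = 1; !sourceAcked; attempt++) {
            sink.add("result");                    // write the result
            if (attempt == 1) continue;            // crash: ack is lost, message is redelivered
            sourceAcked = true;                    // acknowledge
        }
    }

    /** No transaction: ack first, then write. The crash happens before the write. */
    void ackThenWrite() {
        sourceAcked = true;                        // acknowledged, so never redelivered
        // crash here: the result is never written
    }

    /** Transaction: both steps are buffered and applied together on commit. */
    void transactional() {
        for (int attempt = 1; !sourceAcked; attempt++) {
            List<String> pendingWrites = new ArrayList<>();
            pendingWrites.add("result");
            if (attempt == 1) continue;            // crash before commit: buffered work is dropped
            sink.addAll(pendingWrites);            // commit: the write and the ack apply together
            sourceAcked = true;
        }
    }
}
```

In this model `writeThenAck()` leaves two results in the sink, `ackThenWrite()` leaves none, and `transactional()` leaves exactly one.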

## Transaction flow

Before walking through the transaction flow, let’s introduce the components of a Pulsar transaction. They are the roles commonly found in distributed transaction systems: TC, TM, and RM.

1. TM: Transaction manager, the initiator of a transaction. It defines the boundary of the transaction and is responsible for telling the TC when the distributed transaction starts, commits, and rolls back. In Pulsar transactions, this role is played by each PulsarClient.
2. RM: Resource manager on each node. Pulsar defines a TopicTransactionBuffer and a PendingAckHandle to manage produced and consumed resources, respectively.
3. TC: Transaction coordinator, the module that handles transaction requests from the Pulsar Client and tracks transaction status. Each TC is identified by a unique id (TCID), and each TC maintains its own transaction metadata store independently of the others. The TC generates transaction IDs and broadcasts notifications to the relevant nodes to commit or roll back transactions.

Below we walk through the entire transaction flow using a Producer as the example. The gray part in the diagram represents storage, for which there are currently two implementations: in-memory and BookKeeper.

1. Select a TC. A Pulsar cluster may have multiple TCs (16 by default), so the PulsarClient has to choose which TC to use when it creates a transaction. All subsequent requests for that transaction (create, commit, rollback, and so on) are sent to this TC. The selection rule is simple: because the TC Topic is fixed, the client first uses Lookup to discover all partitions of that Topic, each of which corresponds to one TC, and then picks one of them in turn.
2. Open the transaction. When the code calls pulsarClient.newTransaction(), the Client sends a newTxn command to the chosen TC, which generates and returns a new transaction ID object. The object stores the TC ID (used to route subsequent requests to the right node) and the transaction ID. Transaction IDs are incremental, so the same TC never generates duplicate IDs.
3. Register partitions. The Topic may be a partitioned topic, in which case messages are sent to different Broker nodes. The TC has to know which nodes will receive messages, because it must notify them when the transaction is later committed or rolled back. The Producer therefore registers the partition information with the TC before sending a message, so that the TC knows which nodes’ RMs to notify.
4. Send messages. This step is not much different from sending normal messages, except that each message first goes through the RM on the Broker, which Pulsar defines as the TopicTransactionBuffer. Although the message has already been written to the original Topic at this point, it is not yet visible to consumers; the transaction isolation level in Pulsar is Read Committed.
5. Commit the transaction. After the Producer has sent all its messages, it commits the transaction. The TC receives the commit request and broadcasts a notification to the RMs on the registered nodes, which commit the transaction and update the corresponding metadata so that the messages can be consumed.

How are the messages in Step-4 persisted to the Topic while remaining invisible to consumers?

A maxReadPosition property is stored in each Topic to identify the maximum position that can be read by the current Consumer. When the transaction has not been committed yet, the maxReadPosition will not change although the data has been persisted to the Topic. Therefore, the Consumer cannot consume the uncommitted data.
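
The idea can be sketched with a toy topic. This is a minimal model under stated assumptions, not the Broker’s implementation: `ToyTopic`, its fields, and the use of list indexes as positions are all hypothetical, and `-1` is used here to mean a non-transactional message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a topic whose readable range is capped by maxReadPosition. */
class ToyTopic {
    private final List<String> log = new ArrayList<>();   // persisted entries, in write order
    private final List<Long> owner = new ArrayList<>();   // txn id per entry, -1 for normal messages
    private final Set<Long> ongoing = new HashSet<>();    // transactions that are not committed yet

    /** Persists an entry immediately; txnId == -1 means a non-transactional message. */
    void append(String payload, long txnId) {
        log.add(payload);
        owner.add(txnId);
        if (txnId >= 0) {
            ongoing.add(txnId);
        }
    }

    void commit(long txnId) {
        ongoing.remove(txnId);
    }

    /** Index of the last readable entry: just before the first entry of an ongoing transaction. */
    int maxReadPosition() {
        for (int i = 0; i < log.size(); i++) {
            if (ongoing.contains(owner.get(i))) {
                return i - 1;
            }
        }
        return log.size() - 1;
    }

    /** What a consumer is allowed to read right now. */
    List<String> readable() {
        return new ArrayList<>(log.subList(0, maxReadPosition() + 1));
    }
}
```

Appending a normal message, a transactional message, and another normal message leaves only the first one readable; once the transaction commits, all three become readable in their original order.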

If messages have already been persisted and the transaction is eventually rolled back, how is that data handled?

When a transaction is rolled back, it is recorded as Aborted in the RM. When messages are dispatched to a Consumer, any message that belongs to an aborted transaction is filtered out directly (it is acknowledged internally).

What happens if the final commit succeeds on some nodes and fails on others?

The TC has a timer object called TransactionOpRetryTimer. Every transaction whose broadcast did not succeed on all nodes is handed to it, and it retries until all nodes succeed or the retry count is exceeded. Does this process cause consistency problems? Consider when this situation arises: usually some Broker nodes are down and therefore unavailable, or network jitter makes them temporarily unreachable. In Pulsar, when a Broker goes down, ownership of its Topics is transferred, so unless the entire cluster is unavailable, a new owner Broker can always be found and the problem is resolved by retrying. While Topic ownership is being transferred, maxReadPosition does not change, so consumers cannot consume the uncommitted messages. Even if the whole cluster is unavailable, the timer will still get the transaction committed by retrying once the cluster recovers.
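
The retry behaviour can be sketched as a loop that keeps only the partitions that have not confirmed yet. This is a simplified illustration, not the real TransactionOpRetryTimer: `CommitBroadcaster`, `notifyRm`, and `maxRounds` are hypothetical names, and a real timer would wait between rounds instead of retrying immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy retry loop for broadcasting a commit to every registered partition. */
class CommitBroadcaster {
    /**
     * Notifies every partition; partitions whose RM did not confirm are retried
     * in the next round. Returns the partitions still unconfirmed after
     * maxRounds rounds (an empty list means the broadcast fully succeeded).
     */
    static List<String> broadcast(List<String> partitions,
                                  Predicate<String> notifyRm,
                                  int maxRounds) {
        List<String> pending = new ArrayList<>(partitions);
        for (int round = 0; round < maxRounds && !pending.isEmpty(); round++) {
            // notifyRm returns true when the RM of that partition confirmed;
            // confirmed partitions are dropped and never notified again.
            pending.removeIf(notifyRm);
        }
        return pending;
    }
}
```

Because confirmed partitions are removed from the pending list, a node that was temporarily unreachable is the only one contacted again in later rounds.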

Does an unfinished transaction block the consumption of normal messages?

Yes. Suppose we open a transaction and send a few transactional messages, but neither commit nor roll back the transaction. If we then continue to send ordinary messages to the Topic, maxReadPosition does not change because the transactional messages have not been committed, so the consumer cannot consume the new ordinary messages either. This is the expected behavior and is required to preserve message order. Different Topics do not affect each other, because each Topic has its own maxReadPosition.

## Implementation of transactions

We can divide the implementation of transactions into five parts: environment, TC, producer RM, consumer RM, and client. Since produced and consumed resources are managed separately, we introduce the two RMs separately.

### Environment setup

Setting up the transaction coordinator starts with the initialization of the Pulsar cluster. Chapter 1 described how to build a cluster: the first time, you need to run a command that initializes the cluster metadata in ZooKeeper. At that point, Pulsar automatically creates a system namespace and a Topic in it. The full Topic name is shown below.

persistent://pulsar/system/transaction_coordinator_assign

This is a PartitionedTopic with 16 partitions by default, and each partition corresponds to a separate TC. We can set the number of TCs with the --initial-num-transaction-coordinators parameter.

### TCs and RMs

• TransactionMetadataStoreService is the overall coordinator of transactions on the Broker; we can think of it as the TC.
• TransactionMetadataStore is used by the TC to store transaction metadata, such as newly created transactions and the partitions registered by Producers. This interface has two implementations: one saves the data to BookKeeper and the other keeps it in memory.
• TransactionTimeoutTracker is used on the server side to track transactions that time out.
• Various Providers, which are factory classes and require no special attention.
• TopicTransactionBuffer is the producer-side RM. When a transactional message reaches the Broker, this RM acts as a proxy: it records some metadata and then stores the message in the original Topic. Internally it contains a TopicTransactionBufferRecover and a TransactionBufferSnapshotService. The RM’s metadata is organized as snapshots that are flushed periodically, and these two objects are responsible for snapshot recovery and snapshot persistence, respectively. Because produced messages live in a Topic, there is one TopicTransactionBuffer per Topic/Partition.
• PendingAckHandle is the consumer-side RM. There is one per subscription, since consumption is tracked per subscription.

Since production environments usually use persistent transactions, the principles described below are based on the persistent implementation.

All transaction-related services are initialized when the BrokerService starts. Each partition of the TC Topic is itself a Topic, and during initialization the TransactionMetadataStoreService restores the previously persisted metadata from BookKeeper. Each TC records metadata for the following operations.

• newTransaction. Creates a new transaction and returns a unique transaction ID object.
• addProducedPartitionToTxn. Registers the Partition to which the producer is about to send messages, so that the TC can later notify the RM on the corresponding node to commit or roll back the transaction.
• addAckedPartitionToTxn. Registers the Partition whose messages the consumer is about to acknowledge, so that the TC can later notify the RM on the corresponding node to commit or roll back the transaction.
• endTransaction. Ends a transaction through commit, rollback, timeout, and so on.
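
To show how these four operations fit together, here is a minimal in-memory sketch. It is a hypothetical model rather than the TransactionMetadataStore interface: the `ToyCoordinator` class, the string form of the transaction ID, and the three-state `Status` enum are assumptions made for illustration, and nothing is persisted.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/** Toy in-memory transaction coordinator (illustrative only). */
class ToyCoordinator {
    enum Status { OPEN, COMMITTED, ABORTED }

    static final class TxnMeta {
        Status status = Status.OPEN;
        final Set<String> producedPartitions = new LinkedHashSet<>();
        final Set<String> ackedPartitions = new LinkedHashSet<>();
    }

    private final long tcId;
    private final AtomicLong sequence = new AtomicLong();
    private final Map<String, TxnMeta> txns = new HashMap<>();

    ToyCoordinator(long tcId) {
        this.tcId = tcId;
    }

    /** Returns an id of the form "(tcId,sequence)"; the sequence only ever increases. */
    String newTransaction() {
        String id = "(" + tcId + "," + sequence.getAndIncrement() + ")";
        txns.put(id, new TxnMeta());
        return id;
    }

    void addProducedPartitionToTxn(String txnId, String partition) {
        requireOpen(txnId).producedPartitions.add(partition);
    }

    void addAckedPartitionToTxn(String txnId, String partitionAndSubscription) {
        requireOpen(txnId).ackedPartitions.add(partitionAndSubscription);
    }

    /** Ends the transaction and returns every registered partition whose RM must be notified. */
    Set<String> endTransaction(String txnId, boolean commit) {
        TxnMeta meta = requireOpen(txnId);
        meta.status = commit ? Status.COMMITTED : Status.ABORTED;
        Set<String> toNotify = new LinkedHashSet<>(meta.producedPartitions);
        toNotify.addAll(meta.ackedPartitions);
        return toNotify;
    }

    Status status(String txnId) {
        return txns.get(txnId).status;
    }

    private TxnMeta requireOpen(String txnId) {
        TxnMeta meta = txns.get(txnId);
        if (meta == null || meta.status != Status.OPEN) {
            throw new IllegalStateException("transaction is not open: " + txnId);
        }
        return meta;
    }
}
```

The set returned by `endTransaction` is exactly why partitions are registered up front: it is the list of RMs that the coordinator has to notify.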

If we set enableTransaction=true when initializing the PulsarClient, an additional TransactionCoordinatorClient is initialized together with the Client. Since the TC Tenant, Namespace, and Topic names are fixed, the TC client can discover all the Partition information through Lookup and cache it locally. When the Client creates a transaction, it selects the TC for that transaction from the cached list in round-robin order.
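
Selecting a TC from the cached list in turn can be sketched as follows. This is an illustration under assumptions, not the TransactionCoordinatorClient code: `TcSelector` is a hypothetical class, and the list of TC ids stands in for the partitions discovered through Lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy selector that hands out cached TC ids one after another. */
class TcSelector {
    private final List<Integer> cachedTcIds;                  // filled once from the Lookup result
    private final AtomicInteger next = new AtomicInteger();   // shared counter, safe across threads

    TcSelector(List<Integer> cachedTcIds) {
        if (cachedTcIds.isEmpty()) {
            throw new IllegalArgumentException("no TC available");
        }
        this.cachedTcIds = new ArrayList<>(cachedTcIds);
    }

    /** Returns the TC id to use for the next new transaction. */
    int nextTc() {
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), cachedTcIds.size());
        return cachedTcIds.get(index);
    }
}
```

With three cached TCs, successive calls to `nextTc()` return the first, second, and third TC and then wrap around to the first again, which spreads new transactions evenly across coordinators.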

### Producer transaction management

Next we will open a transaction.

```java
// Create a transaction
Transaction txn = pulsarClient
        .newTransaction()
        .withTransactionTimeout(1, TimeUnit.MINUTES)
        .build()
        .get();
```

The code above sends a newTxn command to a TC and obtains a Transaction object.

When a transaction is opened, the TransactionCoordinatorClient selects a TC from its cache and sends a newTxn command to the Broker that hosts the selected TC. The command is defined as follows.

```protobuf
message CommandNewTxn {
    required uint64 request_id = 1;
    optional uint64 txn_ttl_seconds = 2 [default = 0];
    optional uint64 tc_id = 3 [default = 0];
}
```

Because the TCID is included in the command, a single Broker can manage multiple TCs without any problem: it finds the corresponding TC based on the TCID and lets that TC process the request.

Before sending a message, the Producer sends an AddPartitionToTxn command to the Broker, and only after it succeeds does the Producer send the actual message. When the transactional message reaches the Broker, it is passed to the TransactionBuffer for processing. After passing the checks, the data is saved through the TransactionBuffer, which is just a proxy (it keeps some metadata of its own) and ultimately calls the original Topic to store the message. For that reason, its constructor takes the original Topic object. We can think of the TransactionBuffer as the RM on the Producer side.

The TransactionBuffer stores two kinds of information, one is the original message, which is stored directly using the Topic. The other is a snapshot, which stores the Topic name, the maximum readable position information (to avoid Consumers reading uncommitted data), and the list of transactions that have been aborted in that Topic.

When a transaction is aborted, the TransactionBuffer writes an abortMarker directly to the original Topic to mark the transaction as aborted, and then updates the aborted-transaction list in memory. The abortMarker is also an ordinary message, except that the metadata in its message header differs from that of a normal message. The aborted-transaction list is stored in the snapshot, mainly so that it can be recovered quickly after a Broker restart. If the snapshot data is lost, TopicTransactionBufferRecover reads all the data in the Topic from beginning to end and updates the in-memory aborted-transaction list for each abortMarker it encounters. If a snapshot is available, it only needs to read from the position recorded in the snapshot to recover the data.
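
The two recovery paths, full replay and replay from a snapshot, can be sketched like this. It is a toy model under assumptions: `AbortListRecovery`, `Entry`, and `Snapshot` are hypothetical types, positions are plain list indexes, and a snapshot is assumed to cover every entry before its position.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy recovery of the aborted-transaction list from a topic and an optional snapshot. */
class AbortListRecovery {
    /** One topic entry: a normal message (abortedTxn == null) or an abort marker. */
    static final class Entry {
        final String abortedTxn;
        Entry(String abortedTxn) {
            this.abortedTxn = abortedTxn;
        }
    }

    /** The aborted list as it was after the first `position` entries of the topic. */
    static final class Snapshot {
        final int position;
        final Set<String> aborted;
        Snapshot(int position, Set<String> aborted) {
            this.position = position;
            this.aborted = aborted;
        }
    }

    /** Rebuilds the aborted list; pass null as the snapshot to replay the whole topic. */
    static Set<String> recover(List<Entry> topic, Snapshot snapshot) {
        Set<String> aborted = new HashSet<>();
        int start = 0;
        if (snapshot != null) {
            aborted.addAll(snapshot.aborted);   // start from the saved state
            start = snapshot.position;          // and skip everything it already covers
        }
        for (int i = start; i < topic.size(); i++) {
            String txn = topic.get(i).abortedTxn;
            if (txn != null) {
                aborted.add(txn);               // every abort marker adds one transaction
            }
        }
        return aborted;
    }
}
```

Both paths produce the same list; the snapshot only shortens how much of the topic has to be read.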

### Consumer transaction management

The consumer passes the transaction object when acknowledging a message, which marks the acknowledgement as a transactional Ack.

```java
consumer.acknowledgeAsync(message.getMessageId(), txn);
```

Each subscription on the server side has a PendingAckHandle object that manages transactional Ack information; we can consider it the RM that manages consumer data. When the Broker finds that a message acknowledgement request carries transaction information, it forwards the request to the corresponding PendingAckHandle.

Message acknowledgements made inside a transaction do not directly modify the markDelete position on the cursor. Instead, they are first persisted to an additional Ledger, and a copy is also cached in the Broker’s memory. This Ledger is managed by the pendingAckStore, which we can think of as the Consumer RM’s log.

When the transaction is committed, the RM calls the corresponding Subscription and performs all the message acknowledgements recorded for the transaction. At the same time, a special Marker is written to the log Ledger to indicate that the transaction is to be committed. When the transaction is rolled back, an AbortMarker is first recorded in the log, and then the messages are redelivered.

The log stored in the pendingAckStore is a redo log. When this component is initialized, it first reads all redo log entries from the log Ledger and rebuilds the previous message acknowledgement information in memory. Because message acknowledgement is an idempotent operation, if the Broker goes down unexpectedly, it only needs to re-execute the operations in the redo log. Once the messages in the subscription have actually been acknowledged, the corresponding redo log entries in the pendingAckStore can be cleaned up. The cleanup is as simple as moving the markDelete position of the Ledger in the pendingAckStore.
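
Replaying such a redo log can be sketched as follows. This is a simplified model, not the pendingAckStore format: `PendingAckLog`, its `Record` type, and the three record types are assumptions chosen to mirror the ack, commit marker, and abort marker described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy redo log for transactional acknowledgements. */
class PendingAckLog {
    enum Type { ACK, COMMIT, ABORT }

    static final class Record {
        final Type type;
        final String txn;
        final long messageId;   // only meaningful for ACK records
        Record(Type type, String txn, long messageId) {
            this.type = type;
            this.txn = txn;
            this.messageId = messageId;
        }
    }

    /** Replays the log and returns the message ids that must be acknowledged on the subscription. */
    static Set<Long> replay(List<Record> log) {
        Map<String, List<Long>> pending = new HashMap<>();   // acks of transactions still open
        Set<Long> acked = new TreeSet<>();                   // acks of committed transactions
        for (Record r : log) {
            switch (r.type) {
                case ACK:
                    pending.computeIfAbsent(r.txn, k -> new ArrayList<>()).add(r.messageId);
                    break;
                case COMMIT:
                    List<Long> done = pending.remove(r.txn);
                    if (done != null) {
                        acked.addAll(done);                  // adding to a set is idempotent
                    }
                    break;
                case ABORT:
                    pending.remove(r.txn);                   // these messages can be redelivered
                    break;
            }
        }
        return acked;
    }
}
```

Because the result is a set, replaying the same records more than once yields the same acknowledged messages, which is the property that makes re-executing the redo log after a crash safe.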