https://www.splunk.com/en_us/blog/it/effectively-once-semantics-in-apache-pulsar.html

The Splunk article above describes in detail how Effectively once is supported in pulsar, so I will not repeat it here; instead I will summarise its conclusions below and then walk through the source code.

pulsar supports Effectively once in two ways:

  1. Effectively-once publishing: ensuring that a message is stored only once, even if it is sent multiple times
  2. Effectively-once consumer: ensuring that a message is consumed only once

Effectively-once publishing

pulsar guarantees that only one copy of a message is stored even under extreme conditions such as broker failure, producer failure, or network failure. This relies heavily on pulsar's message deduplication feature, which provides switches to enable or disable deduplication at several levels, from namespaces down to individual topics.

pulsar-admin namespaces set-deduplication $MY_NAMESPACE --enable
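
On versions of pulsar that support topic-level policies, the same switch can also be applied per topic (an assumption of this sketch: topic-level policies must be enabled on the broker for this to work):

pulsar-admin topics set-deduplication $MY_TOPIC --enable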

Reliable delivery of messages into pulsar is accomplished by constant retries on the producer side plus message deduplication on the broker side.

So the producer also needs to be configured to retry indefinitely, which is achieved with the following configuration (a send timeout of 0 disables the timeout so the client keeps retrying):

ProducerConfiguration conf = new ProducerConfiguration();
conf.setProducerName("my-producer");
conf.setSendTimeout(0, TimeUnit.SECONDS);
Producer producer = client.createProducer(TOPIC_NAME, conf);
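
The snippet above uses the older 1.x client API. For reference, here is a rough equivalent with the newer 2.x builder API (a sketch assuming the org.apache.pulsar.client.api classes; the service URL is a placeholder):

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

// A send timeout of 0 means the client keeps retrying instead of failing
// the message, which is what effectively-once publishing requires.
Producer<byte[]> producer = client.newProducer()
        .topic(TOPIC_NAME)
        .producerName("my-producer")
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();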

The implementation of message deduplication relies on the broker maintaining a highSequenceId for each producer. The sequenceId increases monotonically and can be controlled by the user. Each time a message arrives at the broker, it is judged to be a duplicate if its sequenceId is less than or equal to the current highSequenceId.

A more detailed source-code explanation of message deduplication follows later, so I won't go into detail here; instead I will describe a limitation of this design. (Readers will see the limitation clearly after the source-code explanation.)

Effectively-once publishing in practice only makes sense when the messages are coming from a replayable source as opposed to a non-replayable source (for example online HTTP requests). For non-replayable sources, there’s no way to re-send the previous pending messages after a crash.
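
To make the replayable-source point concrete, here is a minimal producer-side sketch (2.x client API; recoverOffsetFromCheckpoint and readLinesFrom are hypothetical helpers) in which the position in the source doubles as the sequenceId, so after a crash the producer can resend from the last checkpoint and the broker discards whatever it already stored:

// Assumption: `offset` is a durable position in a replayable source,
// e.g. a line number in a file, recovered after a restart.
long offset = recoverOffsetFromCheckpoint();

for (String line : readLinesFrom(offset)) {
    producer.newMessage()
            .sequenceId(offset)  // the broker dedups on this id, per producer
            .value(line.getBytes(StandardCharsets.UTF_8))
            .send();
    offset++;
}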

pulsar uses the sequenceId design for high-performance message deduplication, and it comes with two limitations:

  1. It cannot deduplicate a non-replayable source: for example, HTTP requests are stateless and arrive in arbitrary order, so they cannot be associated with a monotonically increasing sequenceId.
  2. It only determines whether the most recent message is a duplicate: pulsar's deduplication was designed to cope with producer and broker failures during delivery so that each message is stored exactly once, not to provide business-level message idempotency. If your scenario is that any historical message may be delivered again, and you want pulsar to deduplicate based on some custom id (an idempotentId), pulsar does not support that.

In summary, pulsar's sequenceId-based message deduplication is very fast (just one hash lookup and one comparison), with snapshots and persistence performed asynchronously. To support the two scenarios above, pulsar would instead have to maintain the messageIds of all historical messages and design an efficient way to check against them.

Effectively-once consumer

pulsar supports only two consumption modes: subscribe and reader.

In subscribe mode, pulsar saves the consumer's consumption position (cursor) and delivers the next message according to the latest position; after processing a message, the user explicitly acknowledges it.

Consumer consumer = client.subscribe(MY_TOPIC, MY_SUBSCRIPTION_NAME);

while (true) {
    Message msg = consumer.receive();
    // Process the message...
    consumer.acknowledge(msg);
}

For the subscribe mode, there are several possible cases of repeated consumption:

  1. Broker failure: the consumer may have received and processed the message but failed to ack before the broker went down; the broker will redeliver the message when it recovers.
  2. Consumer failure: as with broker failure, the message is received and processed, but the consumer crashes before the ack, so the broker will also redeliver the message.
  3. Network failure: network timeouts and the like can also cause the consumer's ack to fail, and the broker will redeliver the message.
  4. Duplicate stored data (special): as described in the limitations of Effectively-once publishing above, pulsar itself may store duplicate data, so even without the three failures above the business side can still consume duplicates.

The first three failures can be solved with pulsar's reader mode plus external storage of the current consumption position. To be idempotent in the face of inherently duplicated stored data, however, one must use a store that records all message ids.

With reader mode, the user can explicitly specify the message from which reading starts, so the user only needs to save the position consumed so far, for example by committing the lastMessageId and the business state changes in a single transaction (a sketch follows the code below).

MessageId lastMessageId = recoverLastMessageIdFromDB();
Reader reader = client.createReader(MY_TOPIC, lastMessageId,
                                    new ReaderConfiguration());

while (true) {
    Message msg = reader.readNext();
    byte[] msgId = msg.getMessageId().toByteArray();

    // Process the message and store msgId atomically
}
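
For example, with a relational database the atomic commit could look roughly like this (a sketch; the tables, dataSource and business variables are assumptions for illustration, not part of Pulsar):

// Store the business state change and the consumed messageId in ONE
// transaction, so a crash can never separate the two.
try (Connection conn = dataSource.getConnection()) {
    conn.setAutoCommit(false);
    try (PreparedStatement updateState = conn.prepareStatement(
                 "UPDATE account SET balance = balance + ? WHERE id = ?");
         PreparedStatement savePosition = conn.prepareStatement(
                 "UPDATE consumer_position SET last_message_id = ? WHERE topic = ?")) {
        updateState.setLong(1, amount);
        updateState.setLong(2, accountId);
        updateState.executeUpdate();

        savePosition.setBytes(1, msgId);   // msgId from the reader loop above
        savePosition.setString(2, MY_TOPIC);
        savePosition.executeUpdate();
        conn.commit();                     // both changes or neither
    } catch (SQLException e) {
        conn.rollback();
        throw e;
    }
}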

To sum up, to achieve complete effectively-once consumption when the producer side cannot guarantee duplicate-free messages, the consumer side needs a large persistent idempotency store; of course, this idempotent state can be given an eviction policy that suits the business scenario, as sketched below.
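
As an illustration of such an eviction mechanism, here is a minimal in-process sketch (assuming duplicates of a given message arrive close together in time; a real deployment would back this with persistent storage):

// A bounded, LRU-evicting set of already-seen business ids.
final int MAX_IDS = 100_000;
Map<String, Boolean> seenIds = new LinkedHashMap<>(16, 0.75f, true) {
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        return size() > MAX_IDS;  // evict the least recently seen id
    }
};

// In the consume loop: apply the business logic only for unseen ids.
if (seenIds.putIfAbsent(businessId, Boolean.TRUE) == null) {
    // first time this id is seen -> process the message
}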

Message Deduplication source code analysis

As described in Effectively-once publishing above, pulsar uses the highest sequenceId maintained for each producer to deduplicate messages when a producer retries. Here is a brief walkthrough of the source code.

All the message de-duplication logic is implemented in the MessageDeduplication class, and each PersistentTopic object holds a MessageDeduplication object.

How can I tell if a message is a duplicate?

It relies mainly on two maps:

@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);

These two maps store the highest sequenceId for each producerName: highestSequencedPushed is updated as soon as a message passes the duplicate check, while highestSequencedPersisted is updated only once the message has actually been persisted. Routine checks use the in-memory pushed map (fast), a background thread periodically takes snapshots, and the definitive verdict on whether a message is a duplicate relies mainly on the persisted map.

When PersistentTopic receives a message to write, it first calls MessageDeduplication#isDuplicate to determine whether it is a duplicate. The logic is straightforward and is shown below (slightly abridged).

public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) {
    // Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
    // disconnects and re-connects very quickly. At that point the call can be coming from a different thread
    synchronized (highestSequencedPushed) {
        Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
        if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
                        topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
            }

            // Also need to check sequence ids that has been persisted.
            // If current message's seq id is smaller or equals to the
            // lastSequenceIdPersisted than its definitely a dup
            // If current message's seq id is between lastSequenceIdPersisted and
            // lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not
            // we should return an error to the producer for the latter case so that it can retry at a future time
            Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
            if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
                return MessageDupStatus.Dup;
            } else {
                return MessageDupStatus.Unknown;
            }
        }
        highestSequencedPushed.put(producerName, highestSequenceId);
    }
    return MessageDupStatus.NotDup;
}      

As can be seen, three statuses can be returned:

  • MessageDupStatus.NotDup (not a duplicate): if the sequenceId of the message sent by the producer is greater than the in-memory highSequenceId maintained for it, the message is definitely not a duplicate.
  • MessageDupStatus.Dup (duplicate): if sequenceId <= highSequenceId and sequenceId <= highPersistentSequenceId, it is definitely a duplicate; PersistentTopic returns a confirmation to the producer that the message is a duplicate.
  • MessageDupStatus.Unknown: if sequenceId <= highSequenceId but sequenceId > highPersistentSequenceId, the status is unknown; PersistentTopic returns a DupUnknownException error so that the producer retries later.

The Unknown state occurs because the highPersistentSequenceId set and the highSequenceId set are not maintained at the same point in time.

  • highSequenceId: The highSequenceId collection is updated each time the result is determined to be NotDup (before the message is persisted).
  • highPersistentSequenceId: The highPersistentSequenceId collection is updated after the message is actually written to bk (after the message is persisted).

This design exists because pulsar's processing is asynchronous: after the current message has been checked, the next message may arrive before the previous one has finished being written to BookKeeper (bk). For high concurrency, the check for the next message should not wait for the previous write to complete, hence one in-memory map and one persisted map.

In most cases writes to bk succeed, so highSequenceId and highPersistentSequenceId remain consistent and the Unknown state does not occur. When a bk write fails, highPersistentSequenceId is not updated and the Unknown state can arise; upon Unknown (and Dup), PersistentTopic calls the MessageDeduplication#resetHighestSequenceIdPushed() method to overwrite the highSequenceId map with highPersistentSequenceId, bringing the two maps back into agreement.
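
Conceptually, the reset is just a copy of the persisted map back over the pushed map (a simplified sketch of MessageDeduplication#resetHighestSequenceIdPushed; the real method also checks whether deduplication is enabled):

synchronized void resetHighestSequenceIdPushed() {
    // Drop the optimistic in-memory state and rebuild it from what is
    // known to be persisted, so the two maps agree again.
    highestSequencedPushed.clear();
    highestSequencedPersisted.forEach(highestSequencedPushed::put);
}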

How is the MessageDeduplication state persisted?

Each broker in pulsar is stateless; if a broker goes down, the topics that broker was responsible for are scheduled onto another available broker. The main state MessageDeduplication needs to persist is the highestSequencedPersisted map.

At startup, the broker schedules a periodic task that calls the MessageDeduplication#takeSnapshot method to persist a snapshot of the state, at an interval derived from the user's configuration. The state is written to bk via the ManagedCursor's properties metadata.

private void takeSnapshot(PositionImpl position) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
    }
    Map<String, Long> snapshot = new TreeMap<>();
    highestSequencedPersisted.forEach((producerName, sequenceId) -> {
        if (snapshot.size() < maxNumberOfProducers) {
            snapshot.put(producerName, sequenceId);
        }
    });

    managedCursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
        @Override
        public void markDeleteComplete(Object ctx) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
            }
            lastSnapshotTimestamp = System.currentTimeMillis();
        }

        @Override
        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
            log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
        }
    }, null);
}
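
For reference, the broker-wide switches behind this behaviour live in broker.conf (property names from Pulsar's broker configuration; the values shown are illustrative):

# Enable deduplication for all namespaces/topics by default
brokerDeduplicationEnabled=true
# Maximum number of producers tracked per topic in a snapshot
brokerDeduplicationMaxNumberOfProducers=10000
# Take a snapshot after this many persisted entries
brokerDeduplicationEntriesInterval=1000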

So the question is: since state is persisted asynchronously, how does pulsar ensure that state that has not yet been snapshotted can be restored correctly after the topic moves to another broker?

When the new broker loads the topic, it first reads the latest snapshot stored in the cursor's properties, then replays the entries from the position corresponding to that snapshot up to the latest position of the ledger, thereby recovering the latest sequenceId of each producer under that topic (see the replayCursor call below).

private CompletableFuture<Void> recoverSequenceIdsMap() {
    // Load the sequence ids from the snapshot in the cursor properties
    managedCursor.getProperties().forEach((k, v) -> {
        highestSequencedPushed.put(k, v);
        highestSequencedPersisted.put(k, v);
    });

    // Replay all the entries and apply all the sequence ids updates
    log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
    CompletableFuture<Void> future = new CompletableFuture<>();
    replayCursor(future);
    return future;
}
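
The replay itself conceptually applies each entry's metadata on top of the loaded snapshot (a simplified sketch; the real replayCursor reads entries asynchronously in batches through the managed cursor):

// For every entry between the snapshot position and the end of the
// ledger, extract the producer name and sequenceId from the message
// metadata and apply them to both maps.
void applyReplayedEntry(Entry entry) {
    MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
    highestSequencedPushed.put(md.getProducerName(), md.getSequenceId());
    highestSequencedPersisted.put(md.getProducerName(), md.getSequenceId());
    entry.release();  // entries are reference-counted
}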

Summary

Pulsar's implementation of Effectively-once semantics requires the user to cooperate with external storage; Pulsar provides the API and the building blocks. On the producer side, Pulsar implements producer deduplication by maintaining each producer's highest sequenceId, which solves producer idempotency for replayable sources. Where Effectively-once semantics are required, a suitable solution must be chosen according to the specific business scenario.

Business scenario 1: Replayable (traceable) producer

For example, if the producer's data is read from a file, the file offset can be used as the sequenceId to guarantee producer idempotency. You can then combine producer message deduplication with the consumer reader pattern, so that the consumer side only needs external storage for the lastMessageId consumed so far.

Business scenario 2: Non-replayable producer

For example, if the producer's data is sent from HTTP requests, pulsar's producer message deduplication cannot be used, so the consumer side must rely on external storage of all (business-level) message ids to realise Effectively-once semantics.


Reference https://shibd.github.io/pulsar-effectively-once/