Background

Today the business team reported that an online application was failing to send messages to Pulsar. The logs showed that a java.lang.InterruptedException was thrown when the message was sent.

After talking to the business team, we learned that the send was triggered inside a gRPC interface, and that the exception persisted for about half an hour before things returned to normal. That is the background of the whole problem.

Troubleshooting

We first checked whether this was a widespread problem: other applications showed no similar exceptions, and the Pulsar broker monitoring showed no fluctuations or errors during that period.

So a problem on the Pulsar server side could be tentatively ruled out.

Next we checked the application's load during that time. From the application QPS to the memory usage of each JVM, nothing had changed significantly.

Pulsar source code troubleshooting

Since both the application and the Pulsar broker looked fine, we had to troubleshoot from the exception itself.

The first step was to find out which version of pulsar-client was in use. Because the business uses a Spring Boot starter built on the official SDK, we also had to check whether the starter had any impact.

Reading its source code basically cleared the starter of suspicion: it simply wraps the SDK's functionality.

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
    at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
    ... 49 common frames omitted
Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
    at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java)
    at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source)
    at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
    at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
    at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
    at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82)
    ... 49 common frames omitted
Caused by: java.lang.InterruptedException: null
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
    at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
    at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

The next step is to analyze the stack. Some of the pulsar-client source code is not packaged directly into the dependency, and decompiled code often has wrong line numbers, so we need to pull the official source code locally and switch to the matching branch to view it.

I switched the branch to branch-2.8 directly here.

Starting at the top of the stack, with TypedMessageBuilderImpl.java:91:

[Screenshot: TypedMessageBuilderImpl]

It looks like an exception was thrown when the message was sent internally asynchronously.

Moving on, see here.

java.lang.InterruptedException
    at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)

[Screenshot: org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)]

This looks like the right place, but the line number clearly does not match. branch-2.8 has received several fix releases since then, so it is normal for line numbers to differ from the latest code on the branch because of the changes in between.

semaphore.get().acquire();

To confirm that it really is this line, I went back a few versions of the file and verified that this is indeed the line in question.

Let’s open the source code for java.util.concurrent.Semaphore#acquire().

/**
 * <li>has its interrupted status set on entry to this method; or
 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
 * for a permit,
 * </ul>
 * then {@link InterruptedException} is thrown and the current thread's
 * interrupted status is cleared.
 *
 * @throws InterruptedException if the current thread is interrupted
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}    

The source code shows that the acquire() function does respond to interrupts, and throws an InterruptedException once it detects that the current thread has been interrupted.
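Both cases named in the Javadoc can be checked in isolation. The following is a minimal sketch with no Pulsar involved; the class and method names are made up for illustration. The first method sets the interrupt flag before calling acquire() on a semaphore that still has a permit, the second interrupts a thread that is waiting on a semaphore with no permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class SemaphoreInterruptDemo {

    /** Case 1: the interrupt flag is already set when acquire() is entered. */
    static boolean throwsWhenFlagSetOnEntry() {
        Semaphore semaphore = new Semaphore(1);   // a permit IS available
        Thread.currentThread().interrupt();       // only sets the flag
        try {
            semaphore.acquire();                  // checks the flag before trying to take a permit
            semaphore.release();
            return false;
        } catch (InterruptedException e) {
            // acquire() cleared the interrupted status before throwing
            return !Thread.currentThread().isInterrupted();
        } finally {
            Thread.interrupted();                 // never leak the flag to the caller
        }
    }

    /** Case 2: the thread is interrupted by another thread around its wait for a permit. */
    static boolean throwsWhenInterruptedWhileWaiting() {
        Semaphore semaphore = new Semaphore(0);   // no permits: acquire() has to block
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        waiter.interrupt();                       // may land before or during the wait; both throw
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("flag set on entry -> throws: " + throwsWhenFlagSetOnEntry());
        System.out.println("interrupted while waiting -> throws: " + throwsWhenInterruptedWhileWaiting());
    }
}
```

The first case is the one that matters for this incident: the exception does not require the semaphore to be exhausted, only that the calling thread arrives with its interrupt flag set.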

Locating the problem

So the cause is basically determined: the thread sending the message to Pulsar was interrupted. Why it was interrupted still needs to be investigated.

A thread is only interrupted when something calls interrupt() on it, for example Thread.currentThread().interrupt();, so our first guess was that some thread inside the Pulsar client had interrupted the sending thread.

So I searched the pulsar-client module for interrupt calls.

[Screenshot: pulsar-client]

Setting aside everything unrelated to the producer, all the remaining code that interrupts a thread only passes the interrupt on after an exception has already been caught. So pulsar-client does not appear to actively interrupt the thread internally.

Since Pulsar itself is not doing it, could the business code be the one interrupting the thread?

So I did a search in the business code.

[Screenshot: business code]

Sure enough, I found the only interrupt call in the business code. The call chain shows that this code runs before the message is sent, on the same thread as the Pulsar send call.

The approximate pseudo-code is as follows.

// Pseudo-code: log, producer and msg come from the surrounding application.
List.of(1, 2, 3).stream()
        .map(e -> CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
            return e;
        }))
        .collect(Collectors.toList())
        .forEach(f -> {
            try {
                Integer integer = f.get();
                log.info("====" + integer);
                if (integer == 3) {
                    TimeUnit.SECONDS.sleep(10);
                    // Sets the interrupt flag on the calling thread; nothing here checks or clears it.
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });

// Same thread, flag still set: the Semaphore inside the producer sees it and throws.
MessageId send = producer.newMessage().value(msg.getBytes()).send();

Executing this code reproduces the same stack exactly.

Fortunately, the business code also writes a log line at the point where it interrupts.

[Screenshot: log]

Searching the logs, I found that the timestamps of the exceptions and of this interrupt log line coincide exactly, which confirms the root cause.

The business thread and the message sending thread are the same thread, and in some cases the business code executes Thread.currentThread().interrupt();. On its own, executing this line has no visible effect as long as nothing responds to the interrupt. Responding means checking the thread's interrupt flag, which is exactly what the Semaphore source code does:

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted() ||
        (tryAcquireShared(arg) < 0 &&
         acquire(null, arg, true, true, false, 0L) < 0))
        throw new InterruptedException();
}

The business code that set the interrupt never checked this flag itself, so the check inside Pulsar was the first to see it, and Pulsar ended up throwing the exception.
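This delayed effect can be reproduced without Pulsar. The following is a minimal sketch, not the application's code: libraryCall is a hypothetical stand-in for any interruptible blocking call (Thread.sleep is used because, like Semaphore#acquire, it checks the interrupt flag on entry), and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class StickyInterruptDemo {

    /** Hypothetical stand-in for a library call that blocks interruptibly (not Pulsar code). */
    static void libraryCall() throws InterruptedException {
        Thread.sleep(1);
    }

    /** Sets the flag, does ordinary work, then calls the "library" on the same thread. */
    static List<String> run(boolean consumeFlagFirst) {
        List<String> events = new ArrayList<>();

        Thread.currentThread().interrupt();       // "business code" sets the flag
        events.add("flag set: " + Thread.currentThread().isInterrupted());

        int sum = 0;
        for (int i = 0; i < 1000; i++) {
            sum += i;                             // non-blocking code is unaffected by the flag
        }
        events.add("work done: " + sum);

        if (consumeFlagFirst) {
            Thread.interrupted();                 // reads AND clears the flag
        }

        try {
            libraryCall();
            events.add("library call ok");
        } catch (InterruptedException e) {
            events.add("library call interrupted");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [flag set: true, work done: 499500, library call interrupted]
        System.out.println(run(true));  // [flag set: true, work done: 499500, library call ok]
    }
}
```

In the first run the flag survives the ordinary work and only surfaces in the later blocking call, which matches what happened between the business code and the producer. In the second run the code that set the flag also consumes it, so the later call is not affected.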

Summary

So in the final analysis, the business code is at fault. It interrupts the thread but never consumes the interrupt itself, which leaves the flag to be picked up by some other library and can cause unpredictable consequences.

Also, calling Thread.currentThread().interrupt(); in business code is not recommended: its purpose is not obvious at first glance, and it is hard to maintain.

In essence, thread interruption is just one means of inter-thread communication. A need like this can be met entirely with built-in facilities such as BlockingQueue.
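As one possible shape for this, here is a minimal sketch that signals a worker thread through a BlockingQueue instead of through the interrupt flag. The class name, the runWorker method and the STOP sentinel are assumptions made for illustration; the original business requirement is not known.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueSignalDemo {

    // Hypothetical sentinel value that tells the worker to stop.
    private static final String STOP = "STOP";

    /** Hands tasks to a worker thread and stops it with an explicit message. */
    static List<String> runWorker(List<String> tasks) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>(); // written by the worker, read after join()

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();     // blocks until a message arrives
                    if (STOP.equals(item)) {
                        return;                     // explicit, visible stop signal
                    }
                    processed.add(item);
                }
            } catch (InterruptedException e) {
                // Not expected here; pass the interrupt on after catching it.
                Thread.currentThread().interrupt();
            }
        });

        worker.start();
        tasks.forEach(queue::add);
        queue.add(STOP);

        try {
            worker.join();                          // worker has finished once this returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runWorker(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```

The signal travels as data on the queue, so it reaches only the thread that reads that queue and cannot be picked up by an unrelated library running on the same thread.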

Ref

  • https://crossoverjie.top/2023/02/23/pulsar/pulsar-interrupted/