Agrona is a Java toolkit developed by real-logic that provides a number of high-performance data structures and utility methods, including:

  • Buffers - Thread safe direct and atomic buffers for working with on and off heap memory with memory ordering semantics.
  • Lists - Array backed lists of int/long primitives to avoid boxing.
  • Maps - Open addressing and linear probing with int/long primitive keys to object reference values.
  • Maps - Open addressing and linear probing with int/long primitive keys to int/long values.
  • Sets - Open addressing and linear probing for int/long primitives and object references.
  • Cache - Set Associative with int/long primitive keys to object reference values.
  • Clocks - Clock implementations to abstract system clocks, allow caching, and enable testing.
  • Queues - Lock-less implementations for low-latency applications.
  • Ring/Broadcast Buffers - implemented off-heap for IPC communication.
  • Simple Agent framework for concurrent services.
  • Signal handling to support “Ctrl + c” in a server application.
  • Scalable Timer Wheel - For scheduling timers at a given deadline with O(1) register and cancel time.
  • Code generation from annotated implementations specialised for primitive types.
  • Off-heap counters implementation for application telemetry, position tracking, and coordination.
  • Implementations of InputStream and OutputStream that can wrap direct buffers.
  • DistinctErrorLog - A log of distinct errors to avoid filling disks with existing logging approaches.
  • IdGenerator - Concurrent and distributed unique id generator employing a lock-less implementation of the Twitter Snowflake algorithm.

I. Agents

1.1 Duty Cycle

A duty cycle is a programming model built around an infinite loop: each iteration executes some logic and, depending on the result, decides whether to wait for a while before starting the next iteration. For example:

while (true) {
    int result = doLogic();
    doIdleStrategies(result);
}
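
The loop above can be sketched with the JDK alone. This is a minimal illustration rather than Agrona code: the class name DutyCycleSketch, the run method, and the fixed 1 ms park are assumptions made for the example, and the loop is bounded so that it terminates.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.IntSupplier;

/** Plain-JDK sketch of a duty cycle; all names here are hypothetical. */
final class DutyCycleSketch {
    /**
     * Runs the work function for a fixed number of iterations and returns
     * how many iterations reported no work and therefore idled.
     */
    static int run(final IntSupplier work, final int iterations) {
        int idleCount = 0;
        for (int i = 0; i < iterations; i++) {
            final int workCount = work.getAsInt();    // plays the role of doLogic()
            if (workCount <= 0) {
                // Nothing was done: back off briefly before the next cycle.
                LockSupport.parkNanos(1_000_000L);
                idleCount++;
            }
        }
        return idleCount;
    }

    public static void main(final String[] args) {
        final int[] pending = {3};    // three cycles with work, then nothing
        final int idled = run(() -> pending[0] > 0 ? pending[0]-- : 0, 5);
        System.out.println("idle iterations: " + idled);    // prints "idle iterations: 2"
    }
}
```

A real duty cycle would loop until the service is shut down and would delegate the waiting to a pluggable idle strategy instead of a hard-coded park.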

1.2 Agent

Agrona defines the Agent interface as follows.

public interface Agent {
    default void onStart() {
    }

    int doWork() throws Exception;

    default void onClose() {
    }

    String roleName();
}

doWork() handles the business logic, and its return value determines whether Agrona applies the idle strategy:

  • When the return value is greater than 0, the idle strategy is not triggered and Agrona calls doWork() again immediately.
  • When the return value is less than or equal to 0, the configured idle strategy is executed.

In addition, onStart() and onClose() are callback hooks invoked when the Agent is started and closed, and roleName() returns the name of the Agent.

1.3 Idle Strategies

Agrona natively provides a number of idle strategies.

  • SleepingIdleStrategy - Suspends the thread using LockSupport.parkNanos.
  • SleepingMillisIdleStrategy - Uses Thread.sleep; suited to local development on low-spec machines or to development with a large number of processes.
  • YieldingIdleStrategy - Gives up the thread using Thread.yield().
  • BackoffIdleStrategy - An aggressive strategy that spins, then yields (Thread.yield()), and then parks (parkNanos) for the configured time; it is the default strategy for Aeron Cluster.
  • NoOpIdleStrategy - The most aggressive strategy: it does nothing at all.
  • BusySpinIdleStrategy - On Java 9 and above, uses Thread.onSpinWait(). This hints to the CPU that the thread is busy-waiting in a tight loop, so the CPU may allocate additional resources to another thread without involving the OS scheduler.

If you need to customize the idle strategy, you only need to implement the IdleStrategy interface.

public interface IdleStrategy {
    void idle(int count);
    void idle();
    void reset();
}

The idle strategies above are not necessarily thread-safe, so it is recommended that each Agent use its own idle strategy instance.
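
As a rough sketch of what a custom strategy might look like, the example below spins first, then yields, then parks. To stay self-contained it declares a local stand-in interface, IdleStrategyLike, that mirrors the three methods shown above; in a real project you would implement Agrona's IdleStrategy instead. The class name and thresholds are assumptions chosen for illustration.

```java
import java.util.concurrent.locks.LockSupport;

/** Local stand-in mirroring the three methods shown above (hypothetical name). */
interface IdleStrategyLike {
    void idle(int workCount);
    void idle();
    void reset();
}

/** Spins first, then yields, then parks; the thresholds are illustrative. */
final class SpinThenParkIdleStrategy implements IdleStrategyLike {
    private final int maxSpins;
    private final int maxYields;
    private final long parkNanos;
    private int idleCalls;    // consecutive idle calls since the last reset

    SpinThenParkIdleStrategy(final int maxSpins, final int maxYields, final long parkNanos) {
        this.maxSpins = maxSpins;
        this.maxYields = maxYields;
        this.parkNanos = parkNanos;
    }

    @Override
    public void idle(final int workCount) {
        if (workCount > 0) {
            reset();    // work was done, so start backing off from scratch
        } else {
            idle();
        }
    }

    @Override
    public void idle() {
        if (idleCalls < maxSpins) {
            Thread.onSpinWait();    // requires Java 9 or later
        } else if (idleCalls < maxSpins + maxYields) {
            Thread.yield();
        } else {
            LockSupport.parkNanos(parkNanos);
        }
        // Cap the counter so that it can never overflow.
        idleCalls = Math.min(idleCalls + 1, maxSpins + maxYields);
    }

    @Override
    public void reset() {
        idleCalls = 0;
    }

    int idleCalls() {
        return idleCalls;
    }
}
```

The unsynchronised idleCalls field shows why such a strategy should not be shared: two threads calling idle() on the same instance would corrupt each other's backoff state.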

1.4 Agent Runner

The AgentRunner combines an Agent with an idle strategy and runs them.

final AgentRunner runner = new AgentRunner(idleStrategy, errorHandler, errorCounter, agent);

The AgentRunner constructor above takes the following parameters:

  • idleStrategy - the IdleStrategy instance
  • errorHandler - the callback invoked when an exception occurs while the Agent is running
  • errorCounter - a counter recording the number of exceptions that occurred while the Agent was running
  • agent - the Agent instance

Once you have an AgentRunner, Agrona provides the following three ways to start it.

  1. AgentRunner#startOnThread(AgentRunner), which creates a new thread and runs the Agent on it
  2. AgentRunner#startOnThread(AgentRunner, ThreadFactory), which creates the thread using the specified ThreadFactory
  3. Combine multiple Agents into a CompositeAgent and start it with either of the two methods above; these Agents then share a single thread
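
The idea behind the third option, several units of work sharing one thread, can be sketched with the JDK alone. The example assumes that a composite simply invokes each member once per cycle and adds up the work counts; CompositeWorkSketch and its doWork method are hypothetical names, not Agrona's API.

```java
import java.util.List;
import java.util.function.IntSupplier;

/** Plain-JDK sketch of running several workers on one thread (hypothetical names). */
final class CompositeWorkSketch {
    /** Invokes every worker once and returns the total work count for the cycle. */
    static int doWork(final List<IntSupplier> workers) {
        int workCount = 0;
        for (final IntSupplier worker : workers) {
            workCount += worker.getAsInt();
        }
        return workCount;
    }

    public static void main(final String[] args) {
        final List<IntSupplier> workers = List.of(() -> 2, () -> 0, () -> 1);
        System.out.println(doWork(workers));    // prints 3
    }
}
```

Under this assumption the idle strategy only kicks in when none of the combined workers had anything to do in a cycle.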

1.5 Agent Invoker

If we want to control the Agent manually, Agrona provides the AgentInvoker.

final AgentInvoker agentInvoker = new AgentInvoker(errorHandler, errorCounter, agent);

Compared with AgentRunner, the constructor has no idle strategy parameter: the caller invokes the Agent manually, so an idle strategy is not needed.

II. Clocks

Agrona provides its own Clock API. It is based on epoch time, that is, the time elapsed since 1970-01-01 00:00:00.000 UTC. The top-level interface is EpochClock and there are two implementations: SystemEpochClock and CachedEpochClock.

SystemEpochClock returns the elapsed time in milliseconds. It is a thin wrapper around System.currentTimeMillis() and exposes a static instance.

EpochClock clock = SystemEpochClock.INSTANCE;
long time = clock.time();

CachedEpochClock holds a cached time value and provides the following methods.

  • update(long timeMs) sets the cached value to timeMs
  • advance(long timeMs) adds timeMs to the cached value
  • time() returns the cached value

CachedEpochClock clock = new CachedEpochClock();
clock.update(1L);

assertEquals(1L, clock.time());

clock.update(2L);
assertEquals(2L, clock.time());

clock.advance(98L);
assertEquals(100L, clock.time());

In addition, Agrona also provides microsecond and nanosecond clocks:

  • SystemEpochMicroClock, implemented on top of the java.time.Instant API
  • SystemEpochNanoClock, implemented on top of the java.time.Instant API
  • OffsetEpochNanoClock, which samples the System.nanoTime() API at intervals; the sampling interval and related parameters can be adjusted as needed
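
The general technique behind an offset-based nanosecond clock can be sketched with the JDK alone: record the difference between wall-clock time and System.nanoTime() once, then derive epoch nanoseconds from the monotonic counter. This is an illustration of the idea under simplifying assumptions (a single sample, no re-sampling), not Agrona's implementation; OffsetNanoClockSketch is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

/** Plain-JDK sketch of an offset-based epoch nanosecond clock (hypothetical name). */
final class OffsetNanoClockSketch {
    private final long offsetNanos;

    OffsetNanoClockSketch() {
        // Sample both clocks back to back and remember the difference.
        final long epochNanos = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
        offsetNanos = epochNanos - System.nanoTime();
    }

    /** Returns an estimate of the nanoseconds elapsed since the epoch. */
    long nanoTime() {
        return offsetNanos + System.nanoTime();
    }

    public static void main(final String[] args) {
        final OffsetNanoClockSketch clock = new OffsetNanoClockSketch();
        System.out.println("epoch nanos: " + clock.nanoTime());
    }
}
```

Because the offset is captured only once here, the result drifts from the wall clock over time, which is presumably why a production clock re-samples.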

III. RingBuffer

The author of Aeron developed the Disruptor during his tenure at LMAX, and Agrona also provides support for this kind of data structure.

3.1 OneToOneRingBuffer

OneToOneRingBuffer targets single-producer, single-consumer scenarios. Unlike the RingBuffer in Disruptor, the underlying buffer must be sized with an additional RingBufferDescriptor.TRAILER_LENGTH bytes on top of the desired capacity, and the ByteBuffer API used for allocation determines whether the buffer lives on or off the heap.

The following code creates a OneToOneRingBuffer with a capacity of 4096 bytes, backed by an off-heap buffer.

final int bufferLength = 4096 + RingBufferDescriptor.TRAILER_LENGTH;
final UnsafeBuffer internalBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(bufferLength));
final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(internalBuffer);

MessageHandler

When consuming data, you need to implement the MessageHandler interface, e.g.

public class MessageCapture implements MessageHandler {
    private final HashSet<String> receivedStrings = new HashSet<>();
    private int count = 0;

    @Override
    public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        receivedStrings.add(buffer.getStringWithoutLengthAscii(index, length));
        count++;
    }
}

Here msgTypeId identifies the message type and is stored in the message header. Even if the application does not use this field, it must be set to a value greater than 0.

RingBuffer#write

When producing data, the RingBuffer’s write method needs to be called, e.g.

//prepare some data
final String testString = "0123456789";

final UnsafeBuffer toSend = new UnsafeBuffer(ByteBuffer.allocateDirect(testString.length()));
toSend.putStringWithoutLengthAscii(0, testString);

//write the data
final boolean sentOk = ringBuffer.write(1, toSend, 0, testString.length());

sentOk indicates whether the write succeeded. A failed write means the buffer has no room, so the producer can use this result to apply backpressure instead of overrunning the consumer. The ring buffer also provides the following two methods to inspect the current consumer and producer positions.

//the current consumer position in the ring buffer
ringBuffer.consumerPosition();

//the current producer position in the ring buffer
ringBuffer.producerPosition();
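
One way to react to a failed write is to retry a bounded number of times with an idle step in between, and to report failure to the caller if the buffer stays full. The sketch below uses a JDK ArrayBlockingQueue as a stand-in for the ring buffer, because its offer() likewise returns false when there is no room; offerWithRetry and the retry limit are assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Plain-JDK sketch of handling a rejected write (hypothetical names). */
final class BackpressureSketch {
    /** Tries to enqueue the item, yielding between attempts; returns false if it never fits. */
    static <T> boolean offerWithRetry(final Queue<T> queue, final T item, final int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (queue.offer(item)) {
                return true;
            }
            Thread.yield();    // give the consumer a chance to drain the queue
        }
        return false;          // still full: the caller must slow down, drop, or fail
    }

    public static void main(final String[] args) {
        final Queue<Integer> queue = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithRetry(queue, 1, 3));    // prints true
        System.out.println(offerWithRetry(queue, 2, 3));    // prints false, the queue is full
        queue.poll();                                       // the consumer catches up
        System.out.println(offerWithRetry(queue, 2, 3));    // prints true
    }
}
```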

ControlledMessageHandler

In addition to the MessageHandler interface, the ControlledMessageHandler interface can also be used to consume from a RingBuffer.

public class ControlledMessageCapture implements ControlledMessageHandler {
    @Override
    public ControlledMessageHandler.Action onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        // ... do something
        return Action.COMMIT; //or ABORT, BREAK OR CONTINUE as required.
    }
}

The difference is that the onMessage() method returns a ControlledMessageHandler.Action, which is one of:

  • ABORT : This aborts the read operation for this message. It will be delivered again on next read
  • BREAK : This stops further processing after the current message for this read.
  • COMMIT : Continues processing, but commits at the current message.
  • CONTINUE : Continues processing, committing at the end of the current batch (this is equivalent to the standard handler).
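
The effect of these actions on the read position can be simulated with a small JDK-only sketch. It models the buffer as a list and follows the descriptions above; it does not model when the consumed position becomes visible to the producer, so COMMIT and CONTINUE behave identically here. All names are hypothetical.

```java
import java.util.List;
import java.util.function.Function;

/** Plain-JDK simulation of a controlled read; every name here is hypothetical. */
final class ControlledReadSketch {
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    /**
     * Delivers messages starting at the given position and returns the
     * position of the next message to deliver on the following read.
     */
    static int read(final List<String> messages, final int position,
                    final Function<String, Action> handler) {
        int next = position;
        while (next < messages.size()) {
            final Action action = handler.apply(messages.get(next));
            if (action == Action.ABORT) {
                break;    // position stays on this message, so it is delivered again
            }
            next++;       // the message counts as consumed
            if (action == Action.BREAK) {
                break;    // stop this read after the current message
            }
            // COMMIT and CONTINUE both carry on with the next message.
        }
        return next;
    }

    public static void main(final String[] args) {
        final List<String> messages = List.of("a", "b", "c");
        System.out.println(read(messages, 0, m -> Action.CONTINUE));                             // prints 3
        System.out.println(read(messages, 0, m -> m.equals("b") ? Action.ABORT : Action.COMMIT)); // prints 1
        System.out.println(read(messages, 0, m -> m.equals("b") ? Action.BREAK : Action.COMMIT)); // prints 2
    }
}
```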

TryClaim

When writing data, you can also use the tryClaim() method to manipulate the underlying RingBuffer data structure directly.

int claimIndex = ringBuffer.tryClaim(1, Integer.BYTES);
if (claimIndex > 0) {
    final AtomicBuffer buffer = ringBuffer.buffer();
    buffer.putInt(claimIndex, something);
    ringBuffer.commit(claimIndex);
}
return 0;

First, call tryClaim() to get the index that can be written, then get the underlying data structure of the RingBuffer, write data to it, and finally, call commit or abort to finish.

3.2 ManyToOneRingBuffer

The API is consistent with OneToOneRingBuffer and supports multi-producer scenarios.

3.3 Broadcast

OneToOneRingBuffer and ManyToOneRingBuffer both target single-consumer scenarios. If multiple consumers are needed, Agrona provides BroadcastTransmitter and BroadcastReceiver.

Note in particular that under Broadcast, messages are discarded if the sender is producing faster than the consumer can consume (no backpressure support).

private final BroadcastTransmitter transmitter;
private final MutableDirectBuffer msgBuffer = new ExpandableArrayBuffer();

public SendAgent(final AtomicBuffer buffer...) {
    this.transmitter = new BroadcastTransmitter(buffer);
}

@Override
public int doWork() {
    ...
    msgBuffer.putInt(0, lastSend);
    transmitter.transmit(1, msgBuffer, 0, Integer.BYTES);
    ...
    lastSend++;
}

On the receiving side, an Agent wraps a BroadcastReceiver in a CopyBroadcastReceiver and polls it from doWork():

public class ReceiveAgent implements Agent, MessageHandler {
    ...
    private final BroadcastReceiver broadcastReceiver;
    private final CopyBroadcastReceiver copyBroadcastReceiver;

    public ReceiveAgent(final AtomicBuffer atomicBuffer, final String name) {
        this.broadcastReceiver = new BroadcastReceiver(atomicBuffer);
        this.copyBroadcastReceiver = new CopyBroadcastReceiver(broadcastReceiver);
        ...
    }

    @Override
    public int doWork() {
        copyBroadcastReceiver.receive(this::onMessage);
        return 0;
    }

    @Override
    public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        LOGGER.info("Received {}", buffer.getInt(index));
    }
...
}

IV. Data Structures

Agrona provides a number of collection data structures that avoid the overhead of boxing and unboxing primitive types in collections.

4.1 HashMaps

When debugging in an IDE, Agrona HashMaps may appear to contain the wrong elements. To work around this, set shouldAvoidAllocation to false in the constructor; Agrona then turns off its caching, but this also results in more garbage collection.

  • Int2IntHashMap - HashMap of <int, int>
  • Int2NullableObjectHashMap - HashMap of <int, nullable object>
  • Int2ObjectHashMap - HashMap of <int, object>
  • Long2LongHashMap - HashMap of <long, long>
  • Long2NullableObjectHashMap - HashMap of <long, nullable object>; a null value is represented inside the collection by a NullReference
  • Long2ObjectHashMap - HashMap of <long, object>
  • Object2IntHashMap - HashMap of <object, int>
  • Object2NullableObjectHashMap - HashMap of <object, nullable object>
  • Object2ObjectHashMap - HashMap of <object, object>

When using these HashMaps, make sure that the hashCode() of the elements is implemented correctly and distributes well; heavy hash collisions can greatly degrade the performance of the collection.
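
To see why collisions matter, here is a minimal sketch of the open-addressing, linear-probing technique for an int-to-int map. It is not Agrona's implementation: it has a fixed capacity, no removal, no resizing, and reserves Integer.MIN_VALUE as a "missing" marker. The class name and hash function are assumptions made for illustration.

```java
import java.util.Arrays;

/** Minimal open-addressing int-to-int map with linear probing (hypothetical sketch). */
final class IntIntMapSketch {
    static final int MISSING_VALUE = Integer.MIN_VALUE;    // marks an empty slot

    private final int[] keys;
    private final int[] values;
    private final int mask;
    private int size;

    IntIntMapSketch(final int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        keys = new int[capacity];
        values = new int[capacity];
        Arrays.fill(values, MISSING_VALUE);
        mask = capacity - 1;
    }

    void put(final int key, final int value) {
        if (value == MISSING_VALUE) {
            throw new IllegalArgumentException("value is reserved");
        }
        int index = hash(key) & mask;
        // Probe forward until the key or an empty slot is found.
        while (values[index] != MISSING_VALUE && keys[index] != key) {
            index = (index + 1) & mask;
        }
        if (values[index] == MISSING_VALUE) {
            if (size == mask) {
                // Always keep one slot empty so that probing terminates.
                throw new IllegalStateException("map is full");
            }
            size++;
        }
        keys[index] = key;
        values[index] = value;
    }

    int get(final int key) {
        int index = hash(key) & mask;
        while (values[index] != MISSING_VALUE) {
            if (keys[index] == key) {
                return values[index];
            }
            index = (index + 1) & mask;
        }
        return MISSING_VALUE;
    }

    int size() {
        return size;
    }

    private static int hash(final int key) {
        final int h = key * 0x9E3779B9;    // multiplicative mixing
        return h ^ (h >>> 16);
    }
}
```

Keys and values live in primitive arrays, so nothing is boxed. When many keys land on the same slot, each lookup has to walk a long run of occupied slots, which is how a poor hash turns constant-time access into a linear scan.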

4.2 Caches

  • Int2ObjectCache - Cache with primitive int keys mapped to object values. Tuned for very small data structures stored within CPU cache lines; typical sizes are 2 to 16 entries. The underlying storage is an array.
  • IntLruCache - Fixed-size cache that uses an LRU policy to evict entries when the limit is reached.

4.3 HashSets

  • IntHashSet - HashSet of primitive int values; expands automatically.
  • ObjectHashSet - HashSet of object references; expands automatically.

V. Direct Buffer

Agrona’s use of the sun.misc.Unsafe and sun.nio.ch.SelectorImpl.selectedKeys APIs may cause the JVM to print warnings about illegal reflective access at startup. To suppress these warnings, add the JVM parameters: --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens jdk.unsupported/sun.misc=ALL-UNNAMED

Agrona defines the DirectBuffer interface for interacting with Aeron, which is somewhat similar to Java NIO ByteBuffer, but a bit more convenient.

  • UnsafeBuffer - A fixed-size buffer; throws an IndexOutOfBoundsException when the size is exceeded.
  • ExpandableDirectByteBuffer - An expandable direct buffer backed by a ByteBuffer. The initial size defaults to 128 bytes and can be set via the constructor. When the size is exceeded, a new ByteBuffer is created and the existing contents are copied into it.
  • ExpandableArrayBuffer - An expandable buffer backed by a byte array (new byte[size]). The initial size defaults to 128 bytes; when the size is exceeded, a new byte[] is created and the existing contents are copied into it.

5.1 Key Concepts

Agrona uses the byte order of ByteOrder.nativeOrder() by default, and using different byte orders for reading and writing can lead to incorrect results. This can occur in cross-OS and cross-platform interactions.
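
The effect of mismatched byte orders is easy to reproduce with a plain java.nio.ByteBuffer, used here as a stand-in for an Agrona buffer. ByteOrderSketch and writeThenRead are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Plain-JDK demonstration of reading with a different byte order than was written. */
final class ByteOrderSketch {
    /** Writes an int with one byte order and reads it back with another. */
    static int writeThenRead(final int value, final ByteOrder writeOrder, final ByteOrder readOrder) {
        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.order(writeOrder).putInt(0, value);
        return buffer.order(readOrder).getInt(0);
    }

    public static void main(final String[] args) {
        // Same order on both sides: the value survives.
        System.out.println(writeThenRead(1, ByteOrder.BIG_ENDIAN, ByteOrder.BIG_ENDIAN));       // prints 1
        // Mixed orders: the four bytes are interpreted in reverse.
        System.out.println(writeThenRead(1, ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN));    // prints 16777216
    }
}
```

When data crosses machines, agreeing on one explicit byte order for both the writer and the reader avoids this class of bug.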

The following figure shows a 13-byte buffer. To extract the 4 highlighted bytes from it, set the offset to 4 and the read length to 4.

5.2 Working with Types

5.2.1 Chars & Bytes

DirectBuffer provides methods to read and write single bytes or 16-bit characters.

5.2.2 Shorts, Integers & Longs

DirectBuffer provides support for reading and writing short, int, and long data. For int and long, additional utility methods are provided for compare-and-set, get-and-add, and get-and-set.

//place 41 at index 0
unsafeBuffer.putLong(0, 41);
//add 1 to the long at index 0 and return the old value, which is 41
long originalValue = unsafeBuffer.getAndAddLong(0, 1);
//read the value of the long at index 0; is now 42
long plus1 = unsafeBuffer.getLong(0);

//read current value while writing a new value.
//Assuming code continues from above, oldValue = 42
long oldValue = unsafeBuffer.getAndSetLong(0, 43);
//read the value of the long at index 0, which is now 43
long newValue = unsafeBuffer.getLong(0);

//set the new value only if the current value equals the expected value; returns true if the update happened
//Assuming code continues from above, wasExpected = true
boolean wasExpected = unsafeBuffer.compareAndSetLong(0, 43, 44);
//read the value of the long at index 0, which is now 44
long updatedValue = unsafeBuffer.getLong(0);

5.2.3 Floats & Doubles

It is generally not recommended to use float and double for data transfer; use either a BigDecimal formatted as a string or a scaled long instead.

DirectBuffer provides methods for reading and writing float and double.
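
The scaled-long alternative can be sketched as follows: a decimal value is stored as a long with a fixed number of implied decimal places. The scale of 4 and the names ScaledLongSketch, encode, and decode are assumptions chosen for this example.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Sketch of representing decimals as scaled longs (hypothetical names). */
final class ScaledLongSketch {
    static final int SCALE = 4;    // four implied decimal places (an assumption)

    /** Converts a decimal to its scaled long form; throws if precision would be lost. */
    static long encode(final BigDecimal value) {
        return value.setScale(SCALE, RoundingMode.UNNECESSARY).unscaledValue().longValueExact();
    }

    /** Converts a scaled long back to a decimal. */
    static BigDecimal decode(final long scaled) {
        return BigDecimal.valueOf(scaled, SCALE);
    }

    public static void main(final String[] args) {
        System.out.println(0.1 + 0.2 == 0.3);                       // prints false
        final long scaled = encode(new BigDecimal("123.45"));
        System.out.println(scaled);                                 // prints 1234500
        System.out.println(decode(scaled));                         // prints 123.4500
    }
}
```

The resulting long can be written to a buffer with putLong, and both sides only need to agree on the scale.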

5.2.4 Strings

  • putStringAscii and putStringUtf8 write variable-length strings, storing a length prefix ahead of the characters, which is less efficient.
  • putStringWithoutLengthAscii and putStringWithoutLengthUtf8 write only the characters, which suits fixed-length strings.
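
The difference between the two encodings can be sketched with a plain java.nio.ByteBuffer. The example assumes a 4-byte length prefix for the variable-length form; the class and method names are hypothetical and only mirror the idea, not Agrona's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Plain-JDK sketch of length-prefixed versus raw ASCII strings (hypothetical names). */
final class AsciiStringSketch {
    /** Writes a 4-byte length followed by the characters; returns the bytes written. */
    static int putWithLength(final ByteBuffer buffer, final int index, final String value) {
        final byte[] bytes = value.getBytes(StandardCharsets.US_ASCII);
        buffer.putInt(index, bytes.length);
        putBytes(buffer, index + Integer.BYTES, bytes);
        return Integer.BYTES + bytes.length;
    }

    /** Reads the length prefix, then that many characters. */
    static String getWithLength(final ByteBuffer buffer, final int index) {
        final int length = buffer.getInt(index);
        return getWithoutLength(buffer, index + Integer.BYTES, length);
    }

    /** Writes only the characters; the reader must already know the length. */
    static int putWithoutLength(final ByteBuffer buffer, final int index, final String value) {
        final byte[] bytes = value.getBytes(StandardCharsets.US_ASCII);
        putBytes(buffer, index, bytes);
        return bytes.length;
    }

    static String getWithoutLength(final ByteBuffer buffer, final int index, final int length) {
        final byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = buffer.get(index + i);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    private static void putBytes(final ByteBuffer buffer, final int index, final byte[] bytes) {
        for (int i = 0; i < bytes.length; i++) {
            buffer.put(index + i, bytes[i]);
        }
    }
}
```

The raw form saves the prefix and the extra read, but it only works when the length is fixed by the message layout or carried elsewhere, as with the length argument passed to onMessage().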

VI. IdGenerator

Agrona also implements the Snowflake algorithm.

  • The generated ID is a 64-bit value.
  • IDs are roughly ordered by time.
  • Up to 4,096,000 IDs can be generated per second. If an attempt is made to generate more than this, the generator spins until the next available timestamp.
  • An IllegalStateException is thrown if the system clock moves backwards.
  • It is lock-free and thread-safe.

When initializing the generator, you need to provide a unique node ID; up to 1024 nodes are supported by default.

final long nodeId = 1L;
final IdGenerator idGenerator = new SnowflakeIdGenerator(nodeId);

final long nextId = idGenerator.nextId();

Note that by default the generator uses 1970 as its starting point, so it can only generate IDs until 2039 (similar to epoch time). An overloaded constructor lets you specify a different starting time.
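
The numbers quoted in this section follow from a Snowflake-style bit layout. The sketch below assumes the commonly used split of 41 timestamp bits, 10 node ID bits, and 12 sequence bits, with the timestamp in the high bits; it illustrates the arithmetic only and is not Agrona's SnowflakeIdGenerator. All names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;

/** Sketch of a Snowflake-style bit layout; the constants are assumed defaults. */
final class SnowflakeLayoutSketch {
    static final int NODE_ID_BITS = 10;     // 2^10 = 1024 nodes
    static final int SEQUENCE_BITS = 12;    // 2^12 = 4096 ids per millisecond
    static final int TIMESTAMP_BITS = 41;   // remaining bits below the sign bit

    static long compose(final long timestampMs, final long nodeId, final long sequence) {
        return (timestampMs << (NODE_ID_BITS + SEQUENCE_BITS)) | (nodeId << SEQUENCE_BITS) | sequence;
    }

    static long timestampMs(final long id) {
        return id >>> (NODE_ID_BITS + SEQUENCE_BITS);
    }

    static long nodeId(final long id) {
        return (id >>> SEQUENCE_BITS) & ((1L << NODE_ID_BITS) - 1);
    }

    static long sequence(final long id) {
        return id & ((1L << SEQUENCE_BITS) - 1);
    }

    /** 4096 ids per millisecond times 1000 milliseconds. */
    static long maxIdsPerSecond() {
        return (1L << SEQUENCE_BITS) * 1000L;
    }

    /** The calendar year (UTC) in which the timestamp field runs out. */
    static int lastYear(final long epochStartMs) {
        final long lastTimestampMs = epochStartMs + (1L << TIMESTAMP_BITS) - 1;
        return Instant.ofEpochMilli(lastTimestampMs).atZone(ZoneOffset.UTC).getYear();
    }

    public static void main(final String[] args) {
        final long id = compose(1_700_000_000_000L, 1L, 7L);
        System.out.println(timestampMs(id) + " " + nodeId(id) + " " + sequence(id));    // prints 1700000000000 1 7
        System.out.println(maxIdsPerSecond());    // prints 4096000
        System.out.println(lastYear(0L));         // prints 2039
    }
}
```

Moving the starting point forward shifts the whole usable range accordingly, which is the purpose of the overloaded constructor mentioned above.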