- Buffers - Thread safe direct and atomic buffers for working with on and off heap memory with memory ordering semantics.
- Lists - Array backed lists of int/long primitives to avoid boxing.
- Maps - Open addressing and linear probing with int/long primitive keys to object reference values.
- Maps - Open addressing and linear probing with int/long primitive keys to int/long values.
- Sets - Open addressing and linear probing for int/long primitives and object references.
- Cache - Set Associative with int/long primitive keys to object reference values.
- Clocks - Clock implementations to abstract system clocks, allow caching, and enable testing.
- Queues - Lock-less implementations for low-latency applications.
- Ring/Broadcast Buffers - implemented off-heap for IPC communication.
- Simple Agent framework for concurrent services.
- Signal handling to support “Ctrl + c” in a server application.
- Scalable Timer Wheel - For scheduling timers at a given deadline with O(1) register and cancel time.
- Code generation from annotated implementations specialised for primitive types.
- Off-heap counters implementation for application telemetry, position tracking, and coordination.
- Implementations of InputStream and OutputStream that can wrap direct buffers.
- DistinctErrorLog - A log of distinct errors to avoid filling disks with existing logging approaches.
- IdGenerator - Concurrent and distributed unique id generator employing a lock-less implementation of the Twitter Snowflake algorithm.
1.1 Duty Cycle
Duty Cycle is a programming model, it is a dead loop program, in the loop, to execute a certain logic, and according to the result of the execution to decide whether to wait a while for the next cycle. For example.
Agents are defined.
doWork(), which is used to handle the business logic, returns a value that determines whether to execute the idle policy in Agrona.
- When the return value is greater than 0, the idle policy is not triggered and Agrona executes the next
- When the return value is less than or equal to 0, the specified idle policy is executed
onClose() serve as callback hook methods when the Agent is started and closed, and
roleName() asserts the name of the Agent.
1.3 Idle Strategies
Agrona natively provides a number of idle strategies.
|SleepingIdleStrategy||Based on parkNanos for thread suspension|
|SleepingMillisIdleStrategy||Based on thread.sleep for local development on low-configuration machines or development with a large number of processes|
|YieldingIdleStrategy||Gives control over threads using thread.yield|
|BackoffIdleStrategy||An aggressive strategy that uses spinning followed by yield (Thread.yield()) and then parkNanos at the configured time, which is the default strategy for Aeron Cluster|
|NoOpIdleStrategy||the most aggressive policy, no processing|
|BusySpinIdleStrategy||For Java 9 and above, Thread.onSpinWait() will be used. This provides a hint to the CPU that the thread is in a tight loop but busy waiting for something, and then the CPU may allocate additional resources to another thread without involving the OS scheduler.|
If you need to customize the idle strategy, you only need to implement the
The idle policy above does not necessarily guarantee thread safety, so it is recommended that each Agent use a separate idle policy.
1.4 Agent Runner
Agent Runner is responsible for combining and running the Agent and Idle Strategies.
Above is the constructor of AgentRunner, where
|idleStrategy||IdleStrategy instance object|
|errorHandler||The callback handler when an exception occurs during the Agent’s execution|
|errorCounter||Records the number of exceptions that occurred during the Agent’s execution|
|agent||Agent instance object|
After getting the AgentRunner object, Agrona provides the following three ways to actually start it.
AgentRunner#startOnThread(AgentRunner), which will create a thread to run after execution
AgentRunner#startOnThread(AgentRunner, ThreadFactory), which will create a separate thread using the specified threadFactory
- form a
CompositeAgentwith multiple Agents and call the above two methods, these Agents will share a common thread to run
1.5 Agent Invoker
If we want to control the Agent manually, Agrona provides the
You can see that the constructor method removes the idle policy compared to AgentRunner, because it is the Agent that needs to be executed manually, so this parameter is not needed.
Agrone provides its own Clock API, first of all it is based on Epoch Time, which is the time difference since 1970-1-1 00:00:00.000 until now. The top-level interface is
EpochClock and there are two implementations:
SystemEpochClock, the millisecond time difference is returned, which is actually a wrapper around
System.currentTimeMillis(), providing a static instance for the operation.
CachedEpochClock, which is actually a cache, there are several methods.
- update(long timeMs) sets the timeMs directly to the cache
- advance(long timeMs) adds timeMs to the cache on top of the original value
- time() Get the result of the cached value
In addition, Agrone also provides microsecond and nanosecond APIs:
SystemEpochMicroClockbased on the
SystemEpochNanoClockis based on the
System.nanoTime()API in a timed sampling fashion, allowing you to adjust the sampling interval and parameters as needed
The author of
Aeron developed disruptor during his tenure at LMAX. In Agrona, the authors also provide support for this data structure.
For single-producer-single-consumer scenarios, unlike the RingBuffer in Disruptor, an additional
RingBufferDescriptor.TRAILER_LENGTH is required to define the RingBuffer size, and the
ByteBuffer API determines whether the buffer is allocated inside or outside the heap.
The following code shows the creation of a OneToOneRingBuffer of size 4096, using an off-heap allocation buffer.
When consuming data, you need to implement the
MessageHandler interface, e.g.
msgType field is the identifier of the message and will be stored in the message header. If this field is not used, it must be set to a value greater than 0.
When producing data, the RingBuffer’s
write method needs to be called, e.g.
sentOk indicates whether the write was successful or not, and this can be used to perform a backpressure operation to prevent consumers from consuming it. ringBuffer provides the following two methods to show the current production and consumption.
In addition to the MessageHandler interface
ControlledMessageHandler can also implement the consumption of RingBuffer.
The difference is that the
onMessage() method returns the `ControlledMessageHandler:
- ABORT : This aborts the read operation for this message. It will be delivered again on next read
- BREAK : This stops further processing after the current message for this read.
- COMMIT : Continues processing, but commits at the current message.
- CONTINUE : Continues processing, committing at the end of the current batch (this is equivalent to the standard handler).
When writing data, you can also use the
tryClaim() method to manipulate the underlying RingBuffer data structure directly.
tryClaim() to get the index that can be written, then get the underlying data structure of the RingBuffer, write data to it, and finally, call
abort to finish.
The API is consistent with OneToOneRingBuffer and supports multi-producer scenarios.
OneToOneRingBuffer and ManyToOneRingBuffer are both in single-consumer scenarios. If multiple consumers are needed, Agrona provides
Note in particular that under Broadcast, messages are discarded if the sender is producing faster than the consumer can consume (no backpressure support).
IV. Data Structures
Agrona provides a number of collection data structures for solving the overhead of needing to box and unbox the base data type in a collection.
When using the IDE for DEUBG, Agrona HashMaps may have problems with the wrong elements in it. To solve this you can set
shouldAvoidAllocationin the constructor to false and Agrona will turn off caching, but this will also result in an increase in GCs.
|HashMap of Int2ObjectHashMap||
|The HashMap of Long2NullableObjectHashMap||
|HashMap of Object2IntHashMap||
When using these HashMaps, you need to make sure that the element
hashCode() is correct, and the performance of the collection can be greatly affected if hashCodes are heavily conflicted.
|Int2ObjectCache||Cache with primitive int lookup to an object. Tuned for very small data structures stored within CPU cache lines. Typical sizes are 2 to 16 entries. Underlying storage is an array.|
|IntLruCache||Fixed size cache, use LRU policy to clear expired cache when limit is reached|
|IntHashSet||HashSet of base int type, automatically expanded.|
|ObjectHashSet||HashSet of type object, automatically expanded.|
V. Direct Buffer
Agrona’s use of the
sun.nio.ch.SelectedImpl.selectedKeysAPIs may cause the JVM to have a warning log printed at startup about illegal reflection accesses. If you want to remove the selections, add the JVM parameters:
--add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens jdk.unsupported/sun.misc=ALL-UNNAMED
Agrona defines the
DirectBuffer interface for interacting with Aeron, which is somewhat similar to Java NIO ByteBuffer, but a bit more convenient.
|UnsafeBuffer||A fixed size buffer outside the heap will throw an IndexOutOfBoundsException when the size is exceeded.|
|ExpandableDirectByteBuffer||Scalable direct buffer, implemented in the underlying ByteBuffer, defaults to 128 bytes and can be resized via the constructor. When the size is exceeded, a new ByteBuffer is created and the existing contents are copied into it.|
|ExpandableArrayBuffer||The underlying direct buffer, which uses a byte array (new byte[size]), defaults to 128 bytes, and when the size is exceeded, a new byte is created and the existing contents are copied in.|
5.1 Key Concepts
Agrona uses the byte order of
ByteOrder.nativeOrder() by default, and using different byte orders for reading and writing can lead to incorrect results. This can occur in cross-OS and cross-platform interactions.
The following figure shows a buffer of size 13 bytes, if we want to extract 4 bytes from it for the highlighted part, we need to set offset to 4 and then set read length to 4.
5.2 Working with Types
5.2.1 Chars & Bytes
DirectBuffer provides methods to read and write single bytes or 16-bit characters.
5.2.2 Shorts, Integers & Longs
DirectBuffer provides support for reading and writing short, int, and long data. For int and long, additional utility methods are provided for compare-and-set, get-and-add, and get-and-set.
5.2.3 Floats & Doubles
It is generally not recommended to use float and double for data transfer, but either BigDecimal formatted as a string or scaled long.
DirectBuffer provides methods for reading and writing float and double.
putStringUtf8manipulate non-fixed-length strings, which is less efficient.
putStringWithoutLengthUtf8manipulate fixed-length strings
Agrona also implements the snowflake algorithm.
- the generated result is 64-bit data
- The data is roughly ordered
- Up to 4096000 can be generated per second. If an attempt is made to generate more than this value, it will spin up to the next available generation time
IllegalStateExceptionexception can be thrown if the system has a clock rollback
- Lock-free and thread-safe
When initializing the snowflake algorithm, you need to provide a unique node ID, which supports up to 1024 nodes by default.
Note that by default it uses 1970 as the starting point, so it can only generate up to 2039 (similar to Epoch Time). It provides an overloaded constructor to specify the starting time.