Advanced Features
Memory Allocation
To ensure optimal performance and minimize the impact on the Execution Server, it's important for algorithms to avoid unnecessarily allocating new objects during their working cycles. Memory allocations can trigger periodic Garbage Collection, which can introduce unpredictable pauses in the execution of the entire system.
Generally, an algorithm should not allocate new object instances per-request or per-event callbacks. Allocating even short-lived objects will cause Java Garbage Collection pauses that will negatively impact the latency characteristics of the system.
Here are some best practices to follow:
- Object Pooling and Flyweight Programming - Use object pooling and flyweight programming patterns to reuse objects instead of creating new instances. Deltix provides the
deltix.anvil.util.ObjectPool
for this purpose. AsciiStringBuilder
- Instead of Java Strings or StringBuilder, use theAsciiStringBuilder
class. This can help reduce memory allocations and improve performance.CharSequenceParser
- Instead of Java text-to-type converters, use theCharSequenceParser
class. This can provide more efficient parsing of character sequences.CharSequenceUtil
- Use theCharSequenceUtil
class for common text operations such asindexOf
,startsWith
, etc. This class provides optimized implementations that can help avoid unnecessary memory allocations.deltix.anvil.util.Timer
- Use Deltix'sdeltix.anvil.util.Timer
instead of Java or JODA timers.- GFLog - Instead of Java Util Logging or zerogc log4j, use Deltix's GFLog for logging.
- DFP - Use Deltix's DFP (Decimal Floating-Point) library, represented as Java
long
, instead ofjava.lang.BigDecimal
for decimal arithmetic operations. - Timebase - Use Timebase for external messaging and additional persistence instead of traditional JDBC and alike.
Eager Initialization
While "lazy" initialization is a commonly used practice in software development, we recommend performing eager initialization for any resources that the algorithm may need during startup and initialization. By initializing resources upfront, you can avoid potential delays or issues during runtime and ensure that the algorithm is ready to process data efficiently.
Algorithm CPU affinity
To achieve the lowest message processing latency, it is recommended to pin each algorithm instance to an isolated CPU core. This can be done by assigning a specific CPU core to the event processing thread of the algorithm. Here's an example of how to assign CPU core 10 to the event processing thread of the "TWAP" algorithm in the ember.conf configuration file:
affinity {
algorithm-TWAP = [10]
...
}
In the above configuration fragment, the algorithm-TWAP section specifies that the event processing thread of the "TWAP" algorithm should be pinned to CPU core 10.
For more information on configuring Ember, you can refer to the Ember Configuration Reference documentation. The Linux Tuning Guide provides guidance on optimizing the system for low-latency message processing.
Input channels
In addition to market data subscriptions, the algorithm may receive data from custom TimeBase streams. These custom input channels can be used, for example, to transmit control or configuration messages to your algorithm.
Ember uses non-blocking I/O, and custom input channels are not an exception. The algorithm's framework provides doFirst()
and doLast()
callbacks that can be used to process messages from custom input sources. The algorithm’s worker thread keeps calling these from the main loop. The algorithm API provides you the ability to poll custom input sources and call user-provided callbacks for each message received from each custom source.
Here's an example of how to set up and handle custom input messages in your algorithm:
// callback that will process custom messages
Consumer<InstrumentMessage> messageConsumer = this::onCustomInputMessage;
// here we create a poller for TimeBase stream identified by given stream key
customSourcePoller = context.createInputPoller(customStreamKey, messageConsumer);
/** Callback that will process custom messages */
private void onCustomInputMessage (InstrumentMessage message) {
// This is just a sample. In real life avoid logging high rate messages
LOGGER.info("Custom message received %s %s").withTimestamp(message.getTimeStampMs()).with(message.getSymbol());
}
In the code snippet above, the onCustomInputMessage
function is defined as a callback to process custom messages. In this example, it logs the received message.
To handle the custom input messages, you need to override the doLast()
method in your algorithm. This does the main work of polling your source and dispatching messages to the callback we defined above. Use the example below to implement the override method:
@Override
public int doLast(int workDone) {
int newWorkDone = super.doLast(workDone); // don't forget this - otherwise things like timers won't work
// check if input stream has any new messages
newWorkDone += customSourcePoller.poll(32);
return newWorkDone;
}
The doLast()
method is part of the algorithm's main processing loop and is called by the worker thread. In the overridden method, you can call customSourcePoller.poll()
to check for new messages on the custom input stream. The batch size of 32 in this example can be adjusted according to your needs.
Finally, make sure to close the poller at algorithm shutdown to free up TimeBase resources:
@Override
public void close() {
CloseHelper.close(customSourcePoller);
}
By calling CloseHelper.close()
on the customSourcePoller
, you ensure that the poller is properly closed and resources are freed.
Output channels
In Ember, output channels provide a convenient way to output data into TimeBase. An output channel is essentially a message sink that consumes messages.
/**
* Something that consumes messages.
*/
public interface MessageChannel <T> extends Disposable {
/**
* This method is invoked to send a message to the object.
*
* @param msg A temporary buffer with the message.
* By convention, the message is only valid for the duration
* of this call.
*/
public void send (T msg);
}
The MessageChannel<T>
interface represents an output channel and provides a send()
method to send messages to the channel.
Ember's AlgorithmContext API provides a number of ways to construct an output channel. Here's an example of how to create a channel for a given output stream and message class:
class MyAlgorithm extends AbstractAlgorithm<…> {
private final MessageChannel myChannel;
MyAlgorithm(AlgorithmContext context, OrdersCacheSettings cacheSettings, …) {
super(context, cacheSettings);
myChannel = context.createOutputChannel("positions", MyPositionMessage.class);
}
…
In the code snippet above, the MyAlgorithm
class extends AbstractAlgorithm
and creates an output channel named "positions" for the MyPositionMessage
class. The context.createOutputChannel()
method is used to create the channel.
Ember automatically creates the output TimeBase stream if it doesn't exist. There are overloaded channel factories available in AlgorithmContext API that allow you to fine-tune the stream parameters such as type, distribution factor, and performance characteristics.
To write data into the channel, simply initialize your message and pass it to the channel's send()
method:
myChannel.send(myMessage);
Once the send()
method call is completed, you can reuse the passed message, as the message content has been synchronously copied to the internal TimeBase buffers.
Remember to close the channel when the algorithm is shutdown:
@Override
public void close() {
myChannel.close();
}
By calling the close()
method of the channel, you ensure that the channel is properly closed and any associated resources are released.
Tracking Trading Connector Status
@Override
public void onSessionStatusEvent(final SessionStatusEvent event) {
if (isLeader()) {
logger.info("Connector %s switched to %s state")
.withAlphanumeric(event.getSourceId())
.with(event.getStatus());
}
}
In the event of a disconnection from the exchange, the algorithm follows these steps:
The algorithm receives a notification, as shown in the code snippet above.
When the connection is restored, the algorithm receives a similar notification. Additionally, the Order Management System (OMS) requests order status updates for all orders placed on the exchange that were active prior to disconnect. This results in order state synchronization.
Some exchanges provide the option to use "cancel-on-disconnect". If available, this option instructs the exchange to automatically cancel all active orders if the connection goes down.
Tracking TimeBase Connection Status
To enable your algorithm to receive notifications about TimeBase disconnect, follow these steps:
Add
implements MarketSupplier.ConnectionListener
to your algorithm class declaration.In the algorithm constructor, call the following code:
if (marketDataProcessor != null && context.getMarketSupplier() != null)
context.getMarketSupplier().addConnectionListener(this);Implement the
onConnected()
andonDisconnected()
TimeBase callbacks in your algorithm.
Agorithms that rely on dynamic market data subscription must be able to recover from TimeBase disconnects.
The dynamic subscription API is exposed via AlgorithmContext.getMarketSubscription()
.
The following example shows how to perform re-subscription to all symbols defined in the MarketDataProcessor
:
private final BiPredicate<Instrument, Object> resubscribeCallback;
private boolean needResubscription;
MyAlgorithm (AlgorithmContext context) {
// ...
resubscribeCallback = (instrument,cooke) -> {
if (context.getMarketSubscription() != null)
context.getMarketSubscription().subscribe(instrument.getSymbol(),TradingUtils.convert(instrument.getInstrumentType()));
return true;
};
}
@Override
public void onConnected() {
if (needResubscription) { // skip initial connect
LOGGER.info("Market data feed restored - resubscribing");
marketDataProcessor.iterate(resubscribeCallback,null);
needResubscription = false;
}
}
@Override
public void onDisconnected() {
LOGGER.info("Market data feed disconnected");
needResubscription = true;
}
TimeBase Type Loader
The algorithm can define a custom type loader that determines the mapping between message types in a TimeBase stream and the corresponding Java message classes at runtime. This component is used by the TimeBase client when reading or writing data to TimeBase.
For example, the Smart Order Router algorithm uses a custom type loader to map the Exchange Balance Publisher data format to the specific Java class BalanceMessage
.
class SORMessageTypeLoader extends ParentIgnoringTypeLoader {
private static final String BALANCE_RCD_NAME = "deltix.samples.uhf.balance.BalanceMessage";
@Override
public Class load(ClassDescriptor cd, ExceptionHandler handler) throws ClassNotFoundException {
String javaClassName = cd.getName();
if (BALANCE_RCD_NAME.equals(javaClassName))
return BalanceMessage.class;
return super.load(cd, handler);
}
}
You can use a custom type loader in input/output channels like this:
MessageChannel outputChannel = context.createOutputTopic(
"outputTopicName",
(topicSettings, loadingOptions) -> {
loadingOptions.typeLoader = new SORMessageTypeLoader();
},
PackageHeader.class);
For market data subscription, you can specify a custom type loader as follows:
SOR: ${template.algorithm.default} {
factory = "deltix.ember.algorithm.pack.sor.SORAlgorithmFactory"
subscription {
streams = ${marketDataStreams}
typeLoader: {
factory: "deltix.ember.algorithm.pack.sor.balance.SORMessageTypeLoaderFactory"
settings {}
}
}
Note that in ember.conf, the type loader is referenced using the factory class SORMessageTypeLoaderFactory
, rather than directly using SORMessageTypeLoader
(the usual approach for customizing bean/instance parameters):
public class SORMessageTypeLoaderFactory implements deltix.anvil.util.Factory<SORMessageTypeLoader> {
@Override
public SORMessageTypeLoader create() {
return new SORMessageTypeLoader();
}
}