Event Handlers
Inbound Orders
When it comes to handling trading requests, there are four types that every execution algorithm should typically manage:
- New order submission
- Cancellation of an active order
- Modification of an active order
- Order status request
These requests are handled through the OrderRequestHandler interface, which includes the following methods in Java:
public interface OrderRequestHandler {
    void onNewOrderRequest(NewOrderRequest request);
    void onCancelOrderRequest(CancelOrderRequest request);
    void onReplaceOrderRequest(ReplaceOrderRequest request);
    void onOrderStatusRequest(OrderStatusRequest request);
}
To simplify algorithm development, Ember provides a boilerplate handler for these events in the AbstractExecutionAlgorithm class. This class handles the following aspects:
- Maintaining the state of active orders based on their lifecycle events.
- Providing boilerplate code to handle order workflows between parent algo orders and their child orders.
Inbound orders are requests directed towards the algorithm's logic. For example, it could be a request to execute a new client order or to cancel a previously transmitted order.
Algorithms are fully responsible for the lifecycle of these orders:
The execution algorithm must acknowledge each inbound order. When initial order processing is expected to be fast, the acknowledgement event can be skipped, provided that the algorithm guarantees some other form of feedback event. For example, an algorithm implementing IOC (immediate-or-cancel) orders may skip the order acknowledgement and respond with a FILL, CANCEL, or REJECT event depending on market conditions.
The Ember Risk module has an Order Acknowledgement timeout checker. This component halts trading if the execution algorithm does not acknowledge an order within a certain time frame (usually a few seconds).
All inbound requests, including child order submissions, require corresponding acknowledgements (ACK) or non-acknowledgements (NACK) from the execution algorithm. The Execution Algorithm API provides helper methods that simplify this task.
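The acknowledgement rule above can be sketched in a few lines. This is a minimal illustration under stated assumptions: InboundOrder and AckingHandler are hypothetical stand-ins, not Ember classes, and responses are modeled as plain strings. A real algorithm would use the helper methods of the Execution Algorithm API instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types for illustration only; these are NOT Ember API classes.
final class InboundOrder {
    final String orderId;
    final long quantity;

    InboundOrder(String orderId, long quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }
}

final class AckingHandler {
    // Responses emitted so far, e.g. "ACK:A1" or "NACK:A2:<reason>".
    final List<String> responses = new ArrayList<>();

    // Every inbound request receives exactly one response:
    // ACK if it passes validation, NACK (with a reason) otherwise.
    void onNewOrderRequest(InboundOrder request) {
        if (request.quantity > 0) {
            responses.add("ACK:" + request.orderId);
        } else {
            responses.add("NACK:" + request.orderId + ":quantity must be positive");
        }
    }
}
```

The point of the sketch is that no code path leaves a request unanswered, which is what keeps an acknowledgement timeout check from firing.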
Outbound Orders
When working with outbound orders, if your algorithm extends the AbstractAlgorithm class, you can use the following code to submit a child order for a given parent order:
MutableOrderNewRequest childOrderRequest = makeChildOrderRequest(parentOrder);
childOrderRequest.setTimeInForce(TimeInForce.IMMEDIATE_OR_CANCEL);
childOrderRequest.setOrderType(OrderType.LIMIT);
childOrderRequest.setLimitPrice(quotedPrice);
childOrderRequest.setDestinationId(exchangeId);
childOrderRequest.setQuantity(quantity);
Order childOrder = submit(parentOrder, childOrderRequest);
The AbstractAlgorithm class provides a helper method called submit() that performs the following tasks:
- Sends the specified OrderNewRequest to the Order Management System (OMS).
- Creates a state object for the child order and links it to the parent order.
When implementing an execution algorithm, the emitted child order can usually inherit many properties from the parent order, such as symbol, side, and account.
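As a rough illustration of this inheritance, the sketch below copies the shared properties from a parent to a new child request and leaves child-specific fields to the caller. ParentOrderInfo, ChildRequestInfo, and ChildRequests are hypothetical names invented for this example; they only mimic the idea and say nothing about how makeChildOrderRequest() is actually implemented.

```java
// Hypothetical stand-in types for illustration only; NOT the Ember API.
final class ParentOrderInfo {
    final String symbol;
    final String side;
    final String account;

    ParentOrderInfo(String symbol, String side, String account) {
        this.symbol = symbol;
        this.side = side;
        this.account = account;
    }
}

final class ChildRequestInfo {
    String symbol;      // inherited from the parent
    String side;        // inherited from the parent
    String account;     // inherited from the parent
    long quantity;      // child-specific, set by the caller
    String destination; // child-specific, set by the caller
}

final class ChildRequests {
    // Pre-populates a child request with the properties it shares with its parent.
    static ChildRequestInfo fromParent(ParentOrderInfo parent) {
        ChildRequestInfo child = new ChildRequestInfo();
        child.symbol = parent.symbol;
        child.side = parent.side;
        child.account = parent.account;
        return child;
    }
}
```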
Alternatively, you can submit an order without a parent order. In this case, you need to specify the symbol and side explicitly:
MutableOrderNewRequest orderRequest = makeMarketOrder(Side.BUY, Decimal64Utils.ONE, "NFLX");
orderRequest.setDestinationId(AlphanumericCodec.encode("SIM"));
orderRequest.setExchangeId(AlphanumericCodec.encode("ACK"));
Order order = submit(orderRequest);
The following example shows how to initialize a new outbound order request from scratch, using several helper functions from the AlgorithmContext:
MutableOrderNewRequest request = new MutableOrderNewRequest(); //TODO: store as instance variable and reuse
request.setSourceId(context.getId()); // Identify this algorithm as order source
request.setOrderId(context.getRequestSequence().next()); // generate unique order ID
request.setSide(Side.BUY);
request.setQuantity(Decimal64Utils.fromLong(10));
request.setSymbol("CLZ22");
request.setLimitPrice(Decimal64Utils.fromDouble(80.08));
request.setTimeInForce(TimeInForce.DAY);
request.setOrderType(OrderType.LIMIT);
request.setDestinationId(AlphanumericCodec.encode("ICEBERG"));
request.setExchangeId(AlphanumericCodec.encode("CME"));
request.setTimestamp(context.getClock().time());
To submit this order to the Ember OMS, you can use the OMS sink:
context.getOMS().onNewOrderRequest(request);
For more information on the difference between order exchange and order destination, please refer to the Trading Data model document.
Market Data
The Execution Server provides market data via the TimeBase API.
Subscription to Data from TimeBase Streams
The algorithm configuration in ember.conf defines which TimeBase streams each algorithm uses for market data.
For example:
algorithms {
    TWAP ${template.algorithm.default} {
        ...
        subscription {
            streams = ["CMEFeed", "ICEFeed"]
        }
    }
    ...
If you want to limit the symbols and message types you receive from the stream, you can specify them in the configuration as well.
For instance:
algorithms {
    TWAP ${template.algorithm.default} {
        ...
        subscription {
            streams = ["CMEFeed", "ICEFeed"]
            symbols = ["IFLL ESU0", "IFLL L M2", "IFLL ERM2"]
            types = ["deltix.timebase.api.messages.TradeMessage"]
        }
    }
    ...
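The effect of the symbols and types settings can be pictured as a simple filter. The sketch below is only a mental model: SubscriptionFilter is a hypothetical class, and treating an empty list as "no restriction" is an assumption made for this example, not a documented rule of the configuration format.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a symbol/type subscription filter; NOT an Ember or TimeBase class.
final class SubscriptionFilter {
    private final Set<String> symbols;
    private final Set<String> types;

    SubscriptionFilter(String[] symbols, String[] types) {
        this.symbols = new HashSet<>(Arrays.asList(symbols));
        this.types = new HashSet<>(Arrays.asList(types));
    }

    // Assumption: an empty set means "no restriction" for that dimension.
    boolean accepts(String symbol, String type) {
        boolean symbolOk = symbols.isEmpty() || symbols.contains(symbol);
        boolean typeOk = types.isEmpty() || types.contains(type);
        return symbolOk && typeOk;
    }
}
```

With the configuration above, a trade message for "IFLL ESU0" would pass such a filter, while any message for a symbol outside the list would not.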
By default, the algorithm subscribes to all data from the specified streams. However, you can modify the subscription using the TimeBase EntityAndTypeSubscriptionController interface, which is available from the AlgorithmContext.
context.getMarketSubscription().subscribe("ESM9", InstrumentType.FUTURE);
Subscription to Data from TimeBase Topics
In cases where performance is critical, the algorithm can subscribe to market data from TimeBase Topics.
For example:
algorithms {
    TWAP ${template.algorithm.default} {
        topics = ["ticks"]
        ...
    }
    ...
The above configuration snippet demonstrates subscribing to the "ticks" topic for market data.
When deciding whether to use TimeBase Streams or Topics for market data subscription, consider the factors compared in the table below.
Feature | TimeBase Streams | TimeBase Topics |
---|---|---|
Maximum throughput | 1.5 million messages/second | 9 million messages/second |
Latency (median) | 46 microseconds | 0.35 microseconds |
Latency (99th percentile) | 62 microseconds | 1.40 microseconds |
Durability | Messages can be persisted using Durable Streams | Each topic can be configured to write a carbon copy of its data into a durable stream (provided that the disk can handle the data rate of the topic). |
Subscription control | Supported via Entity and Type Subscription Controller | Not supported. Algorithms receive all data from the topic. |
Number of sources | TimeBase supports thousands of streams | Topics are efficient but resource-intensive. We do not recommend more than 50 topics per system. |
Programmatic Subscription
The AlgorithmContext provides access to market data subscription via the MarketSubscription interface, allowing you to change the subscription on the fly.
For example:
context.getMarketSubscription().subscribe("ESM9", InstrumentType.FUTURE);
The code snippet demonstrates subscribing to the "ESM9" symbol with the instrument type set to FUTURE.
Market Data Handler
The Ember runtime, which runs on the main algorithm processing thread, polls the underlying TimeBase cursors and dispatches available market messages to the algorithm via the MarketDataHandler interface:
public interface MarketMessageHandler {
    void onBarMessage(BarMessage message);
    void onBestBidOfferMessage(BestBidOfferMessageInfo message);
    void onLevel2Message(Level2Message message);
    void onL2Message(L2Message message);
    void onTradeMessage(TradeMessage message);
}
The MarketDataHandler interface includes several methods for handling different types of market messages such as BarMessage, BestBidOfferMessage, Level2Message, L2Message, and TradeMessage. Algorithms can implement these methods to process the received market messages.
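To make the callback style concrete, here is a minimal sketch of a handler that remembers the last trade price per symbol. TradeTick and LastTradeTracker are hypothetical stand-ins rather than TimeBase or Ember types, and prices are plain double values purely for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a trade message; NOT a TimeBase or Ember class.
final class TradeTick {
    final String symbol;
    final double price; // double is used for brevity only

    TradeTick(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

final class LastTradeTracker {
    private final Map<String, Double> lastPrice = new HashMap<>();

    // Called once per dispatched trade; keeps only the most recent price per symbol.
    void onTradeMessage(TradeTick message) {
        lastPrice.put(message.symbol, message.price);
    }

    // Returns NaN when no trade has been seen for the symbol yet.
    double lastPrice(String symbol) {
        Double price = lastPrice.get(symbol);
        return price == null ? Double.NaN : price;
    }
}
```

Because messages are dispatched on the algorithm's main processing thread, a sketch like this needs no synchronization. Note that boxing prices into a HashMap allocates, so a latency-sensitive algorithm would likely prefer a primitive-friendly structure.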
Algorithms can also subscribe to a custom input channel. For more information, please refer to the Advanced Features page.
Security Master
Start Time - Instrument Snapshot
Deltix maintains information about all instruments in a special TimeBase stream called Security Metadata, also known as "securities".
When the system starts, each algorithm is provided with a snapshot of all known instruments via the AlgorithmContext.getInstrumentSnapshot method. The instrument snapshot allows you to iterate through all the known instruments.
Here's an example of how you can iterate through the instrument snapshot:
public void open() {
    super.open();
    InstrumentSnapshot instruments = context.getInstrumentSnapshot();
    final int size = instruments.size();
    for (int i = 0; i < size; i++) {
        InstrumentUpdate instrument = instruments.get(i);
        ...
    }
}
In the code snippet above, the open() method of the algorithm class retrieves the instrument snapshot using context.getInstrumentSnapshot() and then iterates through each instrument using a loop.
You can also use the forEach() method with an InstrumentUpdateHandler visitor to iterate through the instrument snapshot:
context.getInstrumentSnapshot().forEach(instrumentSnapshotProcessor);
The instrumentSnapshotProcessor is an instance of an InstrumentUpdateHandler that defines the logic for processing each instrument update.
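The visitor pattern behind forEach() can be sketched as follows. All types here (InstrumentInfo, InstrumentVisitor, SnapshotSketch, FutureCollector) are hypothetical and exist only for this example; the real InstrumentUpdateHandler has one callback per instrument type rather than the single method assumed below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT Ember API types.
final class InstrumentInfo {
    final String symbol;
    final String type;

    InstrumentInfo(String symbol, String type) {
        this.symbol = symbol;
        this.type = type;
    }
}

interface InstrumentVisitor {
    void onInstrument(InstrumentInfo instrument);
}

final class SnapshotSketch {
    private final List<InstrumentInfo> instruments = new ArrayList<>();

    void add(InstrumentInfo instrument) {
        instruments.add(instrument);
    }

    // Hands every instrument in the snapshot to the visitor, in insertion order.
    void forEach(InstrumentVisitor visitor) {
        for (int i = 0; i < instruments.size(); i++) {
            visitor.onInstrument(instruments.get(i));
        }
    }
}

// Example visitor: collects the symbols of all futures in the snapshot.
final class FutureCollector implements InstrumentVisitor {
    final List<String> futures = new ArrayList<>();

    @Override
    public void onInstrument(InstrumentInfo instrument) {
        if ("FUTURE".equals(instrument.type)) {
            futures.add(instrument.symbol);
        }
    }
}
```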
You can also look up an instrument by its symbol using the get() method:
...
    for (String symbol : symbols.split("\\s*,\\s*")) {
        InstrumentUpdate instrument = context.getInstrumentSnapshot().get(symbol);
        if (instrument == null)
            throw new IllegalArgumentException("Symbol is missing from security metadata " + symbol);
        matchers.put(symbol, new InstrumentData(context.getId(), symbol, sharedConverterPool, TradingUtils.convert(instrument.getInstrumentType())));
    }
}
In the code snippet above, the algorithm retrieves an instrument update for a given symbol and performs further processing based on the instrument data.
Run Time - Instrument Updates
While the algorithm is running, security metadata updates are visible via the InstrumentUpdateHandler interface, which is implemented by each algorithm. Whenever instrument metadata information is updated in the securities TimeBase stream, a special Ember thread reads the updated information and dispatches notifications to all registered instrument update handlers in the Ember process, including algorithms.
The default algorithm implementation, AbstractAlgorithm, forwards instrument updates to the market data processor used by the algorithm. Here's an example for different instrument types:
@Override
public void onEquityUpdate(EquityUpdate update) {
    if (marketDataProcessor != null)
        marketDataProcessor.onEquityUpdate(update);
}

@Override
public void onFutureUpdate(FutureUpdate update) {
    if (marketDataProcessor != null)
        marketDataProcessor.onFutureUpdate(update);
}

@Override
public void onOptionUpdate(OptionUpdate update) {
    if (marketDataProcessor != null)
        marketDataProcessor.onOptionUpdate(update);
}
// Similar implementation for other instrument types ...
In the code snippet above, the algorithm's implementation of InstrumentUpdateHandler forwards the instrument updates to the market data processor, if one is set.
The default implementation of the MarketDataProcessor checks if the instrument belongs to the algorithm's subscription. If it does, the instrument information is cached in the per-instrument state of the processor:
@Override
public void onEquityUpdate(EquityUpdate update) {
    InstrumentUpdateHandler instrument = getInstrument(update);
    if (instrument != null)
        instrument.onEquityUpdate(update);
}

@Nullable
public InstrumentUpdateHandler getInstrument(InstrumentUpdate update) {
    return isSubscribedTo(update.getSymbol()) || (subscription != null && subscription.isSubscribedToAllInstruments()) ?
        getOrCreate(update.getSymbol(), update.getInstrumentType()) : get(update.getSymbol());
}
In the code snippet above, the getInstrument() method checks if the algorithm is subscribed to the instrument's symbol or if it is subscribed to all instruments. If so, it gets or creates the corresponding InstrumentUpdateHandler for further processing. Otherwise, it returns the existing handler for that symbol, which may be null.
Here is an example of how the update() method processes the InstrumentUpdate and extracts the custom instrument attributes minOrderSize and orderSizePrecision:
protected void update(InstrumentUpdate instrumentUpdate) {
    final ObjectList<InstrumentAttribute> attributes = instrumentUpdate.getAttributes();
    if (attributes != null) {
        for (int i = 0; i < attributes.size(); i += 1) {
            final InstrumentAttribute attribute = attributes.get(i);
            if (!attribute.hasKey() || !attribute.hasValue())
                continue;
            if (CharSequenceUtil.equals("minOrderSize", attribute.getKey())) {
                setMinOrderSize(Decimal64Utils.tryParse(attribute.getValue(), Decimal64Utils.ZERO));
            } else if (CharSequenceUtil.equals("orderSizePrecision", attribute.getKey()) && attribute.getValue() != null) {
                setOrderSizePrecision(Integer.parseInt(attribute.getValue().toString()));
            }
        }
    }
}
Since Ember 1.14, attributes like minOrderSize and orderSizePrecision are normal fields of the InstrumentUpdate interface rather than the key-value style custom attributes shown in the example above.
For more examples of storing minimum order size and order size precision custom instrument attributes, please refer to this Iceberg Algorithm sample.
You can also explore another Ember algorithm sample that shows how to track per-exchange instrument information in the PerExchangeMetadataSampleAlgorithm class.
Central Instrument Metadata Provider
For crypto exchanges, most forex venues, and exchanges like CME, Deltix maintains a Central Security Master API. Algorithms can access this API via the AlgorithmContext.getRemoteSecurityMetadataProvider() method.
Most calls to this API are synchronous REST calls and can be expensive to perform during algorithm run-time.
Here's an example of using the Central Security Master API to retrieve instrument metadata:
GenericInstrument instrument = context.getRemoteSecurityMetadataProvider().getInstrument(exchange, symbol);
if (instrument instanceof CryptoCurrency) {
    CryptoCurrency currency = (CryptoCurrency) instrument;
    if (expectedSide == AggressorIndicator.ORDER_INITIATOR_IS_AGGRESSOR)
        return currency.getTakerCommission();
    else
        return currency.getMakerCommission();
}
In the code snippet above, the algorithm retrieves the instrument metadata for a specific exchange and symbol using the getInstrument() method of the RemoteSecurityMetadataProvider. If the instrument is a CryptoCurrency, it casts the instrument to that type and returns the taker commission when the order is expected to be the aggressor, or the maker commission otherwise.
Please keep in mind that making frequent and costly synchronous REST calls during algorithm runtime may impact performance. It's important to consider the potential latency and overhead when utilizing the Central Security Master API.
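One common way to limit that cost is to cache lookups so each exchange and symbol pair is fetched at most once, ideally during start-up. The sketch below shows the idea with a generic wrapper. CachingMetadataLookup is a hypothetical helper, not part of Ember, and the remote call is represented by a plain BiFunction so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical caching wrapper; NOT part of the Ember API.
// Not thread-safe: intended to be used from a single algorithm thread.
final class CachingMetadataLookup<T> {
    private final BiFunction<String, String, T> remote; // stands in for the expensive REST call
    private final Map<String, T> cache = new HashMap<>();

    CachingMetadataLookup(BiFunction<String, String, T> remote) {
        this.remote = remote;
    }

    // Returns the cached instrument if present; otherwise calls the remote source once.
    T getInstrument(String exchange, String symbol) {
        String key = exchange + '/' + symbol;
        T instrument = cache.get(key);
        if (instrument == null) {
            instrument = remote.apply(exchange, symbol);
            if (instrument != null) {
                cache.put(key, instrument); // missing instruments are not cached
            }
        }
        return instrument;
    }
}
```

A cache like this trades freshness for latency: metadata that changes after the first lookup is not picked up unless the entry is refreshed.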
Timers
Timers allow you to schedule work to be done at a specific time or with a certain periodicity. Ember provides an allocation-free timer API for algorithm developers. Timer jobs are executed in the main processing thread of each algorithm.
A timer job consists of the following components:
- Callback function: This function performs the scheduled work.
- Optional parameter: A parameter that can be passed to the callback function.
- Next execution time: The time at which the job is scheduled to execute.
For example, here's a callback function that cancels an order:
long onOrderTimeout(long now, Cookie cookie) {
    cancelOrder((Order) cookie);
    return DO_NOT_RESCHEDULE;
}
The callback function takes two parameters:
- The current time (now), which may be slightly past the scheduled time.
- A cookie value (Cookie) that was specified when the job was created.
The cancelOrder() function cancels the specified order. The callback function returns DO_NOT_RESCHEDULE to indicate that the job should not be rescheduled.
To schedule a timer job, you can use the timer service as shown below:
TimerJob cancelJob = timer.schedule(clock.time()+5000, this::onOrderTimeout, cookie);
In the code snippet above, we schedule a callback function to be executed 5 seconds from the current clock time.
The schedule() method returns a TimerJob object that can be used to cancel the job if needed. For example, if the order is cancelled, it may be a good idea to cancel the order timeout timer:
cancelJob.cancel();
If you want to perform periodic tasks, you can return the next execution time from the callback function. Otherwise, if you don't want the timer to be rescheduled, you can return the DO_NOT_RESCHEDULE value.
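The rescheduling contract can be illustrated with a small simulation. Everything in this sketch is hypothetical: the TimerCallback interface omits the cookie parameter, the DO_NOT_RESCHEDULE sentinel value is an assumption, and runUntilDone() is a toy driver that stands in for the real timer service.

```java
// Hypothetical simulation of a self-rescheduling timer job; NOT the Ember timer API.
final class PeriodicTimerSketch {
    static final long DO_NOT_RESCHEDULE = Long.MIN_VALUE; // assumed sentinel value

    interface TimerCallback {
        long onTimer(long now); // returns the next execution time or DO_NOT_RESCHEDULE
    }

    // Fires every intervalMs until it has run maxRuns times.
    static final class Heartbeat implements TimerCallback {
        private final long intervalMs;
        private final int maxRuns;
        int runs;

        Heartbeat(long intervalMs, int maxRuns) {
            this.intervalMs = intervalMs;
            this.maxRuns = maxRuns;
        }

        @Override
        public long onTimer(long now) {
            runs++;
            return runs < maxRuns ? now + intervalMs : DO_NOT_RESCHEDULE;
        }
    }

    // Toy driver: invokes the job at each due time and returns the time of the last run.
    static long runUntilDone(TimerCallback job, long firstTime) {
        long next = firstTime;
        long last = firstTime;
        while (next != DO_NOT_RESCHEDULE) {
            last = next;
            next = job.onTimer(next);
        }
        return last;
    }
}
```

Computing the next time as now plus the interval lets small delays accumulate over many runs. Scheduling from the previously planned time instead avoids that drift, at the cost of tracking that time in the callback.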
See Appendix A: Implementing order timeout for a complete end-to-end example of timer code.
Exceptions in User-defined Timer Callbacks
Unhandled exceptions that occur in user-defined timer callbacks will be caught at the algorithm's default exception handler level, resulting in ERROR-level logging. Please bear in mind that exceptions in periodic timers (i.e., timers that are intended to be rescheduled) prevent rescheduling since the exception prevents the callback from communicating the next execution time.
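One defensive pattern, sketched below, is to catch exceptions inside the periodic callback so that it can always return the next execution time. SafePeriodicCallback and its Task interface are hypothetical names for this example, and the failure counter stands in for whatever logging the algorithm would really do.

```java
// Hypothetical wrapper that keeps a periodic timer alive when its task throws;
// NOT part of the Ember API.
final class SafePeriodicCallback {
    interface Task {
        void run(long now) throws Exception;
    }

    private final Task task;
    private final long intervalMs;
    int failures; // a real algorithm would log the exception instead of just counting

    SafePeriodicCallback(Task task, long intervalMs) {
        this.task = task;
        this.intervalMs = intervalMs;
    }

    // Timer callback body: always returns the next execution time, even if the task fails.
    long onTimer(long now) {
        try {
            task.run(now);
        } catch (Exception e) {
            failures++;
        }
        return now + intervalMs;
    }
}
```

Whether swallowing the exception is appropriate depends on the task. For work that must not silently fail, it may be better to let the exception propagate and accept that the timer stops.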
Summary
To wrap up this chapter, here's a more detailed design diagram of the algorithm and its supporting classes:
For an example of using Timers in an algorithm, refer to the TimedOrderAlgorithm sample or the TWAPAlgorithm sample.