Streaming Orders into Apache Kafka
Overview
This document explains how to configure and use Apache Kafka as a data warehouse for the Execution Server (ES).
Apache Kafka is an open-source distributed event streaming platform that can be used to store Ember trading history.
Install and Configure Apache Kafka
To learn how to install and configure Apache Kafka, see the Kafka documentation.
Configure the Execution Server
Kafka Topics
Since Ember version 1.11.20, you can set up a warehouse service to export data into Kafka.
This service creates two Kafka topics:
- Messages: Encompasses all trading flow messages, including trading requests (order submissions, cancellations, modifications) and trading events (order ACKs/NACKs, cancellation ACKs/NACKs, fills, etc.). You can configure a filter to reduce this topic to trade events only (fills). This significantly reduces topic size.
- Orders: Records the final state of each order that Ember processed. This topic contains one message per order.
Ember appends the journal term to each Kafka output topic name, for example messages_1673573978. You can change the name prefix of the Kafka output topics in the Ember configuration file.
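The naming scheme above can be sketched from the consumer side. This is a minimal illustration, assuming the journal term is always the last underscore-separated token of the topic name and is numeric, as in the example name; split_topic_name is a hypothetical helper, not part of Ember.

```python
from typing import Tuple


def split_topic_name(topic: str) -> Tuple[str, int]:
    """Split a topic name such as 'messages_1673573978' into prefix and journal term."""
    # rpartition splits on the LAST underscore, so a configured prefix
    # that itself contains underscores is kept intact.
    prefix, sep, term = topic.rpartition("_")
    if not sep or not prefix or not (term.isascii() and term.isdigit()):
        raise ValueError(f"no numeric journal term suffix in topic name: {topic!r}")
    return prefix, int(term)


print(split_topic_name("messages_1673573978"))  # ('messages', 1673573978)
```

A consumer could use the prefix to decide whether a topic holds messages or orders, and the term to tell apart topics produced from different journal terms.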
Data Export Modes
The Execution Server can export data into Kafka in two modes:
- Live Mode: A special daemon service exports data in near real-time.
- Batch Mode: A periodic process exports all recently accumulated data in batches.
Configuration
To configure the Execution Server, add the following section to $EMBER_HOME/ember.conf. The data export mode is controlled by the live parameter.
warehouse {
  kafka { # unit ID: pass it as the argument when you run the app; any name works
    live = true # if false, the process exits instead of waiting for new data
    messages = [
      ${template.warehouse.kafka.messages} { # loader for order messages
        filter.settings.inclusion = null # EVENTS or TRADES; null means no filter
        loader.settings {
          servers = "localhost:9092" # Kafka servers
          topic = "messages" # output topic name prefix
          topicFormat = "JSON"
          numOfPartitions = 1 # number of partitions of the topic created in Kafka
          replicationFactor = 3 # replication factor of the topic created in Kafka
          securityProtocol = PLAINTEXT # Kafka connection protocol: PLAINTEXT, SSL
        }
      }
    ]
    orders = [
      ${template.warehouse.kafka.orders} { # loader for closed orders
        loader.settings {
          servers = "localhost:9092" # Kafka servers
          topic = "orders" # output topic name prefix
          topicFormat = "JSON"
          numOfPartitions = 1 # number of partitions of the topic created in Kafka
          replicationFactor = 3 # replication factor of the topic created in Kafka
          securityProtocol = PLAINTEXT # Kafka connection protocol: PLAINTEXT, SSL
        }
      }
    ]
  }
}
Secure the Connection
To connect to Kafka over a secure protocol (securityProtocol = SSL), add the following attributes to the Kafka warehouse orders and messages settings:
- sslTrustStoreLocation: Trust store file location
- sslTrustStorePassword: Trust store file password
- sslKeyStoreLocation: Key store file location (for two-way authentication)
- sslKeyStorePassword: Key store password
- sslKeyPassword: Private key password in the key store file
Add Kafka Settings via Java System Properties
To specify Kafka client connection properties via Java system properties:
- Prepend the configuration attributes listed in the Kafka documentation with the kafka. prefix.
For instance, to pass the security.protocol property to Kafka, include the kafka.security.protocol system property in the JAVA_OPTS environment variable before running the data-warehouse script:
export JAVA_OPTS=-Dkafka.security.protocol=SSL
Non-null Kafka connection settings specified in the Ember configuration file override the corresponding attributes specified via system properties.
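The precedence rule can be illustrated with a short sketch. It is not Ember's implementation: effective_kafka_settings is a hypothetical function, and for simplicity it assumes both sources are already keyed by Kafka property names (the Ember configuration file uses its own attribute names, such as securityProtocol).

```python
from typing import Dict, Optional

PREFIX = "kafka."


def effective_kafka_settings(
    system_props: Dict[str, str],
    config_settings: Dict[str, Optional[str]],
) -> Dict[str, str]:
    """Merge settings so that non-null configuration values override system properties."""
    # Keep only system properties that carry the kafka. prefix, and strip it.
    merged = {
        name[len(PREFIX):]: value
        for name, value in system_props.items()
        if name.startswith(PREFIX)
    }
    # Non-null values from the configuration file win; null (None) values
    # leave the system-property value in place.
    for name, value in config_settings.items():
        if value is not None:
            merged[name] = value
    return merged


settings = effective_kafka_settings(
    {"kafka.security.protocol": "SSL", "user.dir": "/tmp"},
    {"security.protocol": None, "bootstrap.servers": "localhost:9092"},
)
print(settings)  # {'security.protocol': 'SSL', 'bootstrap.servers': 'localhost:9092'}
```

In this example the system property supplies security.protocol because the configuration value is null, while unrelated system properties such as user.dir are ignored.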
Start Exporting Data
To begin streaming Execution Server data into Kafka, use the data-warehouse service. This service reads the Ember Journal and loads all trading messages and completed orders into Kafka Messages and Orders topics.
Active orders are not exported until they are complete (completely filled, canceled, or rejected). However, events concerning active orders (for example, trades) appear in the Messages topic almost immediately.
Run Ember's data-warehouse script with a single argument that specifies the Kafka data warehouse (the unit ID from the configuration, kafka in this example):
export EMBER_HOME=/deltix/emberhome
/deltix/ember/bin/data-warehouse kafka
In Batch Mode, when the live parameter is set to false, this script exits as soon as all recent data is exported. If you re-run the script, it appends any new data that the Execution Server accumulated since the last export.
You can view data in Kafka using the kafka-console-consumer script included with the Kafka server or a third-party UI. The records are stored in JSON format, with each record containing only non-null attributes.
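Because records carry only non-null attributes, a reader should treat every attribute as optional. The sketch below shows one way to do that in Python, assuming each record arrives as a single line of JSON text (for example, console consumer output piped into a script). parse_records is a hypothetical helper, and the sample lines are made up for illustration; they are not real Ember output.

```python
import json
from typing import Dict, Iterable, Iterator


def parse_records(lines: Iterable[str]) -> Iterator[Dict]:
    """Yield one dict per non-empty line of JSON text."""
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines
            yield json.loads(line)


# Made-up sample lines, for illustration only (not real Ember output).
sample = [
    '{"orderId": "A1", "side": "BUY", "quantity": 10.0}',
    '{"orderId": "A2", "side": "SELL", "quantity": 5.0, "limitPrice": 101.5}',
]
for record in parse_records(sample):
    # limitPrice is absent from the first record, so read it with .get().
    print(record["orderId"], record.get("limitPrice"))
```

To read piped input instead of the sample list, pass sys.stdin to parse_records.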
Data Format
Orders Topic
Data about orders is written after orders become inactive (for example, filled, canceled, or rejected).
Here is the complete list of JSON attributes:
Name | Type | Description |
---|---|---|
openSequence | Long | Sequence of new order request |
closeSequence | Long | Sequence of final order event |
sourceId | String | Order source |
destinationId | String | Order destination |
orderId | String | Order ID assigned by order source |
correlationOrderId | String | ID of the first order in order-replace chain |
parentSourceId | String | Source of parent order |
parentOrderId | String | ID of parent order |
externalOrderId | String | Optional order ID assigned by execution venue |
quoteId | String | Optional ID of quote targeted by order |
account | String | Order account |
clearingAccount | String | Clearing account |
traderId | String | ID of trader |
symbol | String | Instrument symbol |
instrumentType | String | Instrument type |
exchangeId | String | Optional destination exchange for outbound message and source exchange for inbound messages |
currency | String | Order currency |
side | String | Order side |
timeInForce | String | Order time in force |
expireTime | Long | Order expiration time |
orderStatus | String | Final state of the order |
openTime | Long | Order submission time |
closeTime | Long | Order completion time |
orderType | String | Order type |
limitPrice | Double | Limit price |
stopPrice | Double | Stop price |
pegDifference | Double | Peg offset for pegged order types |
quantity | Double | Order quantity |
displayQuantity | Double | Minimum quantity to display on exchange floor |
minQuantity | Double | Minimum quantity to execute |
cumulativeQuantity | Double | Cumulative executed quantity |
averagePrice | Double | Average execution price |
reason | String | Reason for cancel or reject events |
vendorRejectCode | Int | Vendor specific reject code |
deltixRejectCode | Int | Reject code in Deltix classification |
userData | String | User-provided order tag |
moduleKey | String | Used to identify strategy that placed the order |
portfolioKey | String | Used to identify portfolio this order belongs to |
party | String | Describes business entity involved in trade |
clearingBroker | String | Clearing broker |
totalCommission | Double | Total trade commission |
commissionCurrency | String | Trade commission currency |
attributes | List of attributes with Int key and String value | Custom order attributes specified during order submission |
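As an example of working with these attributes, the following sketch derives a fill ratio from quantity and cumulativeQuantity. It assumes the record has already been parsed into a dict, and that an absent cumulativeQuantity means nothing was executed; that second assumption is ours, not something the table states. fill_ratio is a hypothetical helper.

```python
from typing import Dict, Optional


def fill_ratio(order: Dict) -> Optional[float]:
    """Return the executed fraction of an order, or None if it cannot be computed."""
    quantity = order.get("quantity")
    if not quantity:  # missing or zero quantity: the ratio is undefined
        return None
    # Assumption: an absent cumulativeQuantity means nothing was executed.
    return order.get("cumulativeQuantity", 0.0) / quantity


print(fill_ratio({"orderId": "A1", "quantity": 100.0, "cumulativeQuantity": 25.0}))  # 0.25
```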
Messages Topic
The Messages topic contains all order events and request messages. Each JSON record contains the type attribute with the value indicating the type of event or request.
Here is the complete list of attributes written to the Messages topic:
Name | Type | Description |
---|---|---|
type | String | Message type |
sequence | Long | Message sequence number |
timestamp | Long | Message timestamp |
originalTimestamp | Long | Timestamp reported by execution venue |
sourceId | String | Message source |
destinationId | String | Message destination |
orderId | String | Order ID assigned by order source |
originalOrderId | String | Original order ID in cancel-replace chain |
correlationOrderId | String | ID of the first order in order-replace chain |
parentSourceId | String | Source of parent order |
parentOrderId | String | ID of parent order |
requestId | String | Request ID |
eventId | String | Event ID |
referenceEventId | String | Used to identify previously communicated event |
orderStatus | String | Order status |
symbol | String | Instrument symbol |
instrumentType | String | Instrument type |
currency | String | Order currency |
exchangeId | String | Optional destination exchange for outbound message and source exchange for inbound messages |
traderId | String | ID of trader |
account | String | Order account |
clearingAccount | String | Order clearing account |
side | String | Order side |
timeInForce | String | Order time in force |
expireTime | Long | Order expiration time |
quantity | Double | Order quantity |
minQuantity | Double | Minimum quantity to execute |
displayQuantity | Double | Minimum quantity to display on exchange floor |
orderType | String | Order type |
limitPrice | Double | Limit price |
stopPrice | Double | Stop price |
pegDifference | Double | Peg offset for pegged order types |
averagePrice | Double | Average execution price |
cumulativeQuantity | Double | Cumulative executed quantity |
remainingQuantity | Double | Remaining order quantity |
tradePrice | Double | Order trade price |
tradeQuantity | Double | Order trade quantity |
commission | Double | Trade commission |
commissionCurrency | String | Trade commission currency |
counterPartySourceId | String | Source of the other side of the trade |
counterPartyOrderId | String | Order ID of the other side of the trade |
settlementDate | Long | Trade settlement date |
tradeDate | Long | Trade date |
reason | String | Reason for cancel or reject events |
vendorRejectCode | Int | Vendor specific reject code |
deltixRejectCode | Int | Reject code in Deltix classification |
multiLegReportingType | String | Identifies single-leg trade or whole contract trade of multi-legged security |
aggressorSide | String | Trade aggressor side |
orderUnknown | Boolean | Flag used by order cancel reject events |
cancelType | String | Enum used by cancel events |
execRestatementReason | String | Used by order restate events to classify restate type |
flags | Int | Bitmask containing various order flags |
userData | String | User-provided order tag |
moduleKey | String | Used to identify strategy that placed the order |
portfolioKey | String | Used to identify portfolio this order belongs to |
party | String | Describes business entity involved in trade |
clearingBroker | String | Clearing broker |
attributes | List of attributes with Int key and String value | Custom order attributes specified during order submission |
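The sketch below shows how a consumer might aggregate trades from Messages records. Since this document does not list the possible values of type, the code assumes that a record describes a trade exactly when it carries tradeQuantity; verify that assumption against your data. Orders are keyed by sourceId plus orderId because the order ID is assigned by the order source. traded_quantity_by_order is a hypothetical helper, and the sample records are made up.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple

OrderKey = Tuple[Optional[str], Optional[str]]


def traded_quantity_by_order(messages: Iterable[Dict]) -> Dict[OrderKey, float]:
    """Sum tradeQuantity per (sourceId, orderId) over parsed Messages records."""
    totals: Dict[OrderKey, float] = defaultdict(float)
    for message in messages:
        trade_quantity = message.get("tradeQuantity")
        if trade_quantity is None:
            continue  # assumption: no tradeQuantity means this is not a trade
        key = (message.get("sourceId"), message.get("orderId"))
        totals[key] += trade_quantity
    return dict(totals)


# Made-up sample records, for illustration only.
sample = [
    {"sourceId": "S1", "orderId": "A1", "quantity": 5.0},
    {"sourceId": "S1", "orderId": "A1", "tradeQuantity": 2.0, "tradePrice": 10.0},
    {"sourceId": "S1", "orderId": "A1", "tradeQuantity": 3.0, "tradePrice": 10.5},
]
print(traded_quantity_by_order(sample))  # {('S1', 'A1'): 5.0}
```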