Streaming Orders into Apache Kafka

Overview

This document explains how to configure and use Apache Kafka as a data warehouse for the Execution Server (ES).

Apache Kafka is an open-source distributed event streaming platform that can be used to store Ember trading history.

Install and Configure Apache Kafka

To learn how to install and configure Apache Kafka, see the Kafka documentation.

Configure the Execution Server

Kafka Topics

Since Ember version 1.11.20, you can set up a warehouse service to export data into Kafka.

This service creates two Kafka topics:

  • Messages: Encompasses all trading flow messages, including trading requests (order submissions, cancellations, modifications) and trading events (order ACKs/NACKs, cancellation ACKs/NACKs, fills, etc.). You can configure a filter to reduce this topic to trade events only (fills). This significantly reduces topic size.
  • Orders: Records the final state of each order that Ember processed. This topic contains one message per order.

Ember appends journal terms to Kafka output topic names, for example messages_1673573978. You can change the name prefix of the Kafka output topics in the Ember configuration file.
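Because topic names carry a journal term suffix, a consumer may need to separate the configured prefix from the term. The following is a minimal Python sketch, assuming the suffix is always the numeric part after the last underscore, as in the messages_1673573978 example. The function name split_topic_name is hypothetical and is not part of Ember.

```python
from __future__ import annotations


def split_topic_name(topic: str) -> tuple[str, int]:
    """Split a topic name such as 'messages_1673573978' into (prefix, journal term).

    Assumption: the journal term is the numeric suffix after the last underscore.
    """
    prefix, separator, term = topic.rpartition("_")
    if not separator or not prefix or not term.isdigit():
        raise ValueError(f"no journal term suffix in topic name: {topic!r}")
    return prefix, int(term)
```

Splitting on the last underscore keeps prefixes that themselves contain underscores intact.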

Data Export Modes

The Execution Server can export data into Kafka in two modes:

  • Live Mode: A special daemon service exports data in near real-time.
  • Batch Mode: A periodic process exports all recently accumulated data in batches.

Configuration

To configure the Execution Server, add the following section to $EMBER_HOME/ember.conf. The data export mode is controlled by the live parameter.

warehouse {
  kafka { # unit id, you will use it when you run the app, it might be any
    live = true # the process does not wait for new data and exits if false

    messages = [
      ${template.warehouse.kafka.messages} { # loader which loads order messages
        filter.settings.inclusion = null # EVENTS or TRADES, null means no filter

        loader.settings {
          servers = "localhost:9092" # Kafka servers
          topic = "messages" # output topic name prefix
          topicFormat = "JSON"
          numOfPartitions = 1 # number of partitions of topic created in Kafka
          replicationFactor = 3 # replication factor of topic created in Kafka
          securityProtocol = PLAINTEXT # Kafka connection protocol: PLAINTEXT, SSL
        }
      }
    ]

    orders = [
      ${template.warehouse.kafka.orders} { # loader which loads closed orders
        loader.settings {
          servers = "localhost:9092" # Kafka servers
          topic = "orders" # output topic name prefix
          topicFormat = "JSON"
          numOfPartitions = 1 # number of partitions of topic created in Kafka
          replicationFactor = 3 # replication factor of topic created in Kafka
          securityProtocol = PLAINTEXT # Kafka connection protocol: PLAINTEXT, SSL
        }
      }
    ]
  }
}

Secure the Connection

To connect to Kafka over a secure protocol (securityProtocol = SSL), add the following attributes to the loader settings of both the orders and messages sections of the Kafka warehouse:

  • sslTrustStoreLocation: Trust store file location
  • sslTrustStorePassword: Trust store file password
  • sslKeyStoreLocation: Key store file location (for two-way authentication)
  • sslKeyStorePassword: Key store password
  • sslKeyPassword: Private key password in the key store file

Add Kafka Settings via Java System Properties

You can also specify Kafka client connection properties via Java system properties.

For instance, to pass the security.protocol property to Kafka, include the kafka.security.protocol system property in the JAVA_OPTS environment variable before running the data-warehouse script:

    export JAVA_OPTS=-Dkafka.security.protocol=SSL

caution

Non-null Kafka connection settings specified in the Ember configuration file override the corresponding attributes specified via system properties.
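The precedence rule in the caution above can be sketched in a few lines of Python. This is an illustration of the stated rule, not Ember's implementation; the function name effective_setting is hypothetical.

```python
from typing import Optional


def effective_setting(config_value: Optional[str],
                      system_property: Optional[str]) -> Optional[str]:
    """Pick the value that applies to one Kafka connection setting.

    A non-null value from the Ember configuration file wins;
    the system property is used only when the configuration value is null.
    """
    return config_value if config_value is not None else system_property
```

Under this rule, a configuration file that sets securityProtocol = PLAINTEXT would take precedence over -Dkafka.security.protocol=SSL, so the setting has to be left null in the file for the system property to apply.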

Start Exporting Data

To begin streaming Execution Server data into Kafka, use the data-warehouse service. This service reads the Ember Journal and loads all trading messages and completed orders into the Kafka Messages and Orders topics.

note

Active orders are not exported until they are complete (completely filled, cancelled, or rejected). However, events concerning active orders (for example, trades) appear in the Messages topic almost immediately.

Run Ember's data-warehouse script with a single argument that specifies the Kafka data warehouse unit ID from the configuration:

export EMBER_HOME=/deltix/emberhome
/deltix/ember/bin/data-warehouse kafka

In Batch Mode, when the live parameter is set to false, this script exits as soon as all recent data is exported. If you re-run the script, it appends any new data that the Execution Server accumulated since the last export.
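Since each Batch Mode run exports only the data accumulated since the previous one, the script lends itself to being called from a scheduler. The Python sketch below only assembles the command and environment shown above. The function name build_export_command is hypothetical, and the default paths are the ones from this document's example, which may differ in your installation.

```python
import os


def build_export_command(unit_id="kafka",
                         ember_home="/deltix/emberhome",
                         script="/deltix/ember/bin/data-warehouse"):
    """Return (command, environment) for one data-warehouse run."""
    env = dict(os.environ)          # copy, so the caller's environment is untouched
    env["EMBER_HOME"] = ember_home  # same variable the shell example exports
    return [script, unit_id], env
```

A scheduler job could then pass the result to subprocess.run(command, env=env) at whatever interval suits the deployment.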

You can view data in Kafka using the kafka-console-consumer script included with the Kafka server or a third-party UI. The records are stored in JSON format, with each record containing only non-null attributes.
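Because null attributes are omitted from each record, consumer code should not assume that any given key is present. The following Python sketch parses one record and reads an optional attribute safely. The sample record is made up for illustration and its values are hypothetical.

```python
import json

# Hypothetical record: limitPrice is absent because it was null at export time.
SAMPLE_RECORD = '{"orderId": "A-1", "side": "BUY", "quantity": 2.0}'


def read_attribute(record_json: str, name: str, default=None):
    """Parse a JSON record and return one attribute, or default if it was omitted."""
    record = json.loads(record_json)
    return record.get(name, default)
```

For example, read_attribute(SAMPLE_RECORD, "limitPrice") returns None rather than raising a KeyError.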

Data Format

Orders Topic

Data about orders is written after orders become inactive (for example, filled, canceled, or rejected).

Here is the complete list of JSON attributes:

Name | Type | Description
openSequence | Long | Sequence of new order request
closeSequence | Long | Sequence of final order event
sourceId | String | Order source
destinationId | String | Order destination
orderId | String | Order ID assigned by order source
correlationOrderId | String | ID of the first order in order-replace chain
parentSourceId | String | Source of parent order
parentOrderId | String | ID of parent order
externalOrderId | String | Optional order ID assigned by execution venue
quoteId | String | Optional ID of quote targeted by order
Account | String | Order account
clearingAccount | String | Clearing account
traderId | String | ID of trader
Symbol | String | Instrument symbol
instrumentType | String | Instrument type
exchangeId | String | Optional destination exchange for outbound message and source exchange for inbound messages
currency | String | Order currency
side | String | Order side
timeInForce | String | Order time in force
expireTime | Long | Order expiration time
orderStatus | String | Final state of the order
openTime | Long | Order submission time
closeTime | Long | Order completion time
orderType | String | Order type
limitPrice | Double | Limit price
stopPrice | Double | Stop price
pegDifference | Double | Peg offset for pegged order types
quantity | Double | Order quantity
displayQuantity | Double | Minimum quantity to display on exchange floor
minQuantity | Double | Minimum quantity to execute
cumulativeQuantity | Double | Cumulative executed quantity
averagePrice | Double | Average execution price
reason | String | Reason for cancel or reject events
vendorRejectCode | Int | Vendor specific reject code
deltixRejectCode | Int | Reject code in Deltix classification
userData | String | User-provided order tag
moduleKey | String | Used to identify strategy that placed the order
portfolioKey | String | Used to identify portfolio this order belongs to
party | String | Describes business entity involved in trade
clearingBroker | String | Clearing broker
totalCommission | Double | Total trade commission
commissionCurrency | String | Trade commission currency
attributes | List of attributes with Int key and String value | Custom order attributes specified during order submission
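As an illustration of working with Orders records, the Python sketch below computes the executed fraction of an order from the quantity and cumulativeQuantity attributes. The sample record and its orderStatus value are hypothetical, and the function name fill_ratio is not part of Ember.

```python
import json

# Hypothetical Orders record; only non-null attributes are present.
SAMPLE_ORDER = json.loads(
    '{"orderId": "A-1", "orderStatus": "CANCELED",'
    ' "quantity": 10.0, "cumulativeQuantity": 2.5}'
)


def fill_ratio(order: dict) -> float:
    """Return cumulativeQuantity / quantity, treating omitted attributes as zero."""
    quantity = order.get("quantity")
    if not quantity:
        return 0.0  # no quantity recorded, so nothing to divide by
    return order.get("cumulativeQuantity", 0.0) / quantity
```

Treating an omitted cumulativeQuantity as zero is an assumption made here for convenience, since the export leaves out null attributes.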

Messages Topic

The Messages topic contains all order events and request messages. Each JSON record contains the type attribute with the value indicating the type of event or request.

Here is the complete list of attributes written to the Messages topic:

Name | Type | Description
type | String | Message type
sequence | Long | Message sequence number
timestamp | Long | Message timestamp
originalTimestamp | Long | Timestamp reported by execution venue
sourceId | String | Message source
destinationId | String | Message destination
orderId | String | Order ID assigned by order source
originalOrderId | String | Original order ID in cancel-replace chain
correlationOrderId | String | ID of the first order in order-replace chain
parentSourceId | String | Source of parent order
parentOrderId | String | ID of parent order
requestId | String | Request ID
eventId | String | Event ID
referenceEventId | String | Used to identify previously communicated event
orderStatus | String | Order status
symbol | String | Instrument symbol
instrumentType | String | Instrument type
currency | String | Order currency
exchangeId | String | Optional destination exchange for outbound message and source exchange for inbound messages
traderId | String | ID for trader
account | String | Order account
clearingAccount | String | Order clearing account
side | String | Order side
timeInForce | String | Order time in force
expireTime | Long | Order expiration time
quantity | Double | Order quantity
minQuantity | Double | Minimum quantity to execute
displayQuantity | Double | Minimum quantity to display on exchange floor
orderType | String | Order type
limitPrice | Double | Limit price
stopPrice | Double | Stop price
pegDifference | Double | Peg offset for pegged order types
averagePrice | Double | Average execution price
cumulativeQuantity | Double | Cumulative executed quantity
remainingQuantity | Double | Remaining order quantity
tradePrice | Double | Order trade price
tradeQuantity | Double | Order trade quantity
commission | Double | Trade commission
commissionCurrency | String | Trade commission currency
counterPartySourceId | String | Source of the other side of the trade
counterPartyOrderId | String | Order ID of the other side of the trade
settlementDate | Long | Trade settlement date
tradeDate | Long | Trade date
reason | String | Reason of cancel and reject events
vendorRejectCode | Int | Vendor specific reject code
deltixRejectCode | Int | Reject code in Deltix classification
multiLegReportingType | String | Identifies single-leg trade or whole contract trade of multi-legged security
aggressorSide | String | Trade aggressor side
orderUnknown | Boolean | Flag used by order cancel reject events
cancelType | String | Enum used by cancel events
execRestatementReason | String | Used by order restate events to classify restate type
flags | Int | Bitmask containing various order flags
userData | String | User-provided order tag
moduleKey | String | Used to identify strategy that placed the order
portfolioKey | String | Used to identify portfolio this order belongs to
party | String | Describes business entity involved in trade
clearingBroker | String | Clearing broker
attributes | List of attributes with Int key and String value | Custom order attributes specified during order submission
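A consumer of the Messages topic typically branches on the type attribute and reads trade details only where they are present. The Python sketch below counts records per type and sums traded notional (tradePrice times tradeQuantity) per order. The sample records and their type values are hypothetical, since the actual type names are not listed in this document, and the function name summarize is not part of Ember.

```python
from collections import Counter, defaultdict

# Hypothetical Messages records; the type values are placeholders.
SAMPLE_MESSAGES = [
    {"type": "RequestTypeA", "orderId": "A-1", "quantity": 2.0},
    {"type": "EventTypeB", "orderId": "A-1", "tradePrice": 100.0, "tradeQuantity": 1.5},
    {"type": "EventTypeB", "orderId": "A-1", "tradePrice": 101.0, "tradeQuantity": 0.5},
]


def summarize(messages):
    """Return (record count per type, traded notional per orderId)."""
    count_by_type = Counter()
    notional_by_order = defaultdict(float)
    for message in messages:
        count_by_type[message.get("type")] += 1
        price = message.get("tradePrice")
        quantity = message.get("tradeQuantity")
        # Only records that carry both trade attributes contribute to notional.
        if price is not None and quantity is not None:
            notional_by_order[message.get("orderId")] += price * quantity
    return count_by_type, dict(notional_by_order)
```

Checking for the trade attributes directly, rather than for specific type values, keeps the sketch independent of the exact type names.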