Streaming Orders and History into ClickHouse

Overview

This document explains how to configure and use ClickHouse as a data warehouse for the Ember. ClickHouse is an open-source, column-oriented database management system that can generate real-time analytical reports using SQL queries.

Install ClickHouse

To install ClickHouse on CentOS, run the following commands:

sudo yum install -y curl
curl -s https://packagecloud.io/install/repositories/altinity/clickhouse/script.rpm.sh | sudo bash
sudo yum install -y clickhouse-server clickhouse-client

Configure ClickHouse

To let clients connect and work with the data, allow ClickHouse to accept incoming connections by editing the /etc/clickhouse-server/config.xml configuration file:

  1. Locate the listen_host configuration element.

  2. Uncomment it and set it to the local IP address of your server using the following code:

    <listen_host>10.0.0.59</listen_host>

    If you plan to use the Tabix GUI to access ClickHouse data, find and uncomment the following fragment:

    <http_server_default_response><![CDATA[<html ng-app="SMI2"><head><base href="http://ui.tabix.io/"></head><body><div ui-view="" class="content-ui"></div><script src="http://loader.tabix.io/master.js"></script></body></html>]]></http_server_default_response>
  3. Define two users:

    • ember: For the Ember to load data into ClickHouse.
    • webviewer: A read-only user that will have remote access to loaded data.

A sample user configuration file can be found in Appendix C. Make sure to change the parts of the sample marked with TODO comments.

Security

Like any database, ClickHouse needs to be secured. Ideally, you should only access it from the local network. If you absolutely must connect to ClickHouse via the Internet, follow these guidelines:

  • Limit your firewall's ClickHouse port to known IP addresses (whitelist source IPs for accessing the ClickHouse web interface). For AWS, this means a Security Group rule with specific source IPs. Do not rely on ClickHouse user network settings alone.
  • Enable SSL using ClickHouse SSL configuration or by placing it behind an SSL proxy like Amazon Load Balancer with TLS and an AWS-managed certificate.
  • Use a strong password for the ClickHouse web user.
  • As an additional security measure, place the ClickHouse web port behind NGINX. Doing so can provide additional protection from some HTTP attacks and more detailed access logs.

Please be advised that a distributed setup of ClickHouse requires a "default" user with an empty password. Thus, the configuration settings for access restriction are applicable only for a single server setup.

Run ClickHouse

To run ClickHouse as a service, use the following command:

sudo service clickhouse-server start

Verify Installation

To verify your installation, start the ClickHouse CLI client and execute a simple query such as SELECT 1:

sudo clickhouse-client
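
The same check can be scripted over the ClickHouse HTTP interface, which listens on port 8123 by default. The following is a minimal Python sketch, not part of the Ember tooling; the host, user name, and password are placeholders, and the helper names are made up for illustration.

```python
import urllib.parse
import urllib.request


def build_query_url(host: str, port: int, sql: str) -> str:
    """Build a ClickHouse HTTP-interface URL that runs `sql`."""
    query = urllib.parse.urlencode({"query": sql}, quote_via=urllib.parse.quote)
    return f"http://{host}:{port}/?{query}"


def run_query(host: str, port: int, sql: str, user: str, password: str) -> str:
    """Send the query and return the raw text response (needs a live server)."""
    request = urllib.request.Request(
        build_query_url(host, port, sql),
        headers={"X-ClickHouse-User": user, "X-ClickHouse-Key": password},
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.read().decode("utf-8")


if __name__ == "__main__":
    # Placeholder address; replace with your own server.
    print(build_query_url("10.0.0.59", 8123, "SELECT 1"))
```

Against a healthy server, run_query(host, 8123, "SELECT 1", user, password) should return the text 1 followed by a newline.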

Connect to Tabix

Tabix is an open source SQL editor GUI for ClickHouse.

To connect your production setup to Tabix:

  1. Enter a name for your setup.
  2. For the address, use the public DNS name or IP address of your ClickHouse installation together with its HTTP port.
    Make sure that this port is open in your server’s firewall and security group.
  3. In the Login field, add a user.
  4. Enter your password.
  5. In the Extend params query field, enter key1=value&key2=value.
  6. Click Sign In.

In the following screenshot, we use default port 8123 and a default user to connect to Tabix.

ClickHouse connection form

Configure the Ember

The Ember can export data into ClickHouse in two modes:

  • Live Mode: A special daemon service exports data in near real-time.
  • Batch Mode: A periodic process exports all recently accumulated data in batches.

To configure the Ember, add the following section to $EMBER_HOME/ember.conf:

warehouse {
  clickhouse { # unit id; you pass it when you run the app, any name works
    live = true # false for batch mode (true is default)
    commitPeriod = 5s # commits a partially filled batch at this interval (5s is default)

    messages = [
      ${template.warehouse.clickhouse.messages} { # loader which loads order messages
        filter.settings.inclusion = null # EVENTS or TRADES, null means no filter

        loader.settings {          # default  # description
          host = "10.10.87.123"    # -        # server host or ip
          port = 8123              # 8123     # server port

          username = "john"        # -        # connection credentials
          password = "******"      # -        # connection credentials

          databaseName = "ember"   # ember    # database name
          tableName = "messages"   # messages # table name

          createDatabase = true    # true     # create database if not exists
          createTable = true       # true     # create table if not exists
          dropTable = false        # false    # drop table if exists

          batchLimit = 64K         # 64K      # gathers a batch and sends it once it is full
        }
      }
    ]

    orders = [
      ${template.warehouse.clickhouse.orders} { # loader which loads closed orders
        loader.settings {          # default  # description
          host = "10.10.87.123"    # -        # server host or ip
          port = 8123              # 8123     # server port

          username = "john"        # -        # connection credentials
          password = "doe"         # -        # connection credentials

          databaseName = "ember"   # ember    # database name
          tableName = "orders"     # orders   # table name

          createDatabase = true    # true     # create database if not exists
          createTable = true       # true     # create table if not exists
          dropTable = false        # false    # drop table if exists

          batchLimit = 64K         # 64K      # gathers a batch and sends it once it is full
        }
      }
    ]
  }
}
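
The interaction between batchLimit and commitPeriod can be pictured with a small Python sketch. It only illustrates the behavior described in the configuration comments (send a batch once it is full, otherwise commit the partial batch when the period elapses). It is not the Ember implementation, and the class, method, and parameter names are hypothetical.

```python
import time
from typing import Callable, List


class Batcher:
    """Collects rows; commits when the batch is full or the period elapses."""

    def __init__(self, batch_limit: int, commit_period: float,
                 sink: Callable[[List[str]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_limit = batch_limit
        self.commit_period = commit_period
        self.sink = sink
        self.clock = clock
        self.rows: List[str] = []
        self.last_commit = clock()

    def add(self, row: str) -> None:
        self.rows.append(row)
        if len(self.rows) >= self.batch_limit:  # batch is full: send now
            self.commit()

    def tick(self) -> None:
        # Called periodically: commit a partial batch if the period has passed.
        if self.rows and self.clock() - self.last_commit >= self.commit_period:
            self.commit()

    def commit(self) -> None:
        if self.rows:
            self.sink(list(self.rows))
            self.rows.clear()
        self.last_commit = self.clock()


# Demo with a fake clock so the timing is deterministic.
sent: List[List[str]] = []
now = [0.0]
batcher = Batcher(batch_limit=3, commit_period=5.0,
                  sink=sent.append, clock=lambda: now[0])
for row in ["m1", "m2", "m3", "m4"]:
    batcher.add(row)   # m1..m3 fill a batch and are sent at once
now[0] = 6.0
batcher.tick()         # m4 goes out as a partial batch after the period
print(sent)            # [['m1', 'm2', 'm3'], ['m4']]
```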

Start Exporting Data

To begin streaming Ember data into ClickHouse, use the data-warehouse service. This service reads the Ember Journal and converts all trading messages and completed orders into the ClickHouse Messages and Orders tables.

Note: Active orders are not exported until they are complete (completely filled, cancelled, or rejected). However, messages concerning active orders are exported immediately.

In batch mode, the following script exits as soon as all recent data is exported:

export EMBER_HOME=/deltix/emberhome
/deltix/ember/bin/data-warehouse clickhouse

If you re-run the script, it appends any new data that the Ember accumulated since its last export.

ClickHouse tables

Sample Queries

select distinct SourceId, DestinationId, OrderType from orders

select sum(( Side == 'BUY' ? CumulativeQuantity : -CumulativeQuantity))
from orders
where CumulativeQuantity != 0 and TraderId = 'OE51' and Account = 'Gold'

select * from messages where Type = 'OrderTradeReportEvent' limit 100

Column names in ClickHouse are case-sensitive, and unquoted identifiers are not folded to lowercase. The samples above use PascalCase as in the loader DDL. If your tables show different casing (for example traderid, sourceid), use the names exactly as DESCRIBE TABLE reports them.
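
To make the net-position query concrete, the Python sketch below reproduces its arithmetic on a few made-up rows: BUY orders add CumulativeQuantity and every other side subtracts it, because the ternary expression tests only Side == 'BUY'. The sample rows are hypothetical and exist only to show the sign convention.

```python
from decimal import Decimal

# Hypothetical rows: (Side, CumulativeQuantity) for one trader and account.
orders = [
    ("BUY", Decimal("1500.50")),
    ("SELL", Decimal("500")),
    ("BUY", Decimal("0")),             # excluded by CumulativeQuantity != 0
    ("SELL_SHORT", Decimal("250.25")),
]

net_position = sum(
    (qty if side == "BUY" else -qty)
    for side, qty in orders
    if qty != 0
)
print(net_position)  # 750.25
```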

Message Identity

Messages table: Each row can be identified by the composite key {term, sequence}.

  • sequence comes from the upstream Ember Order Management System (OMS).
    • Each message processed by the OMS is assigned a unique sequence number.
    • Not all messages are stored in the data warehouse. For example, some internal system control messages are skipped. Hence, you may see gaps in sequence numbers, which otherwise increase monotonically.
    • System operators may periodically clear Ember’s journal. This action restarts message sequence numbers (and increases term). When this happens, new messages start at a different term.
  • term identifies journal creation time.
    • term remains the same for all messages written since Ember journal creation.
    • Each time the journal is cleared and re-created, the value specified as term increases.
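
Because skipped control messages leave holes in the numbering, a consumer should treat gaps as normal rather than as data loss. A hedged Python sketch of how one might list the missing sequence numbers within a single term (the function name and sample values are illustrative):

```python
from typing import Iterable, List, Tuple


def find_gaps(sequences: Iterable[int]) -> List[Tuple[int, int]]:
    """Return inclusive (first_missing, last_missing) ranges between stored sequences."""
    gaps = []
    ordered = sorted(sequences)
    for previous, current in zip(ordered, ordered[1:]):
        if current - previous > 1:
            gaps.append((previous + 1, current - 1))
    return gaps


# Sequences stored for one term; 2 and 5-6 never reached the warehouse.
print(find_gaps([1, 3, 4, 7]))  # [(2, 2), (5, 6)]
```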

Orders table: There is no column named sequence. Each row stores opensequence and closesequence. Use {term, closesequence} for ordering comparable to message sequence; use {sourceid, orderid} with term to refer to an order lifecycle.

Example

| term | sequence | Data |
|---|---|---|
| 1546300800 | 1 | OrderNewRequest |
| 1546300800 | 3 | OrderNewEvent |
| ... | ... | ... |
| 1546300800 | 413891 | The last message before journal is reset |
| | | Operator resets Ember’s journal (will result in new term and message sequence reset) |
| 1561939200 | 1 | OrderCancelRequest |
| 1561939200 | 2 | OrderCancelRequest |
| ... | ... | ... |

Note: You can rely on the fact that {term, sequence} are always increasing to implement the before-after ordering of messages.
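
The before-after rule amounts to comparing {term, sequence} pairs lexicographically. The Python sketch below sorts rows taken from the example that way. Reading term as a Unix timestamp in seconds is an assumption based on the sample values; this document only states that term identifies journal creation time.

```python
from datetime import datetime, timezone

# (term, sequence, data) rows from the example, deliberately shuffled.
messages = [
    (1561939200, 2, "OrderCancelRequest"),
    (1546300800, 3, "OrderNewEvent"),
    (1561939200, 1, "OrderCancelRequest"),
    (1546300800, 1, "OrderNewRequest"),
]

# Tuples compare element by element, so term is compared before sequence.
ordered = sorted(messages, key=lambda m: (m[0], m[1]))
for term, sequence, data in ordered:
    print(term, sequence, data)

# Assumption: term is epoch seconds of journal creation.
created = datetime.fromtimestamp(1546300800, tz=timezone.utc)
print(created.isoformat())  # 2019-01-01T00:00:00+00:00
```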

Real-life Examples

Messages Table Cleanup

In this example, a client had accumulated a very large number of messages in the database and decided to reduce the dataset to trades only instead of scaling ClickHouse up. The CREATE TABLE DDL below is illustrative: it uses the older DateTime type and MergeTree ordering. Your live table should match what the Ember loader created (DateTime64(3), with the MergeTree order and partitioning of the current pipeline; see Appendix B).

create table ember.messagestemp (
    Type Enum8(''=-1,'OrderNewRequest'=0,'OrderReplaceRequest'=1,'OrderCancelRequest'=2,'OrderStatusRequest'=3,'OrderDiscardRequest'=4,'OrderPendingNewEvent'=5,'OrderNewEvent'=6,'OrderRejectEvent'=7,'OrderPendingCancelEvent'=8,'OrderCancelEvent'=9,'OrderCancelRejectEvent'=10,'OrderPendingReplaceEvent'=11,'OrderReplaceEvent'=12,'OrderReplaceRejectEvent'=13,'OrderTradeReportEvent'=14,'OrderTradeCancelEvent'=15,'OrderTradeCorrectEvent'=16,'OrderStatusEvent'=17,'OrderRestateEvent'=18),
    Term Int64,
    Sequence Int64,
    Timestamp DateTime,
    SourceId String,
    DestinationId String,
    OrderId String,
    OriginalOrderId String,
    CorrelationOrderId String,
    ParentSourceId String,
    ParentOrderId String,
    RequestId String,
    ExternalOrderId String,
    EventId String,
    ReferenceEventId String,
    OrderStatus Enum8(''=-1,'PENDING_NEW'=0,'NEW'=1,'REJECTED'=2,'PENDING_CANCEL'=3,'CANCELED'=4,'PENDING_REPLACE'=5,'REPLACED'=6,'PARTIALLY_FILLED'=7,'COMPLETELY_FILLED'=8,'EXPIRED'=9,'SUSPENDED'=10),
    Symbol String,
    InstrumentType Enum8(''=-1,'EQUITY'=0,'ETF'=1,'BOND'=2,'INDEX'=3,'FX'=4,'OPTION'=5,'FUTURE'=6,'SYNTHETIC'=7,'CUSTOM'=8),
    Currency String,
    ExchangeId String,
    TraderId String,
    Account String,
    ClearingAccount String,
    Side Enum8(''=-1,'BUY'=0,'SELL'=1,'SELL_SHORT'=2,'SELL_SHORT_EXEMPT'=3),
    TimeInForce Enum8(''=-1,'DAY'=0,'GOOD_TILL_CANCEL'=1,'AT_THE_OPENING'=2,'IMMEDIATE_OR_CANCEL'=3,'FILL_OR_KILL'=4,'GOOD_TILL_CROSSING'=5,'GOOD_TILL_DATE'=6,'AT_THE_CLOSE'=7),
    ExpireTime DateTime,
    Quantity Nullable(Decimal128(12)),
    MinQuantity Nullable(Decimal128(12)),
    DisplayQuantity Nullable(Decimal128(12)),
    OrderType Enum8(''=-1,'CUSTOM'=0,'MARKET'=1,'LIMIT'=2,'STOP'=3,'STOP_LIMIT'=4,'PEG_TO_MARKET'=5,'PEG_TO_MIDPOINT'=6,'PEG_TO_PRIMARY'=7,'MARKET_ON_CLOSE'=8,'LIMIT_ON_CLOSE'=9,'LIMIT_OR_BETTER'=10,'PREVIOUSLY_QUOTED'=11),
    LimitPrice Nullable(Decimal128(12)),
    StopPrice Nullable(Decimal128(12)),
    PegDifference Nullable(Decimal128(12)),
    AveragePrice Nullable(Decimal128(12)),
    CumulativeQuantity Nullable(Decimal128(12)),
    RemainingQuantity Nullable(Decimal128(12)),
    TradePrice Nullable(Decimal128(12)),
    TradeQuantity Nullable(Decimal128(12)),
    Commission Nullable(Decimal128(12)),
    CommissionCurrency String,
    CounterPartySourceId String,
    CounterPartyOrderId String,
    SettlementDate DateTime,
    TradeDate DateTime,
    Reason String,
    VendorRejectCode Int32,
    DeltixRejectCode Int32,
    MultiLegReportingType Enum8(''=-1,'SINGLE_SECURITY'=0,'INDIVIDUAL_LEG_SECURITY'=1,'MULTI_LEG_SECURITY'=2),
    AggressorSide Enum8(''=-1,'ORDER_INITIATOR_IS_PASSIVE'=0,'ORDER_INITIATOR_IS_AGGRESSOR'=1),
    OrderUnknown Enum8('false'=0,'true'=1),
    CancelType Enum8(''=-1,'GENERAL'=0,'DONE_FOR_DAY'=1,'EXPIRED'=2),
    ExecRestatementReason Enum8(''=-1,'GT_RENEWAL_RESTATEMENT'=0,'VERBAL_CHANGE'=1,'REPRICING_OF_ORDER'=2,'BROKER_OPTION'=3,'PARTIAL_DECLINE_OF_ORDER_QTY'=4,'CANCEL_ON_TRADING_HALT'=5,'CANCEL_ON_SYSTEM_FAILURE'=6,'MARKET_OR_EXCHANGE_OPTION'=7,'CANCELED_NOT_EST'=8,'WAREHOUSE_RECAP'=9,'OTHER'=10),
    Flags Int32,
    UserData String,
    Attributes Nested (Key Int32,Value String)
) engine = MergeTree() order by (CorrelationOrderId)

insert into ember.messagestemp select * from ember.messages where Type = 'OrderTradeReportEvent'

rename table ember.messages to ember.messagestemp2

rename table ember.messagestemp to ember.messages
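
The same swap can be scripted. The Python sketch below only builds the SQL text. It substitutes ClickHouse's CREATE TABLE ... AS form, which copies the structure and engine of an existing table, for the explicit CREATE TABLE shown above; this is an alternative, not the procedure the client used, so check the result with SHOW CREATE TABLE before relying on it. The function name is hypothetical, and the temporary table names follow the example.

```python
from typing import List


def cleanup_statements(database: str, table: str, keep_type: str) -> List[str]:
    """Build statements that keep only rows of one message type.

    Values are interpolated directly, so use trusted input only.
    """
    live = f"{database}.{table}"
    temp = f"{database}.{table}temp"
    backup = f"{database}.{table}temp2"
    return [
        # Copy the structure and engine of the live table.
        f"create table {temp} as {live}",
        f"insert into {temp} select * from {live} where Type = '{keep_type}'",
        # Keep the original under a backup name, then swap the filtered table in.
        f"rename table {live} to {backup}",
        f"rename table {temp} to {live}",
    ]


for statement in cleanup_statements("ember", "messages", "OrderTradeReportEvent"):
    print(statement)
```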

Appendix A: Data Format

This appendix matches the tables created by the Ember ClickHouse loaders. For trading semantics, see the Order Entry API document.

Identifier naming (tables vs columns)

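ClickHouse column names are case-sensitive, and unquoted identifiers are not folded to lowercase. The column names below are listed in lowercase (e.g. traderid, sourceid). In SQL, use the exact casing that DESCRIBE TABLE reports for your tables; the loader DDL shown in this document uses PascalCase (e.g. TraderId).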

Tables are created as databaseName.tableName (for example ember.messages, ember.orders). The messages table uses MergeTree ordered by Timestamp and partitioned by day of Timestamp; the orders table is ordered by CloseTime and partitioned by day of CloseTime, with minmax indexes on Term / Sequence or CloseSequence as defined in the loader.

Orders Table

The Orders table captures the final state of each order.

| Column | Type | Example | Description |
|---|---|---|---|
| term | Int64 | 1546300800 | Identifies sequence term. See “Message Identity”. Since: Ember 1.4 |
| opensequence | Int64 | 312321304 | Sequence number of the message that created the order. |
| closesequence | Int64 | 312321312 | Sequence number of the message that completed the order. |
| sourceid | String | CLIENT52 | Order source (stored as string; was historically 10-char alphanumeric). |
| destinationid | String | TWAP | Order destination. |
| orderid | String | ICAP1983EE | Identifies the order for Ember, unique per source. |
| parentsourceid | String | CONTROL | Parent order source (optional). |
| parentorderid | String | ICAP321XX1 | Parent order id (optional). |
| externalorderid | String | ZZ132131 | Venue-assigned id (optional). |
| account | String | Gold | Account. |
| clearingaccount | String | | Clearing account (optional). |
| traderid | String | jdoe | Trader. |
| symbol | String | EUR/USD | Symbol. |
| instrumenttype | Enum8 | FX | Instrument type (Enum8 values from trading model). |
| exchangeid | String | HOTSPOT | Venue / exchange id. |
| currency | String | USD | Currency. |
| side | Enum8 | BUY | Side (Enum8). |
| timeinforce | Enum8 | DAY | Time in force (Enum8). |
| expiretime | DateTime64(3) | 2019-02-27 17:00:00.000 | Expire time (GOOD_TILL_DATE). |
| orderstatus | Enum8 | CANCELED | Final status (Enum8). |
| opentime | DateTime64(3) | 2019-02-27 16:51:48.002 | Open time. |
| closetime | DateTime64(3) | 2019-02-27 16:51:48.120 | Close time. |
| ordertype | Enum8 | PEG_TO_MIDPOINT | Order type (Enum8). |
| limitprice | Nullable(Decimal128(12)) | 1.33 | Limit price. |
| stopprice | Nullable(Decimal128(12)) | 1.20 | Stop price. |
| quantity | Nullable(Decimal128(12)) | 10000 | Quantity. |
| displayquantity | Nullable(Decimal128(12)) | 1000 | Display quantity. |
| minquantity | Nullable(Decimal128(12)) | 1000 | Min quantity. |
| cumulativequantity | Nullable(Decimal128(12)) | 1500.50 | Filled quantity. |
| averageprice | Nullable(Decimal128(12)) | 132.56 | Average price. |
| reason | String | “Cancelled by user request” | Cancel/reject reason text. |
| vendorrejectcode | Int32 | 1003 | Vendor reject code. |
| deltixrejectcode | Int32 | 120 | Deltix reject code. |
| userdata | String | Algo123 | User tag. |
| modulekey | String | | Module key. |
| portfoliokey | String | | Portfolio key. |
| party | String | | Party. |
| clearingbroker | String | | Clearing broker. |
| settlementdate | DateTime64(3) | | Settlement date. |
| attributes.key | nested Int32 | 6201 | Custom attributes: nested column (paired with attributes.value). FIX tag number. |
| attributes.value | nested String | QuantX | Custom attributes: nested column; value text. |
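
ClickHouse exposes a Nested column as parallel arrays, so attributes.key and attributes.value arrive as two lists of equal length. A minimal Python sketch of pairing them back up on the client side follows; the helper name is hypothetical, and the second sample entry is made up.

```python
from typing import Dict, List


def pair_attributes(keys: List[int], values: List[str]) -> Dict[int, str]:
    """Combine attributes.key / attributes.value arrays into one mapping."""
    if len(keys) != len(values):
        raise ValueError("attributes.key and attributes.value differ in length")
    return dict(zip(keys, values))


# 6201 -> "QuantX" comes from the Example column; 6202 is a hypothetical second tag.
print(pair_attributes([6201, 6202], ["QuantX", "Desk7"]))
# {6201: 'QuantX', 6202: 'Desk7'}
```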

Messages Table

The Messages table records order-related activity in real time. Column semantics match the other warehouses; layout follows the loader (ExchangeId / TraderId after Currency, ClearingAccount before Side).

| Column | Type | Example | Description |
|---|---|---|---|
| type | Enum8 | OrderTradeReportEvent | Message type (Enum8). |
| term | Int64 | 1546300800 | Term. See “Message Identity”. |
| sequence | Int64 | 312321312 | Message sequence. |
| timestamp | DateTime64(3) | 2019-02-27 16:51:48.123 | Event time. |
| sourceid | String | CLIENT52 | Source. |
| destinationid | String | TWAP | Destination. |
| orderid | String | ICAP1983EE23 | Order id. |
| originalorderid | String | ICAP1983EE22 | Original order in replace chain. |
| correlationorderid | String | ICAP1983EE00 | Correlation id. |
| parentsourceid | String | CONTROL | Parent source. |
| parentorderid | String | ICAP321XX1 | Parent order. |
| requestid | String | XCL#554 | Cancel request id. |
| externalorderid | String | ZZ132131 | External id. |
| eventid | String | AAAT31231 | Event id. |
| referenceeventid | String | | Reference event. |
| orderstatus | Enum8 | PARTIALLY_FILLED | Status on events (Enum8). |
| symbol | String | EUR/USD | Symbol. |
| instrumenttype | Enum8 | FX | Instrument type (Enum8). |
| currency | String | USD | Currency. |
| exchangeid | String | HOTSPOT | Exchange id. |
| traderid | String | jdoe | Trader id. |
| account | String | Gold | Account. |
| clearingaccount | String | | Clearing account. |
| side | Enum8 | BUY | Side (Enum8). |
| timeinforce | Enum8 | GOOD_TILL_CANCEL | TIF (Enum8). |
| expiretime | DateTime64(3) | 2019-02-27 17:00:00.000 | Expire time. |
| quantity | Nullable(Decimal128(12)) | 100.50 | Quantity. |
| minquantity | Nullable(Decimal128(12)) | 10 | Min quantity. |
| displayquantity | Nullable(Decimal128(12)) | 5 | Display quantity (venue-dependent). |
| ordertype | Enum8 | LIMIT | Order type (Enum8). |
| limitprice | Nullable(Decimal128(12)) | 1.33 | Limit price. |
| stopprice | Nullable(Decimal128(12)) | 1.20 | Stop price. |
| pegdifference | Nullable(Decimal128(12)) | 0.03 | Peg difference. |
| averageprice | Nullable(Decimal128(12)) | 1.325 | Average price. |
| cumulativequantity | Nullable(Decimal128(12)) | 25 | Cumulative qty. |
| remainingquantity | Nullable(Decimal128(12)) | 75.50 | Remaining qty. |
| tradeprice | Nullable(Decimal128(12)) | 1.321 | Trade price. |
| tradequantity | Nullable(Decimal128(12)) | 5 | Trade size. |
| commission | Nullable(Decimal128(12)) | 0.0001 | Commission. |
| commissioncurrency | String | USD | Commission currency. |
| counterpartysourceid | String | JOHN | Counterparty source. |
| counterpartyorderid | String | FED76123155 | Counterparty order. |
| settlementdate | DateTime64(3) | | Settlement. |
| tradedate | DateTime64(3) | | Trade date. |
| reason | String | Market is closed | Reason text. |
| vendorrejectcode | Int32 | 1003 | Vendor reject. |
| deltixrejectcode | Int32 | 120 | Deltix reject. |
| multilegreportingtype | Enum8 | | Multi-leg reporting (Enum8). |
| aggressorside | Enum8 | | Aggressor side (Enum8). |
| orderunknown | Enum8 | false | Cancel-reject flag (Enum8('false'=0,'true'=1)). |
| canceltype | Enum8 | | Cancel type (Enum8). |
| execrestatementreason | Enum8 | | Restate reason (Enum8). |
| flags | Int32 | 3 | Bitmask flags. |
| userdata | String | Foo152 | User tag. |
| modulekey | String | | Module key. |
| portfoliokey | String | | Portfolio key. |
| party | String | | Party. |
| clearingbroker | String | | Clearing broker. |
| attributes.key | nested Int32 | 6201 | Nested custom attributes (paired with attributes.value). |
| attributes.value | nested String | QuantX | Nested custom attribute values. |

Appendix B: Column Data Types (reference)

Types below are what the loader emits in CREATE TABLE. Enum8 definitions list one value per allowed enum name (empty label '' maps to -1); exact literals match your Ember build. Use Nullable(Decimal128(12)) for decimals and DateTime64(3) for timestamps.

Orders

| Column | ClickHouse type |
|---|---|
| term, opensequence, closesequence | Int64 |
| sourceid … externalorderid, account, clearingaccount, traderid, symbol, exchangeid, currency, userdata, modulekey, portfoliokey, party, clearingbroker | String |
| instrumenttype, side, timeinforce, orderstatus, ordertype | Enum8(...) |
| expiretime, opentime, closetime, settlementdate | DateTime64(3) |
| limitprice … averageprice | Nullable(Decimal128(12)) |
| reason | String |
| vendorrejectcode, deltixrejectcode | Int32 |
| attributes | Nested (Key Int32, Value String) |

Messages

| Column | ClickHouse type |
|---|---|
| type, instrumenttype, orderstatus, side, timeinforce, ordertype, multilegreportingtype, aggressorside, canceltype, execrestatementreason | Enum8(...) |
| orderunknown | Enum8('false'=0,'true'=1) |
| term, sequence | Int64 |
| timestamp, expiretime, settlementdate, tradedate | DateTime64(3) |
| quantity … commission | Nullable(Decimal128(12)) |
| vendorrejectcode, deltixrejectcode, flags | Int32 |
| attributes | Nested (Key Int32, Value String) |
| remaining string columns | String |
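
When query results are read as text (for example in the TabSeparated format, where NULL is written as \N), the column types above map naturally onto Python types. The following is a hedged sketch with hypothetical helper names; the sample values come from the Example columns in Appendix A.

```python
from datetime import datetime
from decimal import Decimal
from typing import Optional


def parse_decimal(text: str) -> Optional[Decimal]:
    """Nullable(Decimal128(12)) text -> Decimal, or None for NULL (\\N)."""
    return None if text == r"\N" else Decimal(text)


def parse_datetime64(text: str) -> datetime:
    """DateTime64(3) text such as '2019-02-27 16:51:48.123' -> naive datetime."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")


print(parse_decimal("1.33"))                         # 1.33
print(parse_decimal(r"\N"))                          # None
print(parse_datetime64("2019-02-27 16:51:48.123"))   # 2019-02-27 16:51:48.123000
```

Decimal keeps the exact fractional digits, which avoids the rounding that float would introduce for prices and quantities.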

Appendix C: ClickHouse User Database Sample

<?xml version="1.0"?>
<yandex>
    <profiles>
        <default>
            <!-- Maximum memory usage for processing single query, in bytes. -->
            <max_memory_usage>10000000000</max_memory_usage>

            <!-- Use cache of uncompressed blocks of data. Meaningful only for processing many of very short queries. -->
            <use_uncompressed_cache>0</use_uncompressed_cache>

            <!-- How to choose between replicas during distributed query processing.
                 random - choose random replica from set of replicas with minimum number of errors
                 nearest_hostname - from set of replicas with minimum number of errors, choose replica
                 with minimum number of different symbols between replica's hostname and local hostname
                 (Hamming distance).
                 in_order - first live replica is chosen in specified order.
            -->
            <load_balancing>random</load_balancing>
        </default>

        <!-- Profile that allows only read queries. -->
        <readonly>
            <readonly>2</readonly> <!-- read data and change settings queries are allowed -->
        </readonly>
    </profiles>
    <users>
        <ember>
            <!-- TODO: Put an actual password here -->
            <password>EMBERUSERPASSWORD</password>
            <networks replace="replace">
                <!-- List of networks with open access.-->
                <!-- TODO: Specify here IP of ember-->
                <ip>127.0.0.1</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </ember>
        <webviewer>
            <!-- TODO: Put an actual password here -->
            <password>WEBVIEWERPASSWORD</password>
            <networks replace="replace">
                <!-- List of networks with open access.-->
                <!-- TODO: Specify here company public subnet addresses-->
                <ip>::/0</ip>
                <ip>::1</ip>
                <ip>127.0.0.1</ip>
                <ip>217.23.114.0/24</ip>
            </networks>
            <profile>readonly</profile>
            <quota>default</quota>
        </webviewer>
    </users>

    <!-- Quotas. -->
    <quotas>
        <!-- Name of quota. -->
        <default>
            <!-- Limits for time interval. You could specify many intervals with different limits. -->
            <interval>
                <!-- Length of interval. -->
                <duration>3600</duration>

                <!-- No limits. Just calculate resource usage for time interval. -->
                <queries>0</queries>
                <errors>0</errors>
                <result_rows>0</result_rows>
                <read_rows>0</read_rows>
                <execution_time>0</execution_time>
            </interval>
        </default>
    </quotas>
</yandex>

Change Log

Ember Version 1.4

  • Added TERM (Int64) column to Orders and Messages tables.

Ember Version 1.1

  • Orders table: the datatype of the CURRENCY field changed from VARCHAR to CHAR(10).
  • Messages table: the CounterpartyId field was split into the CounterPartySourceId and CounterPartyOrderId fields.