
WebSocket API feed gateway

The service publishes periodical snapshots and trades via WebSocket.

Usage

This section describes the command-line parameters that this service supports.

wsfeed

Usage:
wsfeed.sh -i 0.0.0.0 -p 8080 -t \
    dxtick://127.0.0.1:8011 -s prices -e 1000 \
    -z 10 -k 15

This runs the server on '0.0.0.0' (all network interfaces) and port '8080' to publish price data (snapshots and trades) from the TimeBase instance at 'dxtick://127.0.0.1:8011', stream 'prices'. Snapshots (not aggregated books) are pushed every 1000 milliseconds with a maximum of 10 levels. If there is no pricing data for 15 seconds, the server terminates itself. By default, the management REST server runs on '127.0.0.1:8081' for JSON and '127.0.0.1:8082' for plain text.

Options:
-i <IFC> or --interface-ws <IFC>
    IP address of the local interface for the WebSocket server. Default: 127.0.0.1

-p <PORT> or --port-ws <PORT>
    Local port to listen on for the WebSocket server. Default: 8080

-l or --ssl
    Use self-signed SSL.

-t <URL> or --timebase <URL>
    TimeBase URL. Default: dxtick://timebase:8011

-s <STREAMS> or --streams <STREAMS>
    TimeBase streams to fetch the market data from. Use ',' as the delimiter between names.

-c <CHANNELS> or --channels <CHANNELS>
    Channels to consume, given as a combination of channel codes: s (snapshot), t (trade). Default: st

-m <TYPE> or --timestamp <TYPE>
    Timestamp type: none (no timestamp), sys (system time), ts (timestamp of TB message), ots (originalTimestamp of TB message). Default: ts

-d <PROPERTIES> or --trade-props <PROPERTIES>
    Additional trade properties to be published. The value may consist of any combination of the following codes: c (condition), t (trade type), s (aggressor side). For example: cs (to publish condition and aggressor side).

-e <PERIOD> or --snapshot-period <PERIOD>
    Period to send snapshots, in milliseconds. Default: 1000

-z <SIZE> or --snapshot-size <SIZE>
    Size of the snapshot. In aggregated book mode, this is the maximum size of a single source's book exposed. Default: 10

-a or --aggregated
    Process L2 data for multiple sources/exchanges and publish the aggregated book. In this mode, 'snapshot-size' specifies the maximum number of price levels from a single source to be added to the aggregated book.

-k <TIMEOUT> or --kill <TIMEOUT>
    Kill the process if there is no market data for the specified timeout in seconds.

-r <TIMEOUT> or --reconnect <TIMEOUT>
    Reconnect to TimeBase if there is no market data for the specified timeout in seconds. Used only if -k <TIMEOUT> or --kill <TIMEOUT> isn't specified. Default: 60

-n <IFC> or --interface-mgmt <IFC>
    IP address of the local interface for the REST management server. Default: 127.0.0.1

-o <PORT> or --port-mgmt <PORT>
    Local port to listen on for the REST management server in JSON. Default: 8081

-x <PORT> or --port-txt-mgmt <PORT>
    Local port to listen on for the REST management server in plain text. Default: 8082

-h or --help
    This help.

-?
    This help.
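
The interplay between the kill and reconnect timeouts can be easy to misread, so here is a minimal Java sketch of how the two options might resolve into a single idle policy. The class and method names (IdlePolicy, resolve) are hypothetical and are not part of the service; the only facts taken from the option descriptions are that the reconnect timeout applies only when no kill timeout is given and that it defaults to 60 seconds.

```java
public class IdlePolicy {
    enum Action { KILL, RECONNECT }

    // Default of the reconnect option, as stated in the option list.
    static final int DEFAULT_RECONNECT_SECONDS = 60;

    final Action action;
    final int timeoutSeconds;

    IdlePolicy(Action action, int timeoutSeconds) {
        this.action = action;
        this.timeoutSeconds = timeoutSeconds;
    }

    /**
     * Hypothetical resolution rule: a kill timeout, when present, wins;
     * otherwise the reconnect timeout (or its default) is used.
     * A null argument means "option not specified".
     */
    static IdlePolicy resolve(Integer killSeconds, Integer reconnectSeconds) {
        if (killSeconds != null) {
            return new IdlePolicy(Action.KILL, killSeconds);
        }
        int timeout = reconnectSeconds != null ? reconnectSeconds : DEFAULT_RECONNECT_SECONDS;
        return new IdlePolicy(Action.RECONNECT, timeout);
    }

    public static void main(String[] args) {
        IdlePolicy policy = resolve(15, null); // corresponds to "-k 15"
        System.out.println(policy.action + " after " + policy.timeoutSeconds + " s without market data");
    }
}
```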

Market Data Websocket API

The service receives all Websocket connections on ws(s)://INTERFACE_WS:PORT_WS/websocket/v1
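
As a small illustration of the URL pattern above, the following Java sketch builds the endpoint from the interface, port and SSL settings. The class and method names are hypothetical; the path and the ws/wss scheme choice come from the pattern itself.

```java
import java.net.URI;

public class WsFeedEndpoint {
    /**
     * Builds the feed URL. host and port correspond to the -i and -p options,
     * ssl to the -l option (wss instead of ws).
     */
    static URI endpoint(boolean ssl, String host, int port) {
        String scheme = ssl ? "wss" : "ws";
        return URI.create(scheme + "://" + host + ":" + port + "/websocket/v1");
    }

    public static void main(String[] args) {
        // Defaults from the option list: 127.0.0.1 and 8080, no SSL.
        System.out.println(endpoint(false, "127.0.0.1", 8080));
    }
}
```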

Subscribe

Periodical Snapshots
{
  "type": "subscribe",
  "symbols": [
    "FVM1",
    "FVU1"
  ],
  "channel": "snapshot"
}
Trades
{
  "type": "subscribe",
  "symbols": [
    "FVM1",
    "FVU1"
  ],
  "channel": "trade"
}

Unsubscribe

Periodical Snapshots
{
  "type": "unsubscribe",
  "symbols": [
    "FVM1"
  ],
  "channel": "snapshot"
}
Trades
{
  "type": "unsubscribe",
  "symbols": [
    "FVM1"
  ],
  "channel": "trade"
}
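
For clients that do not use a ready-made library, the subscribe and unsubscribe requests shown above can be assembled as plain strings. The following is a minimal sketch; the class and method names are hypothetical, and a real client would normally use a JSON library instead of string concatenation.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class SubscriptionMessages {
    /** Builds a request with the "type", "symbols" and "channel" fields used by the feed. */
    static String message(String type, String channel, String... symbols) {
        String quoted = Arrays.stream(symbols)
                // Escape backslashes and quotes so the symbol stays a valid JSON string.
                .map(s -> "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
                .collect(Collectors.joining(","));
        return "{\"type\":\"" + type + "\",\"symbols\":[" + quoted + "],\"channel\":\"" + channel + "\"}";
    }

    static String subscribe(String channel, String... symbols) {
        return message("subscribe", channel, symbols);
    }

    static String unsubscribe(String channel, String... symbols) {
        return message("unsubscribe", channel, symbols);
    }

    public static void main(String[] args) {
        System.out.println(subscribe("snapshot", "FVM1", "FVU1"));
        System.out.println(unsubscribe("trade", "FVM1"));
    }
}
```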

Response payload

Periodical Snapshots
Non-aggregated Books
{
  "type": "snapshot",
  "symbol": "FVU1",
  "timestamp": "2021-04-23T18:25:43.511000Z",
  "bids": [
    [1343.05, 20],
    [1340.95, 40]
  ],
  "asks": [
    [1360.00, 10],
    [1365.95, 50]
  ]
}
Aggregated Books
{
  "type": "snapshot",
  "symbol": "FVU1",
  "timestamp": "2021-04-23T18:25:43.511000Z",
  "bids": [
    {"price": 1343.05, "size": 20, "source": "ABC"},
    {"price": 1340.95, "size": 40, "source": "BCD"}
  ],
  "asks": [
    {"price": 1360.00, "size": 10, "source": "ABC"},
    {"price": 1361.00, "size": 30, "source": "ABC"},
    {"price": 1361.00, "size": 10, "source": "BCD"},
    {"price": 1365.95, "size": 50, "source": "BCD"}
  ]
}
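
To make the aggregated book mode more concrete, here is a rough Java sketch of how per-source ask levels could be merged, with the snapshot size capping the number of levels taken from each source. This is an illustration only, not the service's implementation: the class names are hypothetical, prices are held as doubles for brevity, and the tie-break by source name at equal prices is an assumption chosen to reproduce the ordering of the example payload.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class AggregatedAsks {
    /** One price level tagged with the source it came from. */
    static final class Level {
        final double price;
        final double size;
        final String source;

        Level(double price, double size, String source) {
            this.price = price;
            this.size = size;
            this.source = source;
        }

        @Override
        public String toString() {
            return price + "/" + size + "@" + source;
        }
    }

    /** Takes the best (lowest) snapshotSize asks of each source and merges them by price. */
    static List<Level> aggregateAsks(Map<String, List<Level>> asksBySource, int snapshotSize) {
        Comparator<Level> byPrice = Comparator.comparingDouble((Level l) -> l.price);
        List<Level> merged = new ArrayList<>();
        for (List<Level> book : asksBySource.values()) {
            book.stream().sorted(byPrice).limit(snapshotSize).forEach(merged::add);
        }
        // Assumed tie-break: equal prices are ordered by source name.
        merged.sort(byPrice.thenComparing((Level l) -> l.source));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<Level>> asks = Map.of(
                "ABC", List.of(new Level(1360.00, 10, "ABC"), new Level(1361.00, 30, "ABC")),
                "BCD", List.of(new Level(1361.00, 10, "BCD"), new Level(1365.95, 50, "BCD")));
        aggregateAsks(asks, 10).forEach(System.out::println);
    }
}
```

Bids would be handled the same way with the price comparator reversed.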
Trades

The message payload carries the execution in the format "execution": [ tradePrice, tradeSize ]:

Non-aggregated Books
{
  "type": "trade",
  "symbol": "FVU1",
  "timestamp": "2021-04-23T10:35:40.887000Z",
  "execution": [1361.05, 100]
}
Aggregated Books
{
  "type": "trade",
  "symbol": "FVU1",
  "timestamp": "2021-04-23T10:35:40.887000Z",
  "execution": [1361.05, 100],
  "source": "ABC"
}
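
As an illustration of the "execution" pair, the sketch below pulls the price and size out of a trade message. It uses a regular expression only to stay dependency-free; the class and method names are hypothetical, values are read as doubles for simplicity, and a production client would more likely use a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TradeParser {
    // Matches "execution": [ <price>, <size> ] with optional whitespace.
    private static final Pattern EXECUTION = Pattern.compile(
            "\"execution\"\\s*:\\s*\\[\\s*([-+0-9.eE]+)\\s*,\\s*([-+0-9.eE]+)\\s*\\]");

    /** Returns {price, size} taken from the "execution" field of a trade message. */
    static double[] execution(String json) {
        Matcher m = EXECUTION.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No \"execution\" field found");
        }
        return new double[] {Double.parseDouble(m.group(1)), Double.parseDouble(m.group(2))};
    }

    public static void main(String[] args) {
        String message = "{\"type\": \"trade\", \"symbol\": \"FVU1\", "
                + "\"timestamp\": \"2021-04-23T10:35:40.887000Z\", \"execution\": [1361.05, 100]}";
        double[] e = execution(message);
        System.out.println("price=" + e[0] + " size=" + e[1]);
    }
}
```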

Market Data Management REST API

The service provides a management REST API. Two different ports are used for answers in JSON and plain text. Default port for JSON is 8081, and for plain text 8082 (see Usage).

    GET
/v1/status
Returns current status of the service.

/v1/shutdown
Shuts down the service gracefully.

/v1/symbols
Returns a list of all symbols for which market data has been processed.

/v1/book/{symbol}
Returns the current snapshot of the book for the specified symbol.
Parameters: {symbol} is the symbol to show the book for.

/v1/jvm/info
Returns JVM info.

/v1/jvm/threaddump
Returns thread dump.

/v1/help
This help.
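
The management endpoints can be called with any HTTP client. Below is a minimal sketch using the JDK's java.net.http API (Java 11+). The class and method names are hypothetical, and the use of plain http for the management server is an assumption, since the scheme is not stated here.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class MgmtRequests {
    /** Builds a GET request for a management endpoint such as /v1/status. */
    static HttpRequest get(String host, int port, String path) {
        // Assumption: the management server speaks plain HTTP.
        return HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + path))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // 8081 is the default JSON management port; 8082 serves plain text.
        HttpRequest request = get("127.0.0.1", 8081, "/v1/status");
        System.out.println(request.method() + " " + request.uri());
        // To send it against a running service:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString()).body();
    }
}
```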

WebSocket Client

Here we show how to consume this feed from a simple Java app (Netty-based):

    final SnapshotChannelListener snapshotListener = new SnapshotChannelListener() {
        private final StringBuilder book = new StringBuilder();

        @Override
        public void onSnapshotStared(final CharSequence symbol, final long timestamp) {
            book.setLength(0);
            book.append("Book: ").append(symbol).append("{\n");
        }

        @Override
        public void onClearBids() {
            book.append("\tClear bids\n");
        }

        @Override
        public void bid(final long price, final long size) {
            book.append("\tBid: ")
                .append(Decimal64Utils.toString(price)).append('/')
                .append(Decimal64Utils.toString(size)).append("\n");
        }

        @Override
        public void onClearAsks() {
            book.append("\tClear asks\n");
        }

        @Override
        public void ask(final long price, final long size) {
            book.append("\tAsk: ")
                .append(Decimal64Utils.toString(price)).append('/')
                .append(Decimal64Utils.toString(size)).append("\n");
        }

        @Override
        public void onSnapshotFinished() {
            book.append("}\n");
            System.out.println(book);
        }
    };

    final TradeChannelListener tradeListener = new TradeChannelListener() {
        @Override
        public void onTrade(final CharSequence symbol, final long timestamp, final long price, final long size) {
            System.out.println("Trade: " + symbol + ' '
                + Decimal64Utils.toString(price) + '/' + Decimal64Utils.toString(size));
        }
    };

    try (WsFeedClient feedClient = WsFeedClient.builder()
            .withUrl("ws://127.0.0.1:8080/websocket/v1")
            .withReadTimeout(10_000)
            .withListener(new WsFeedClient.Listener() {
                @Override
                public void onConnected() {
                    System.out.println("Connected");
                }

                @Override
                public void onDisconnected() {
                    System.out.println("Disconnected");
                }
            }).build()) {

        feedClient.start();

        final SnapshotChannel snapshotChannel =
            feedClient.channels().openSnapshot(snapshotListener);

        snapshotChannel.subscribe("FVM1");
        snapshotChannel.subscribe("FVU1");

        //...

        snapshotChannel.unsubscribe("FVM1");

        //...

        final TradeChannel tradeChannel =
            feedClient.channels().openTrade(tradeListener);

        tradeChannel.subscribe("FVM1");

        //...
    }

Deployment example

Specify the service version in .env:

WSFEED_VERSION=5.6.10

Add the following snippet to your docker-compose.yml:

  wsfeed:
    image: "registry.deltixhub.com/deltix.docker/pricehub/deltix-wsfeed:${WSFEED_VERSION}"
    command: ["-i0.0.0.0", "-p8090", "-tdxtick://timebase:8011", "-sCOINBASE", "-e1000", "-z10", "-k60"]
    depends_on:
      timebase:
        condition: service_healthy
    restart: unless-stopped
    init: true
    ports:
      - "8090:8090"
      - "8081:8081"

Here we assume that Docker Compose uses bridge networking and that TimeBase is running in the same compose file on port 8011 (under the "timebase" name). We also assume that the market data aggregator is configured to write COINBASE market data into TimeBase.