In DolphinDB, we can import historical data into a stream table in chronological order as "real-time data" so that the same script can be used both for backtesting and real-time trading. Regarding streaming in DolphinDB please refer to DolphinDB Streaming Tutorial.
This article introduces functions replay
and replayDS
and then demonstrates the process of data replaying.
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])
Function replay
injects data from specified tables or data sources into stream tables.
-
'inputTables' is a table or a tuple. Each element of the tuple is an unpartitioned table or a data source generated by function
replayDS
. -
'outputTables' is a table or a tuple of tables, or a string or a string vector. The number of elements of outputTables must be the same as the number of elements of inputTables. If it is a vector, it is a list of the names of the shared stream tables where the replayed data of the corresponding tables of inputTables are saved. If it is a tuple, each element is a shared stream table where the replayed data of the corresponding table in inputTables are saved. The schema of each table in outputTables must be identical as the schema of the corresponding table in inputTables.
-
'dateColumn' and 'timeColumn' are strings indicating the date column and time column in inputTables. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME, TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Data are replayed in batches determined by the smallest unit of time in 'timeColumn' or 'dateColumn' if 'timeColumn' is not specified. For examples, if the smallest unit of time in 'timeColumn' is second then all data in the same second are replayed in the same batch; if 'timeColumn' is not specified, then all data in the same day are replayed in the same batch.
-
'replayRate' is a nonnegative integer indicating the number of rows to be replayed per second. If it is not specified, it means data are replayed at the maximum speed.
-
'replayRate' is an integer.
-
'absoluteRate' is a Boolean value. The default value is true.
Regarding 'replayRate' and 'absoluteRate':
(1) If 'replayRate' is a positive integer and absoluteRate=true, replay at the speed of 'replayRate' rows per second.
(2) If 'replayRate' is a positive integer and absoluteRate=false, replay at 'replayRate' times the original speed of the data. For example, if the difference between the maximum and the minimum values of 'dateColumn' or 'timeColumn' is n seconds, then it takes n/replayRate seconds to finish the replay.
(3) If 'replayRate' is unspecified or negative, replay at the maximum speed.
- 'parallelLevel' is a positive integer. When the size of individual partitions in the data sources is too large relative to memory size, we need to use function
replayDS
to further divide individual partitions into smaller data sources. 'parallelLevel' indicates the number of threads loading data into memory from these smaller data sources simultaneously. The default value is 1. If 'inputTables' is a table or a tuple of tables, the effective 'parallelLevel' is always 1.
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
Function replayDS
generates a group of data sources to be used as the inputs of function replay
. It splits a SQL query into multiple subqueries based on 'timeRepartitionSchema' with 'timeColumn' within each 'dateColumn' partition.
-
'sqlObj' is a table or metacode with SQL statements (such as <select * from sourceTable>) indicating the data to be replayed. The table object of "select from" must use a DATE type column as one of the partitioning columns.
-
'dateColumn' and 'timeColumn' are strings indicating the date column and time column. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME, TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Function
replayDS
and the corresponding functionreplay
must use the same set of 'dateColumn' and 'timeColumn'. -
'timeRepartitionSchema' is a TIME or NANOTIME type vector. 'timeRepartitionSchema' deliminates multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. For example, if timeRepartitionSchema=[t1, t2, t3], then there are 4 data sources within a day: [00:00:00.000,t1), [t1,t2), [t2,t3) and [t3,23:59:59.999).
replay(inputTable, outputTable, `date, `time, 10)
To replay a single table with a large number of rows, we can use function replayDS
together with function replay. Function
replayDSdeliminates multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. Parameter 'parallelLevel' of function
replay` specifies the number of threads loading data into memory from these smaller data sources simultaneously. In this example, 'parallelLevel' is set to 2.
inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, outputTable, `date, `time, 1000, true, 2)
To replay multiple tables simultaneously, assign a tuple of these table names to parameter 'inputTables' of function replay
and specify the output tables. Each of the output tables corresponds to an input table and should have the same schema as the corresponding input table. All input tables should have identical 'dateColumn' and 'timeColumn'.
ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, true, 2)
If function replay
was called with submitJob
, we can use getRecentJobs
to get jobId, then cancel the replay with command cancelJob
.
getRecentJobs()
cancelJob(jobid)
If function replay
was called directly, we can use getConsoleJobs
in another GUI session to get jobId, then cancel the replay use command cancelConsoleJob
.
getConsoleJobs()
cancelConsoleJob(jobId)
Replayed data are streaming data. We can subscribe to and process the replayed data in the following 3 ways:
- Subscribe in DolphinDB. Write user-defined functions in DolphinDB to process streaming data.
- Subscribe in DolphinDB. To conduct real-time calculations with streaming data, use DolphinDB's built-in streaming aggregators such as time-series aggregator, cross-sectional aggregator and anomaly detection engine. They are very easy to use and have excellent performance. In section 3.2, we use a cross-sectional aggregator to calculate the intrinsic value of an ETF.
- With third-party client through DolphinDB's streaming API.
In this example, we replay the level 1 stock quotes in US stock markets on 2007/08/17, and calculate the intrinsic value of an ETF with the built-in cross-sectional aggregator in DolphinDB. The following are the schema of the input table 'quotes' and a preview of the data.
quotes = database("dfs://TAQ").loadTable("quotes");
quotes.schema().colDefs;
name | typeString | typeInt |
---|---|---|
time | SECOND | 10 |
symbol | SYMBOL | 17 |
ofrsiz | INT | 4 |
ofr | DOUBLE | 16 |
mode | INT | 4 |
mmid | SYMBOL | 17 |
ex | CHAR | 2 |
date | DATE | 6 |
bidsize | INT | 4 |
bid | DOUBLE | 16 |
select top 10 * from quotes where date=2007.08.17
symbol | date | time | bid | ofr | bidsiz | ofrsiz | mode | ex | mmid |
---|---|---|---|---|---|---|---|---|---|
A | 2007.08.17 | 04:15:06 | 0.01 | 0 | 10 | 0 | 12 | 80 | |
A | 2007.08.17 | 06:21:16 | 1 | 0 | 1 | 0 | 12 | 80 | |
A | 2007.08.17 | 06:21:44 | 0.01 | 0 | 10 | 0 | 12 | 80 | |
A | 2007.08.17 | 06:49:02 | 32.03 | 0 | 1 | 0 | 12 | 80 | |
A | 2007.08.17 | 06:49:02 | 32.03 | 32.78 | 1 | 1 | 12 | 80 | |
A | 2007.08.17 | 07:02:01 | 18.5 | 0 | 1 | 0 | 12 | 84 | |
A | 2007.08.17 | 07:02:01 | 18.5 | 45.25 | 1 | 1 | 12 | 84 | |
A | 2007.08.17 | 07:54:55 | 31.9 | 45.25 | 3 | 1 | 12 | 84 | |
A | 2007.08.17 | 08:00:00 | 31.9 | 40 | 3 | 2 | 12 | 84 | |
A | 2007.08.17 | 08:00:00 | 31.9 | 35.5 | 3 | 2 | 12 | 84 |
(1) To replay a large amount of data, if we load all data into memory first, we may have an out-of-memory problem. We can first use function replayDS
and specify parameter 'timeRepartitionSchema' to divide the data into 60 parts based on the column 'time'.
trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
(2) Define the output stream table 'outQuotes'.
sch = select name,typeString as type from quotes.schema().colDefs
share streamTable(100:0, sch.name, sch.type) as outQuotes
(3) Define a dictionary for the ETF components weights and function etfVal
to calculate ETF intrinsic value. For simplicity we use an ETF with only 6 component stocks.
defg etfVal(weights,sym, price) {
return wsum(price, weights[sym])
}
weights = dict(STRING, DOUBLE)
weights[`AAPL] = 0.1
weights[`IBM] = 0.1
weights[`MSFT] = 0.1
weights[`NTES] = 0.1
weights[`AMZN] = 0.1
weights[`GOOG] = 0.5
(4) Define a streaming aggregator to subscribe to the output stream table 'outQuotes'. We specify a filtering condition for the subscription that only data with stock symbols of AAPL, IBM, MSFT, NTES, AMZN or GOOG are published to the aggregator. This significantly reduces unnecessary network overhead and data transfer.
setStreamTableFilterColumn(outQuotes, `symbol)
outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch)
subscribeTable(tableName="outQuotes", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true, filter=`AAPL`IBM`MSFT`NTES`AMZN`GOOG)
(5) Start to replay data at the specified speed of 100,000 rows per second. The streaming aggregator conducts real-time calculation with the replayed data.
submitJob("replay_quotes", "replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time, 100000, true, 4)
(6) Check ETF intrinsic values
select top 15 * from outputTable
time | etf |
---|---|
2019.06.04T16:40:18.476 | 14.749 |
2019.06.04T16:40:19.476 | 14.749 |
2019.06.04T16:40:20.477 | 14.749 |
2019.06.04T16:40:21.477 | 22.059 |
2019.06.04T16:40:22.477 | 22.059 |
2019.06.04T16:40:23.477 | 34.049 |
2019.06.04T16:40:24.477 | 34.049 |
2019.06.04T16:40:25.477 | 284.214 |
2019.06.04T16:40:26.477 | 284.214 |
2019.06.04T16:40:27.477 | 285.68 |
2019.06.04T16:40:28.477 | 285.68 |
2019.06.04T16:40:29.478 | 285.51 |
2019.06.04T16:40:30.478 | 285.51 |
2019.06.04T16:40:31.478 | 285.51 |
2019.06.04T16:40:32.478 | 285.51 |
We tested data replaying in DolphinDB on a server with the following configuration:
- Server: DELL PowerEdge R730xd
- CPU: Intel Xeon(R) CPU E5-2650 v4(24cores, 48 threads, 2.20GHz)
- RAM: 512 GB (32GB × 16, 2666 MHz)
- Harddisk: 17T HDD (1.7T × 10, read speed 222 MB/s, write speed 210 MB/s)
- Network: 10 Gigabit Ethernet
DolphinDB script:
sch = select name,typeString as type from quotes.schema().colDefs
trs = cutPoints(09:30:00.000..16:00:00.001,60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
share streamTable(100:0, sch.name, sch.type) as outQuotes1
jobid = submitJob("replay_quotes","replay_quotes_stream", replay, [rds], [`outQuotes1], `date, `time, , ,4)
When replaying at maximum speed (parameter 'replayRate' is not specified) and the output table is not subscribed, it only takes about 100 seconds to replay 336,305,414 rows of data.