forked from dolphindb/swordfish
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribeTest.dos
48 lines (41 loc) · 1.65 KB
/
subscribeTest.dos
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 算子
def sum_diff(x, y){
return (x-y)/(x+y)
}
@state
def factor1(price) {
a = ema(price, 20)
b = ema(price, 40)
c = 1000 * sum_diff(a, b)
return ema(c, 10) - ema(c, 20)
}
// 创建共享流表
def createStreamTable(streamTableName=`outStream) {
share(streamTable(1000:0, `sym`time`factor1, [STRING, TIMESTAMP, DOUBLE]), streamTableName)
}
// 创建引擎
def createEngine(rseName="reactiveDemo", streamTableName=`outStream) {
inputSchema = table(1:0, `time`sym`price, [TIMESTAMP,STRING,DOUBLE])
return createReactiveStateEngine(name=rseName, metrics=[<time>,<factor1(price)>], dummyTable=inputSchema, outputTable=objByName(streamTableName), keyColumn="sym")
}
// 建立流表订阅
def subscribeStreamTable(outputTableName, streamTableName=`outStream, subActName="streamToOLTP") {
subscribeTable(tableName=streamTableName, actionName=subActName, handler=tableInsert{objByName(outputTableName)}, msgAsTable=true)
}
// 向引擎输入数据
def produceData(n, rseName="reactiveDemo") {
rse = getStreamEngine(rseName)
rse.tableInsert(table((now() + (1..n)) as time, take("000001.SH", n) as sym, rand(10.0, n) as price))
}
// 清理环境 (释放流数据引擎、共享流表和流表订阅)
def cleanEnvironment(streamTableName=`outStream, subActName="streamToOLTP", rseName="reactiveDemo") {
unsubscribeTable(tableName=streamTableName, actionName=subActName)
undef(streamTableName,SHARED)
dropStreamEngine(rseName)
}
// output_oltp = table(100:0, `sym`time`factor1, [STRING, TIMESTAMP, DOUBLE])
// createStreamTable()
// createEngine()
// subscribeStreamTable(`output_oltp)
// produceData(100)
// cleanEnvironment()