ZeroMQ (also known as zmq) is an asynchronous messaging library aimed at use in distributed or concurrent applications.
Through the DolphinDB ZeroMQ plugin, users can create zmq sockets and communicate via zmq messages, including session establishment, message publish-subscribe, and transmission.
The DolphinDB ZeroMQ plugin has the branches release200 and release130. Each plugin branch corresponds to a DolphinDB server version. If you use a different DolphinDB server version, refer to the corresponding branch of the plugin documentation.
Linux
The precompiled binaries are stored in the directory DolphinDBPlugin/zmq/bin/linux64. You can execute the following command to load the plugin in DolphinDB:
cd DolphinDB/server //Change to the directory of DolphinDB server
./dolphindb //Start DolphinDB server
loadPlugin("<PluginDir>/zmq/bin/linux64/PluginZmq.txt") //Load the plugin
You can also compile the zmq plugin manually by following these instructions:
(1) compile libzmq
Download libzmq-4.3.4
cd libzmq-4.3.4
cp include/zmq.h /path/to/PluginZmq/bin/include/
mkdir build && cd build
cmake ..
make -j8
cp lib/libzmq.a /path/to/PluginZmq/bin/linux64/
(2) obtain the header file of cppzmq
Download cppzmq-4.7.1
cd cppzmq-4.7.1
cp zmq.hpp /path/to/PluginZmq/bin/include/
(3) build a plugin
mkdir build
cd build
cmake ../
make
Note: Make sure the file libDolphinDB.so is under the GCC search path before compilation. You can add the plugin path to the library search path LD_LIBRARY_PATH, or copy libDolphinDB.so to the build directory.
libPluginZmq.so and PluginZmq.txt are generated under the working directory after successful compilation.
Syntax
zmq::socket(type, formatter, [batchSize], [prefix])
Parameters
- type: a STRING scalar indicating the socket type to be created. It can be "ZMQ_PUB" or "ZMQ_PUSH".
- formatter: a function used to package published data in a specified format. The built-in formatters are created with createJSONFormatter and createCSVFormatter. Alternatively, you can define your own formatter function, which takes the data passed to zmq::send as its argument.
- batchSize: an integer indicating the number of messages sent each time. A table to be published can be sent in batches.
- prefix: a STRING scalar indicating the message prefix.
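The batchSize parameter can be pictured as splitting the rows of a table into fixed-size chunks before sending. The following Python sketch illustrates only that idea. The helper name split_into_batches is hypothetical, and how the plugin actually splits a table internally is an assumption, not something stated in this document.

```python
def split_into_batches(rows, batch_size):
    """Split a list of rows into consecutive chunks of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


# Ten rows sent with a batch size of 4 give chunks of 4, 4 and 2 rows.
rows = list(range(1, 11))
for batch in split_into_batches(rows, 4):
    print(batch)
```

The last batch is simply shorter when the row count is not a multiple of the batch size.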
Details
Create a zmq socket.
Note: When the methods connect, bind, send and close are used concurrently, a separate zmq socket must be constructed for each thread.
Example
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
Syntax
zmq::connect(socket, addr, [prefix])
Parameters
- socket: a zmq socket.
- addr: the address string in the form of "protocol://interface:port", indicating the remote address to be connected to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the remote IP address and port number.
- prefix: a STRING scalar indicating the message prefix.
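To make the "protocol://interface:port" form concrete, here is a minimal Python sketch that splits such an address into its parts. The function split_zmq_addr is a hypothetical helper for illustration only. It is not part of the plugin and does not reflect how the plugin validates addresses.

```python
def split_zmq_addr(addr):
    """Split 'protocol://interface:port' into (protocol, interface, port)."""
    protocol, sep, rest = addr.partition("://")
    if not sep or protocol not in ("tcp", "ipc", "inproc", "epgm"):
        raise ValueError("unsupported address: " + addr)
    if protocol in ("tcp", "epgm"):
        # The port is whatever follows the last colon.
        interface, _, port = rest.rpartition(":")
        return protocol, interface, int(port)
    # ipc and inproc endpoints are paths or names without a port.
    return protocol, rest, None


print(split_zmq_addr("tcp://localhost:55632"))
print(split_zmq_addr("tcp://*:55631"))
```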
Details
Use a socket to establish a connection to a zmq endpoint. Keepalive is enabled once the tcp connection is established, so that the connection can be automatically re-established.
Example
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
Syntax
zmq::bind(socket, addr, [prefix])
Parameters
- socket: a zmq socket.
- addr: the address string in the form of "protocol://interface:port", indicating the local address to bind to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the local interface (or * for all interfaces) and port number.
- prefix: a STRING scalar indicating the message prefix.
Details
Bind a socket to a specific address to accept incoming requests.
Example
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://*:55631", "prefix1")
Syntax
zmq::send(socket, data, [prefix])
Parameters
- socket: a zmq socket.
- data: the data to be sent. Its data type must match the argument expected by the formatter that was passed to zmq::socket. Otherwise, the formatting will fail and an exception will be thrown.
- prefix: a STRING scalar indicating the message prefix.
Details
Send a zmq message. Return true if successful.
Example
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::send(socket, table(1..10 as id))
Syntax
zmq::close(socket)
Details
Close a zmq socket.
Example
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632", "prefix1")
zmq::close(socket)
Syntax
zmq::createSubJob(addr, type, isConnnect, handle, parser, [prefix])
Details
Create a zmq subscription. The subscription will automatically reconnect after network failures.
Parameters
- addr: the address string in the form of "protocol://interface:port", indicating the remote address to be connected to. "protocol" is the underlying transport protocol to use, including tcp, ipc, inproc, and epgm. "interface:port" is the remote IP address and port number.
- type: a STRING indicating the socket type to be created. It can be "ZMQ_SUB" or "ZMQ_PULL".
- isConnnect: a Boolean value indicating whether to connect to addr. If false, the socket is bound to addr instead.
- handle: a function or a table used to handle messages sent from zmq.
- parser: a function for parsing subscribed messages. The built-in parsers are created with createJSONParser and createCSVParser. A parser takes a STRING scalar as input and outputs a table.
- prefix: a STRING indicating the message prefix.
Example
handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")
You can use it with a Python script:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:55633")
msg = '[{"bool":234}]'
# Publish the same message every 2 seconds
while True:
    socket.send(msg.encode('utf-8'))
    time.sleep(2)
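The message published by the script above is a JSON array with one object per row. As a minimal sketch using only the Python standard library, the snippet below shows how such a payload can be regrouped into named columns, which is the shape a table has. The helper rows_to_columns is hypothetical and does not describe how the plugin's JSON parser is implemented.

```python
import json


def rows_to_columns(payload):
    """Turn a JSON array of row objects into a dict mapping column name to values."""
    rows = json.loads(payload)
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


# The sample message from the publisher script has one row and one column.
print(rows_to_columns('[{"bool":234}]'))
```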
Syntax
zmq::getSubJobStat()
Details
Get the status of all zmq subscriptions.
Return a table with the following columns:
- subscriptionId: the subscription ID.
- addr: the subscription address.
- prefix: the message prefix.
- recvPackets: the number of messages received.
- createTimestamp: the timestamp when the subscription is created.
Example
handle = streamTable(10:0, [`int], [INT])
enableTableShareAndPersistence(table=handle, tableName=`test1, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=120)
parser = zmq::createJSONParser([INT], [`bool])
zmq::createSubJob("tcp://localhost:55633", "ZMQ_SUB", true, handle, parser, "prefix1")
zmq::getSubJobStat()
Syntax
zmq::cancelSubJob(subscription)
Parameters
- subscription: the value returned by createSubJob, or the subscriptionId returned by getSubJobStat.
Details
Cancel a zmq subscription.
Example
zmq::cancelSubJob(sub1)
zmq::cancelSubJob(42070480)
Syntax
zmq::createPusher(socket, dummyTable)
Parameters
- socket: a zmq socket.
- dummyTable: a table object which receives the input messages.
Details
Create a zmq pusher. The plugin offers 2 ways to send messages to the pusher, which then forwards them:
- Append data to the pusher with the method append!;
- Use the pusher as the output table of a streaming engine, so that the engine's results are ingested into the pusher.
Example
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::connect(socket, "tcp://localhost:55632")
pusher = zmq::createPusher(socket, output1)
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)
Syntax
zmq::createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])
Parameters
- format: a vector of STRING type.
- delimiter: the delimiter between columns, the default is ','.
- rowDelimiter: the delimiter between rows, the default is ';'.
Details
Create a Formatter function in CSV format.
Example
MyFormat = take("", 5)
MyFormat[2] = "0.000"
f = zmq::createCSVFormatter(MyFormat, ',', ';')
Syntax
zmq::createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])
Parameters
- schema: a vector indicating the data type of each column.
- delimiter: the delimiter between columns, the default is ','.
- rowDelimiter: the delimiter between rows, the default is ';'.
Details
Create a Parser function in CSV format.
Example
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createCSVFormatter([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
s=f(t)
p = zmq::createCSVParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL])
p(s)
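The roles of delimiter and rowDelimiter can be illustrated with a small Python sketch that joins rows into one string and splits it back. The helpers format_rows and parse_rows are hypothetical. The plugin's exact output (quoting, trailing delimiters, type conversion according to the schema) is not specified in this document, so this shows only the delimiter layout under the default values ',' and ';'.

```python
def format_rows(rows, delimiter=",", row_delimiter=";"):
    """Join values with delimiter and rows with row_delimiter."""
    return row_delimiter.join(
        delimiter.join(str(value) for value in row) for row in rows
    )


def parse_rows(text, delimiter=",", row_delimiter=";"):
    """Split text back into rows of string fields (no type conversion)."""
    if not text:
        return []
    return [line.split(delimiter) for line in text.split(row_delimiter)]


s = format_rows([[1, "AAPL"], [2, "IBM"]])
print(s)
print(parse_rows(s))
```

Note that the parsed fields come back as strings here; in the plugin, the schema argument of the parser states the intended type of each column.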
Syntax
zmq::createJSONFormatter()
Details
Create a Formatter function in JSON format.
Example
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
f(t)
Syntax
zmq::createJSONParser(schema, colNames)
Parameters
- schema: a vector indicating the data type of each column.
- colNames: a vector indicating the name of each column.
Details
Create a Parser function in JSON format.
Example
def createT(n) {
return table(take([false, true], n) as bool, take('a'..'z', n) as char, take(short(-5..5), n) as short, take(-5..5, n) as int, take(-5..5, n) as long, take(2001.01.01..2010.01.01, n) as date, take(2001.01M..2010.01M, n) as month, take(time(now()), n) as time, take(minute(now()), n) as minute, take(second(now()), n) as second, take(datetime(now()), n) as datetime, take(now(), n) as timestamp, take(nanotime(now()), n) as nanotime, take(nanotimestamp(now()), n) as nanotimestamp, take(3.1415, n) as float, take(3.1415, n) as double, take(`AAPL`IBM, n) as string, take(`AAPL`IBM, n) as symbol)
}
t = createT(100)
f = zmq::createJSONFormatter()
p = zmq::createJSONParser([BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,STRING,SYMBOL],
`bool`char`short`int`long`date`month`time`minute`second`datetime`timestamp`nanotime`nanotimestamp`float`double`string`symbol)
s=f(t)
x=p(s)
Complete Example
loadPlugin("/home/zmx/worker/DolphinDBPlugin/zmq/cmake-build-debug/PluginZmq.txt")
go
formatter = zmq::createJSONFormatter()
socket = zmq::socket("ZMQ_PUB", formatter)
zmq::bind(socket, "tcp://localhost:55632")
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
zmq::send(socket, data)
You can use it with a Python script:
import zmq
if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    # Enable TCP keepalive so that dead connections are detected
    socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 30)
    socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1)
    socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, 5)
    socket.connect("tcp://192.168.0.48:55632")
    # An empty filter subscribes to all messages
    zip_filter = ""
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter.encode('ascii'))
    while True:
        recvStr = socket.recv()
        print(recvStr)