Skip to content

Commit

Permalink
Merge pull request #7 from program-0/release-1.0.0
Browse files Browse the repository at this point in the history
add the mqtt source and sink, fix the udf issue and the window size p…
  • Loading branch information
j-ching authored Jan 5, 2022
2 parents 1a3bd73 + dbd2335 commit 650c452
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 0 deletions.
28 changes: 28 additions & 0 deletions docs/stream_sink/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# 语法示例

流计算可以将消息队列作为流式数据输入,如下:

```sql
CREATE TABLE mqtt_result
(
x varchar,
y varchar,
z varchar
) WITH (
type = 'mqtt',
url = 'tcp://ip:port',
clientId = '客户端ID',
topic = 'Topic信息',
username = '用户名',
password = '密码');
```

| 参数名 | 是否必填 | 字段说明 | 默认值 |
|-----------------------|------|---------------------------------------------------------------------------------------|-------|
| type || 固定值,必须是mqtt | |
| url || mqtt broker的地址, 格式为: 协议://IP:port | |
| clientId || 客户度ID | |
| topic || 需要订阅的topic信息 | |
| username || 当mqtt需要进行鉴权时,需要注明username和password参数 | |
| password || 当mqtt需要进行鉴权时,需要注明username和password参数 | |

99 changes: 99 additions & 0 deletions docs/stream_source/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
MQTT 协议在边缘计算场景经常被使用, rocketmq-streams支持mqtt协议,也是想边缘计算场景迈进了一步

# 语法示例

流计算可以将消息队列作为流式数据输入,如下:

```sql
CREATE TABLE mqtt_content
(
`content` VARCHAR
) WITH (
type = 'mqtt',
url = 'tcp://ip:port',
clientId = '客户端ID',
topic = 'Topic信息',
username = '用户名',
password = '密码',
isJsonData = '数据格式是否json',
cleanSession = '是否清理session',
connectionTimeout = '连接超时时间',
aliveInterval = '心跳间隔时间',
automaticReconnect = '连接断开时,是否自动重连');
```

| 参数名 | 是否必填 | 字段说明 | 默认值 |
|-----------------------|------|---------------------------------------------------------------------------------------|-------|
| type || 固定值,必须是mqtt | |
| url || mqtt broker的地址, 格式为: 协议://IP:port | |
| clientId || 客户度ID | |
| topic || 需要订阅的topic信息 | |
| username || 当mqtt需要进行鉴权时,需要注明username和password参数 | |
| password || 当mqtt需要进行鉴权时,需要注明username和password参数 | |
| isJsonData || 消息是否是json格式 | true |
| cleanSession || 当客户端连接重新建立时,原来的session信息是否保留,为true则保留,客户端可以从连接断开的时间点继续消费信息,否则则从当前时间点消费,连接断开这段时间的数据将丢失 | true |
| connectionTimeout || 连接超时时间 | 10(s) |
| aliveInterval || 心跳的额间隔时间 | 60(s) |
| automaticReconnect || 当连接异常时,是否自动重连 | true |

# 自定义解析

当消息非json,jsonarray,或分割符分割,需要业务方自己解析时,可以采用如下方式

```sql
CREATE FUNCTION json_array AS 'org.apache.rocketmq.streams.script.function.impl.flatmap.SplitJsonArrayFunction';

CREATE TABLE mqtt_content
(
`content` VARCHAR
) WITH (
type = 'mqtt',
url = 'tcp://ip:port',
clientId = 'test_client_1',
topic = 'usr/Module/DataDistribution/+/+/broadcast/+/+/metric/+/+',
username = 'username',
password = 'password',
isJsonData = 'false',
cleanSession = 'false',
connectionTimeout = '5',
aliveInterval = '30',
automaticReconnect = 'true');

CREATE TABLE mqtt_result
(
window_start TIMESTAMP,
window_end TIMESTAMP,
`AttributeCode` VARCHAR,
`avg_value` double
) WITH (
type = 'print');

CREATE VIEW temp_view_1 AS
SELECT JSON_VALUE(`content`, '$.Data') AS data
FROM mqtt_content;

CREATE VIEW temp_view_2 AS
SELECT AttributeCode,
AttributeId,
Quality,
`Value`,
AttibuteName,
AssetId,
AssetCode,
`Timestamp`
FROM temp_view_1,
LATERAL TABLE(json_array(data, 'AttributeCode', 'AttributeId', 'Quality', 'Value', 'AttibuteName', 'AssetId', 'AssetCode', 'Timestamp')) AS T(AttributeCode, AttributeId, Quality, `Value`, AttibuteName,AssetId, AssetCode, `Timestamp`);

INSERT INTO mqtt_result
SELECT TUMBLE_START(`Timestamp`, INTERVAL '1' MINUTE),
TUMBLE_END(`Timestamp`, INTERVAL '1' MINUTE),
AttributeCode,
AVG(`Value`) AS avg_value
FROM temp_view_2
group by TUMBLE(ts, INTERVAL '1' MINUTE), AttributeCode;


```

- 把isJsonData设置成false,系统会把这个字节数组根据encoding转化成字符串,并赋值给content字段
- 系统提供多种解析函数(类似flink的udtf),包括grok,正则解析等,也可以自定义解析函数,参考自定函数部分,在create view完成字段的解析

0 comments on commit 650c452

Please sign in to comment.