Skip to content

Commit

Permalink
v1.0 Release
Browse files Browse the repository at this point in the history
  • Loading branch information
micli committed Oct 3, 2020
1 parent eb1b4e0 commit c0500a1
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 96 deletions.
69 changes: 68 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,69 @@
# TDengine
![catfish logo](docs/images/catfish-logo.png)

# TDengine Extension of HiveMQ (Codename: Catfish)

This is a HiveMQ extension that supports data save to TDEngine.

## Deployment

1. Please download hivemq-tdengine-extension-1.0-distribution.zip from releases.
2. Unzip the file: hivemq-tdengine-extension-1.0-distribution.zip on local folder.
3. Put the whole folder into {HIVEMQHOME}/extensions/ as below:
![extensions folder layout](docs/images/extension-folder.png)
4. Create a configuration file named "TDengine.properties" under hivemq-tdengine-extension. The template as below:

TDengine.properties
```shell
host: {TDengine Server IP/hosename}
port:6041
user: {TDengine account}
password:{password}

prefix:mqtt_msg
database:testdb

reportingInterval:1
connectTimeout:5000
```
5. start HiveMQ service by {HIVEMQHOME}/bin/run.sh

Notes:
****
It doesen't need to create any database or tables in TDengine service. Just specify database name and table name by database and prefix properties in TDengine.properties. These database objects will created automatically.
****

## Enable Firewall Rules

The TDengine extension needs to access TDengine REST APIs which is locate on TDengine services with port 6041. Please configure firewall to enable outbound rule to communicate to 6041.

By default, the HiveMQ service exposes 1883 to recevied MQTT traffic, please enable it either.

## Tuning I/O Performance

There are two ways to extend I/O performance:
+ Separate database into multiple files.
+ Extends REST APIs serve threads number.

### Separate database into multiple files

You can specified how much time range into a database files when Create database. The syntax as below:

```sql
CREATE DATABASE demo DAYS 10 CACHE 16000 ROWS 2000
```
+ days: number of days to cover for a data file
+ keep: number of days to keep the data
+ rows: number of rows of records in a block in data file.

As an administrator, you can specify days, keep, rows parameters to get best performance when create database.

Please don't worry about TDengine exetsion overrwites these configurations. Because TDengine extension use "CREATE DATABASE IF NOT EXISTS " clause. It won't affect existing database.

### Extends REST APIs serve threads number

The TDengine extension uses REST APIs to communicate to TDengine service. By default, there are only 2 threads serve HTTP traffic. If you have a heavy MQTT traffic, please add the number of httMaxThreads.

![taos.cfg](docs/images/tsos.cfg.png)

For details, please review:
[TDengine Documentation](https://www.taosdata.com/en/documentation/administrator/#Configuration-on-Server)
Binary file added docs/images/catfish-logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/extension-folder.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/tsos.cfg.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion hivemq-tdengine-extension/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

*Type*: Data Persistent Extension

*Version*: 0.1
*Version*: 1.0

*License*: MIT

Expand Down
38 changes: 23 additions & 15 deletions hivemq-tdengine-extension/pom.xml
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-present HiveMQ GmbH
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
~ MIT License
~ Copyright (c) 2020 Michael Li
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.micli.TDengine</groupId>
<artifactId>hivemq-tdengine-extension</artifactId>
<version>0.1</version>
<version>1.0</version>

<description>HiveMQ 4 Catfith Extension - an extension that supports save MQTT data to TDengine database.</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class TDengineHttpClient {
private final String defaultEncode = "utf-8";

private RequestConfig defaultRequestConfig = null;
private static final Logger log = LoggerFactory.getLogger(TDengineMain.class);
private static final Logger log = LoggerFactory.getLogger(TDengineHttpClient.class);
private HashMap<String, String> deviceMap = new HashMap<String, String>();

public TDengineHttpClient(final String host, final int port, final String username, final String password,
Expand Down Expand Up @@ -118,15 +118,16 @@ public String getAccessToken() throws Exception {
final CloseableHttpResponse response = client.execute(httpGet);
// Retrieve status code from response line.
final int statusCode = response.getStatusLine().getStatusCode();
final HttpEntity entity = response.getEntity();
final String result = EntityUtils.toString(entity, defaultEncode);
if (statusCode == 200) {
final HttpEntity entity = response.getEntity();
final String result = EntityUtils.toString(entity, defaultEncode);
final TDengineAuthResult authResult = JSON.parseObject(result, TDengineAuthResult.class);
if (authResult.getStatus().equals("succ")) {
return "Taosd " + authResult.getDesc();
}
}
} catch (final Exception e) {
log.error("getAccessToken() error: {}", e);
return "";
}
return "";
Expand Down Expand Up @@ -160,7 +161,7 @@ public void WriteData(String msgid, String deviceId, String topic, int qos, Stri
database, tableName, timeString, msgid, topic, qos, payLoad);
executeSQL(sqlInsert);
} catch (Exception ex) {

log.error("writeData error: {}", ex);
}

}
Expand All @@ -174,10 +175,12 @@ public void changeToCurrentDatabase() throws Exception {
executeSQL(sqlUseDb);
}
private boolean tableExists(final String tableName) throws Exception {
final String sqlTableExists = String.format(TDengineSQLCmdList.getDescTable(), tableName);
final String sqlTableExists = String.format(TDengineSQLCmdList.getGetSubTable(), database, tableName);
String result = executeSQL(sqlTableExists);
if(result.length() > 0) {
TDengineQueryResult tdResult = TDengineQueryResult.GetResult(result);
if(!tdResult.getStatus().equals("succ"))
return false;
if(null != tdResult) {
return tdResult.getRows() > 0 ? true : false;
}
Expand Down Expand Up @@ -227,8 +230,10 @@ public int loadDevices() throws Exception {
*/
public String executeSQL(final String sqlStatement) throws Exception {

if (this.getAuthToken().equals("") || sqlStatement.equals(""))
if (this.getAuthToken().equals("") || sqlStatement.equals("")) {
log.warn("Access token is empty. Any actions will be failed!");
return "";
}

final URI uri = new URIBuilder().setScheme("http").setHost(serverHost + ":" + String.valueOf(serverPort))
.setPath("/rest/sql").build();
Expand All @@ -245,9 +250,10 @@ public String executeSQL(final String sqlStatement) throws Exception {
final CloseableHttpResponse response = client.execute(thePost);
// Retrieve status code from response line.
final int statusCode = response.getStatusLine().getStatusCode();
final HttpEntity entity = response.getEntity();
final String result = EntityUtils.toString(entity, defaultEncode);
log.info("Execute SQL statement: {} Result: {}", sqlStatement, result);
if (200 == statusCode) {
final HttpEntity entity = response.getEntity();
final String result = EntityUtils.toString(entity, defaultEncode);
return result;
}
} catch (final Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
host:40.73.37.164
host: {TDengine Server IP/hosename}
port:6041
user:test
password:123456
user: {TDengine account}
password:{password}

prefix:mqtt_msg
database:testdb
Expand Down

This file was deleted.

0 comments on commit c0500a1

Please sign in to comment.