diff --git a/README.md b/README.md index eba6547..408713e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,69 @@ -# TDengine +![catfish logo](docs/images/catfish-logo.png) + +# TDengine Extension of HiveMQ (Codename: Catfish) + This is a HiveMQ extension that supports data save to TDEngine. + +## Deployment + +1. Please download hivemq-tdengine-extension-1.0-distribution.zip from releases. +2. Unzip the file: hivemq-tdengine-extension-1.0-distribution.zip on local folder. +3. Put the whole folder into {HIVEMQHOME}/extensions/ as below: +![extensions folder layout](docs/images/extension-folder.png) +4. Create a configuration file named "TDengine.properties" under hivemq-tdengine-extension. The template as below: + +TDengine.properties +```shell +host: {TDengine Server IP/hosename} +port:6041 +user: {TDengine account} +password:{password} + +prefix:mqtt_msg +database:testdb + +reportingInterval:1 +connectTimeout:5000 +``` +5. start HiveMQ service by {HIVEMQHOME}/bin/run.sh + +Notes: +**** +It doesen't need to create any database or tables in TDengine service. Just specify database name and table name by database and prefix properties in TDengine.properties. These database objects will created automatically. +**** + +## Enable Firewall Rules + +The TDengine extension needs to access TDengine REST APIs which is locate on TDengine services with port 6041. Please configure firewall to enable outbound rule to communicate to 6041. + +By default, the HiveMQ service exposes 1883 to recevied MQTT traffic, please enable it either. + +## Tuning I/O Performance + +There are two ways to extend I/O performance: ++ Separate database into multiple files. ++ Extends REST APIs serve threads number. + +### Separate database into multiple files + +You can specified how much time range into a database files when Create database. The syntax as below: + +```sql +CREATE DATABASE demo DAYS 10 CACHE 16000 ROWS 2000 +``` ++ days: number of days to cover for a data file ++ keep: number of days to keep the data ++ rows: number of rows of records in a block in data file. + +As an administrator, you can specify days, keep, rows parameters to get best performance when create database. + +Please don't worry about TDengine exetsion overrwites these configurations. Because TDengine extension use "CREATE DATABASE IF NOT EXISTS " clause. It won't affect existing database. + +### Extends REST APIs serve threads number + +The TDengine extension uses REST APIs to communicate to TDengine service. By default, there are only 2 threads serve HTTP traffic. If you have a heavy MQTT traffic, please add the number of httMaxThreads. + +![taos.cfg](docs/images/tsos.cfg.png) + +For details, please review: +[TDengine Documentation](https://www.taosdata.com/en/documentation/administrator/#Configuration-on-Server) diff --git a/docs/images/catfish-logo.png b/docs/images/catfish-logo.png new file mode 100644 index 0000000..19a8f14 Binary files /dev/null and b/docs/images/catfish-logo.png differ diff --git a/docs/images/extension-folder.png b/docs/images/extension-folder.png new file mode 100644 index 0000000..43f79a1 Binary files /dev/null and b/docs/images/extension-folder.png differ diff --git a/docs/images/tsos.cfg.png b/docs/images/tsos.cfg.png new file mode 100644 index 0000000..f0d929d Binary files /dev/null and b/docs/images/tsos.cfg.png differ diff --git a/hivemq-tdengine-extension/README.adoc b/hivemq-tdengine-extension/README.adoc index 14afcb4..743dd2d 100644 --- a/hivemq-tdengine-extension/README.adoc +++ b/hivemq-tdengine-extension/README.adoc @@ -13,7 +13,7 @@ *Type*: Data Persistent Extension -*Version*: 0.1 +*Version*: 1.0 *License*: MIT diff --git a/hivemq-tdengine-extension/pom.xml b/hivemq-tdengine-extension/pom.xml index b089bbe..3ea22a1 100644 --- a/hivemq-tdengine-extension/pom.xml +++ b/hivemq-tdengine-extension/pom.xml @@ -1,19 +1,27 @@ +~ MIT License + +~ Copyright (c) 2020 Michael Li +~ +~ Permission is hereby granted, free of charge, to any person obtaining a copy +~ of this software and associated documentation files (the "Software"), to deal +~ in the Software without restriction, including without limitation the rights +~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +~ copies of the Software, and to permit persons to whom the Software is +~ furnished to do so, subject to the following conditions: +~ +~ The above copyright notice and this permission notice shall be included in all +~ copies or substantial portions of the Software. +~ +~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +~ SOFTWARE. +--> @@ -21,7 +29,7 @@ com.github.micli.TDengine hivemq-tdengine-extension - 0.1 + 1.0 HiveMQ 4 Catfith Extension - an extension that supports save MQTT data to TDengine database. diff --git a/hivemq-tdengine-extension/src/main/java/com/github/micli/catfish/TDengineHttpClient.java b/hivemq-tdengine-extension/src/main/java/com/github/micli/catfish/TDengineHttpClient.java index ed85b91..5c20e41 100644 --- a/hivemq-tdengine-extension/src/main/java/com/github/micli/catfish/TDengineHttpClient.java +++ b/hivemq-tdengine-extension/src/main/java/com/github/micli/catfish/TDengineHttpClient.java @@ -71,7 +71,7 @@ public class TDengineHttpClient { private final String defaultEncode = "utf-8"; private RequestConfig defaultRequestConfig = null; - private static final Logger log = LoggerFactory.getLogger(TDengineMain.class); + private static final Logger log = LoggerFactory.getLogger(TDengineHttpClient.class); private HashMap deviceMap = new HashMap(); public TDengineHttpClient(final String host, final int port, final String username, final String password, @@ -118,15 +118,16 @@ public String getAccessToken() throws Exception { final CloseableHttpResponse response = client.execute(httpGet); // Retrieve status code from response line. final int statusCode = response.getStatusLine().getStatusCode(); + final HttpEntity entity = response.getEntity(); + final String result = EntityUtils.toString(entity, defaultEncode); if (statusCode == 200) { - final HttpEntity entity = response.getEntity(); - final String result = EntityUtils.toString(entity, defaultEncode); final TDengineAuthResult authResult = JSON.parseObject(result, TDengineAuthResult.class); if (authResult.getStatus().equals("succ")) { return "Taosd " + authResult.getDesc(); } } } catch (final Exception e) { + log.error("getAccessToken() error: {}", e); return ""; } return ""; @@ -160,7 +161,7 @@ public void WriteData(String msgid, String deviceId, String topic, int qos, Stri database, tableName, timeString, msgid, topic, qos, payLoad); executeSQL(sqlInsert); } catch (Exception ex) { - + log.error("writeData error: {}", ex); } } @@ -174,10 +175,12 @@ public void changeToCurrentDatabase() throws Exception { executeSQL(sqlUseDb); } private boolean tableExists(final String tableName) throws Exception { - final String sqlTableExists = String.format(TDengineSQLCmdList.getDescTable(), tableName); + final String sqlTableExists = String.format(TDengineSQLCmdList.getGetSubTable(), database, tableName); String result = executeSQL(sqlTableExists); if(result.length() > 0) { TDengineQueryResult tdResult = TDengineQueryResult.GetResult(result); + if(!tdResult.getStatus().equals("succ")) + return false; if(null != tdResult) { return tdResult.getRows() > 0 ? true : false; } @@ -227,8 +230,10 @@ public int loadDevices() throws Exception { */ public String executeSQL(final String sqlStatement) throws Exception { - if (this.getAuthToken().equals("") || sqlStatement.equals("")) + if (this.getAuthToken().equals("") || sqlStatement.equals("")) { + log.warn("Access token is empty. Any actions will be failed!"); return ""; + } final URI uri = new URIBuilder().setScheme("http").setHost(serverHost + ":" + String.valueOf(serverPort)) .setPath("/rest/sql").build(); @@ -245,9 +250,10 @@ public String executeSQL(final String sqlStatement) throws Exception { final CloseableHttpResponse response = client.execute(thePost); // Retrieve status code from response line. final int statusCode = response.getStatusLine().getStatusCode(); + final HttpEntity entity = response.getEntity(); + final String result = EntityUtils.toString(entity, defaultEncode); + log.info("Execute SQL statement: {} Result: {}", sqlStatement, result); if (200 == statusCode) { - final HttpEntity entity = response.getEntity(); - final String result = EntityUtils.toString(entity, defaultEncode); return result; } } catch (final Exception ex) { diff --git a/hivemq-tdengine-extension/src/main/resources/TDengine.properties b/hivemq-tdengine-extension/src/main/resources/TDengine.properties index 67ccfa4..386a5f1 100644 --- a/hivemq-tdengine-extension/src/main/resources/TDengine.properties +++ b/hivemq-tdengine-extension/src/main/resources/TDengine.properties @@ -1,7 +1,7 @@ -host:40.73.37.164 +host: {TDengine Server IP/hosename} port:6041 -user:test -password:123456 +user: {TDengine account} +password:{password} prefix:mqtt_msg database:testdb diff --git a/hivemq-tdengine-extension/src/test/java/com/github/micli/catfish/TDengineInterceptorIT.java b/hivemq-tdengine-extension/src/test/java/com/github/micli/catfish/TDengineInterceptorIT.java deleted file mode 100644 index 0e40cb2..0000000 --- a/hivemq-tdengine-extension/src/test/java/com/github/micli/catfish/TDengineInterceptorIT.java +++ /dev/null @@ -1,68 +0,0 @@ - -/* - * Copyright 2018-present HiveMQ GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.github.micli.catfish; - -import com.hivemq.client.mqtt.MqttGlobalPublishFilter; -import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; -import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; -import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import com.hivemq.extension.sdk.api.annotations.NotNull; -import com.hivemq.testcontainer.core.MavenHiveMQExtensionSupplier; -import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.extension.RegisterExtension; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * This tests the functionality of the {@link TDengineInterceptor}. - * It uses the HiveMQ Testcontainer to automatically package and deploy this extension inside a HiveMQ docker container. - * - * @author Yannick Weber - * @since 4.3.1 - */ -public class TDengineInterceptorIT { - - @RegisterExtension - public final @NotNull HiveMQTestContainerExtension extension = - new HiveMQTestContainerExtension() - .withExtension(MavenHiveMQExtensionSupplier.direct().get()); - - @Test - @Timeout(value = 5, unit = TimeUnit.MINUTES) - void test_payload_modified() throws InterruptedException { - final Mqtt5BlockingClient client = Mqtt5Client.builder() - .identifier("hello-world-client") - .serverPort(extension.getMqttPort()) - .buildBlocking(); - client.connect(); - - final Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL); - client.subscribeWith().topicFilter("hello/world").send(); - - client.publishWith().topic("hello/world").payload("Good Bye World!".getBytes(StandardCharsets.UTF_8)).send(); - - final Mqtt5Publish receive = publishes.receive(); - assertTrue(receive.getPayload().isPresent()); - assertEquals("Hello World!", new String(receive.getPayloadAsBytes(), StandardCharsets.UTF_8)); - } -} \ No newline at end of file