diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml
index 31cd4ad089..075a2789c0 100644
--- a/tdenginereader/pom.xml
+++ b/tdenginereader/pom.xml
@@ -29,11 +29,6 @@
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>com.alibaba.datax.tdenginewriter</groupId>
             <artifactId>tdenginewriter</artifactId>
@@ -44,13 +39,7 @@
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>2.0.37</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.alibaba</groupId>
-                    <artifactId>fastjson</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>2.0.39</version>
diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
index 379777941d..4ec42d9e39 100644
--- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
+++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
@@ -93,7 +93,7 @@ public void init() {
}
if (start >= end)
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
- "The parameter " + Key.BEGIN_DATETIME + ": " + beginDatetime + " should be less than the parameter " + Key.END_DATETIME + ": " + endDatetime + ".");
+ "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
}
@@ -119,6 +119,7 @@ public List<Configuration> split(int adviceNumber) {
}
}
+ LOG.info("Configuration: {}", configurations);
return configurations;
}
}
diff --git a/tdenginewriter/doc/tdenginewriter-CN.md b/tdenginewriter/doc/tdenginewriter-CN.md
index 54cb9b13dc..3d115fb79a 100644
--- a/tdenginewriter/doc/tdenginewriter-CN.md
+++ b/tdenginewriter/doc/tdenginewriter-CN.md
@@ -175,8 +175,8 @@ datax中的数据类型,可以映射到TDengine的数据类型
| TDengine到TDengine | [超级表到子表](../src/test/resources/t2t-3.json) |
| TDengine到TDengine | [普通表到普通表](../src/test/resources/t2t-4.json) |
| RDBMS到TDengine | [普通表到超级表,指定tbname](../src/test/resources/dm2t-1.json) |
-| RDBMS到TDengine | [普通表到超级表,不指定tbname](../src/test/resources/dm2t-2.json) |
-| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-3.json) |
+| RDBMS到TDengine | [普通表到超级表,不指定tbname](../src/test/resources/dm2t-3.json) |
+| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-2.json) |
| RDBMS到TDengine | [普通表到普通表](../src/test/resources/dm2t-4.json) |
| OpenTSDB到TDengine | [metric到普通表](../src/test/resources/o2t-1.json) |
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index 754b7f509a..a7564e6bad 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -20,21 +20,10 @@
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>2.0.37</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.alibaba</groupId>
-                    <artifactId>fastjson</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>2.0.39</version>
@@ -73,6 +62,7 @@
             <version>5.1.49</version>
             <scope>test</scope>
+
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java
index e0445219e8..d62c8f3273 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java
@@ -1,6 +1,8 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Constants {
- public static final int DEFAULT_BATCH_SIZE = 1000;
+ public static final String DEFAULT_USERNAME = "root";
+ public static final String DEFAULT_PASSWORD = "taosdata";
+ public static final int DEFAULT_BATCH_SIZE = 1;
public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
index 4958a6317e..27ade38278 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -17,10 +17,20 @@
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
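+
+    // register both the JNI (TSDBDriver) and RESTful (RestfulDriver) JDBC drivers once, at class load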
+ static {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
+ } catch (ClassNotFoundException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
private final TaskPluginCollector taskPluginCollector;
private String username;
private String password;
@@ -38,27 +48,19 @@ public void setTableMetas(Map<String, TableMeta> tableMetas) {
this.tableMetas = tableMetas;
}
-    public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
-        this.columnMetas = columnMetas;
+    public void setTbnameColumnMetasMap(Map<String, List<ColumnMeta>> tbnameColumnMetasMap) {
+        this.tbnameColumnMetasMap = tbnameColumnMetasMap;
}
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
-    private Map<String, List<ColumnMeta>> columnMetas;
-
- static {
- try {
- Class.forName("com.taosdata.jdbc.TSDBDriver");
- Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
- } catch (ClassNotFoundException ignored) {
- }
- }
+    private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
- this.username = configuration.getString(Key.USERNAME);
- this.password = configuration.getString(Key.PASSWORD);
+ this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
+ this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
this.jdbcUrl = configuration.getString(Key.JDBC_URL);
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
@@ -74,13 +76,11 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
- if (schemaManager == null) {
- // prepare table_name -> table_meta
- this.schemaManager = new SchemaManager(conn);
- this.tableMetas = schemaManager.loadTableMeta(tables);
- // prepare table_name -> column_meta
- this.columnMetas = schemaManager.loadColumnMetas(tables);
- }
+ // prepare table_name -> table_meta
+ this.schemaManager = new SchemaManager(conn);
+ this.tableMetas = schemaManager.loadTableMeta(tables);
+ // prepare table_name -> column_meta
+ this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
        List<Record> recordBatch = new ArrayList<>();
Record record;
@@ -91,7 +91,7 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
try {
recordBatch.add(record);
affectedRows += writeBatch(conn, recordBatch);
- } catch (Exception e) {
+ } catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows += writeEachRow(conn, recordBatch);
}
@@ -103,7 +103,7 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
if (!recordBatch.isEmpty()) {
try {
affectedRows += writeBatch(conn, recordBatch);
- } catch (Exception e) {
+ } catch (SQLException e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows += writeEachRow(conn, recordBatch);
}
@@ -127,8 +127,8 @@ private int writeEachRow(Connection conn, List<Record> recordBatch) {
recordList.add(record);
try {
affectedRows += writeBatch(conn, recordList);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ } catch (SQLException e) {
+ LOG.error(e.getMessage());
this.taskPluginCollector.collectDirtyRecord(record, e);
}
}
@@ -146,16 +146,18 @@ private int writeEachRow(Connection conn, List<Record> recordBatch) {
* 3. 对于tb,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
* 4. 对于t,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/
-    public int writeBatch(Connection conn, List<Record> recordBatch) throws Exception {
+    public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
int affectedRows = 0;
for (String table : tables) {
TableMeta tableMeta = tableMetas.get(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
- if (columns.contains("tbname"))
+ if (columns.contains("tbname")) {
affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
- else
- affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
+ } else {
+                    Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap(table);
+ affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname);
+ }
}
break;
case SUB_TABLE:
@@ -169,67 +171,105 @@ public int writeBatch(Connection conn, List<Record> recordBatch) throws Exception {
return affectedRows;
}
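+    /**
+     * used for a super table when the configured columns do not include "tbname":
+     * records whose tag values match an existing sub-table are written directly
+     * into that sub-table; the remaining records fall back to schemaless writing
+     * against the super table.
+     */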
+    private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch, Map<String, String> tag2Tbname) throws SQLException {
+        List<ColumnMeta> columnMetas = tbnameColumnMetasMap.get(table);
+        List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
+        List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
+
+ int affectedRows = 0;
+        Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
+
+        List<String> subTables = new ArrayList<>(subTableRecordsMap.keySet());
+ this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
+
+ for (String subTable : subTableRecordsMap.keySet()) {
+            List<Record> subTableRecords = subTableRecordsMap.get(subTable);
+ affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
+ }
+ if (!subTableNotExist.isEmpty())
+ affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist);
+ return affectedRows;
+ }
+
+    private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+ return recordBatch.stream().filter(record -> {
+ String tagStr = getTagString(columnMetas, record);
+ return tag2Tbname.containsKey(tagStr);
+ }).collect(Collectors.toList());
+ }
+
+    private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+ return recordBatch.stream().filter(record -> {
+ String tagStr = getTagString(columnMetas, record);
+ return !tag2Tbname.containsKey(tagStr);
+ }).collect(Collectors.toList());
+ }
+
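+    // group the records that matched an existing sub-table by that sub-table's name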
+    private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+        Map<String, List<Record>> ret = new HashMap<>();
+ for (Record record : subTableExist) {
+ String tagstr = getTagString(columnMetas, record);
+ String tbname = tag2Tbname.get(tagstr);
+ if (ret.containsKey(tbname)) {
+ ret.get(tbname).add(record);
+ } else {
+                List<Record> list = new ArrayList<>();
+ list.add(record);
+ ret.put(tbname, list);
+ }
+ }
+ return ret;
+ }
+
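+    // concatenate the values of all tag columns into one lookup key; it must be
+    // built the same way as the keys from SchemaManager#loadTagTableNameMap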
+    private String getTagString(List<ColumnMeta> columnMetas, Record record) {
+ return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> {
+ ColumnMeta columnMeta = columnMetas.get(colIndex);
+ if (columnMeta.isTag) {
+ Column column = record.getColumn(colIndex);
+ switch (columnMeta.type) {
+ case "TINYINT":
+ case "SMALLINT":
+ case "INT":
+ case "BIGINT":
+ return column.asLong().toString();
+ default:
+ return column.asString();
+ }
+ }
+ return "";
+ }).collect(Collectors.joining());
+ }
+
/**
* insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
-    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
.append(" using ").append(table)
- .append(" tags");
-// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-// return colMeta.isTag;
-// }).map(colMeta -> {
-// return buildColumnValue(colMeta, record);
-// }).collect(Collectors.joining(",", "(", ")")));
- sb.append("(");
- for (int i = 0; i < columns.size(); i++) {
- ColumnMeta colMeta = columnMetas.get(i);
- if (!columns.contains(colMeta.field))
- continue;
- if (!colMeta.isTag)
- continue;
- String tagValue = buildColumnValue(colMeta, record);
- if (i == 0) {
- sb.append(tagValue);
- } else {
- sb.append(",").append(tagValue);
- }
- }
- sb.append(")");
-
- sb.append(" ").append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ .append(" tags")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
- .append(" values");
-
-// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-// return !colMeta.isTag;
-// }).map(colMeta -> {
-// return buildColumnValue(colMeta, record);
-// }).collect(Collectors.joining(",", "(", ")")));
- sb.append("(");
- for (int i = 0; i < columnMetas.size(); i++) {
- ColumnMeta colMeta = columnMetas.get(i);
- if (!columns.contains(colMeta.field))
- continue;
- if (colMeta.isTag)
- continue;
- String colValue = buildColumnValue(colMeta, record);
- if (i == 0) {
- sb.append(colValue);
- } else {
- sb.append(",").append(colValue);
- }
- }
- sb.append(")");
+ .append(" values")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
@@ -245,11 +285,10 @@ private int executeUpdate(Connection conn, String sql) throws SQLException {
return count;
}
- private String buildColumnValue(ColumnMeta colMeta, Record record) throws Exception {
+ private String buildColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field));
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
- Column.Type type = column.getType();
- switch (type) {
+ switch (column.getType()) {
case DATE: {
Date value = column.asDate();
switch (timestampPrecision) {
@@ -268,6 +307,8 @@ private String buildColumnValue(ColumnMeta colMeta, Record record) throws Except
if (colMeta.type.equals("TIMESTAMP"))
return "\"" + column.asString() + "\"";
String value = column.asString();
+ if (value == null)
+ return "NULL";
return "\'" + Utils.escapeSingleQuota(value) + "\'";
case NULL:
case BAD:
@@ -276,9 +317,8 @@ private String buildColumnValue(ColumnMeta colMeta, Record record) throws Except
case DOUBLE:
case INT:
case LONG:
- return column.asString();
default:
- throw new Exception("invalid column type: " + type);
+ return column.asString();
}
}
@@ -290,7 +330,7 @@ private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record>
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
-        List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
+        List<ColumnMeta> columnMetaList = this.tbnameColumnMetasMap.get(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
        List<String> lines = new ArrayList<>();
@@ -426,8 +466,8 @@ private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) {
* else
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
-    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
@@ -453,25 +493,11 @@ private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
if (ignoreTagsUnmatched && !tagsAllMatch)
continue;
-// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-// return !colMeta.isTag;
-// }).map(colMeta -> {
-// return buildColumnValue(colMeta, record);
-// }).collect(Collectors.joining(", ", "(", ") ")));
- sb.append("(");
- for (int i = 0; i < columnMetas.size(); i++) {
- ColumnMeta colMeta = columnMetas.get(i);
- if (colMeta.isTag)
- continue;
- String colValue = buildColumnValue(colMeta, record);
- if (i == 0) {
- sb.append(colValue);
- } else {
- sb.append(",").append(colValue);
- }
- }
- sb.append(")");
-
+ sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(", ", "(", ") ")));
validRecords++;
}
@@ -510,34 +536,21 @@ private boolean equals(Column column, ColumnMeta colMeta) {
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
-    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
-// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
-// return buildColumnValue(colMeta, record);
-// }).collect(Collectors.joining(",", "(", ")")));
- sb.append("(");
- for (int i = 0; i < columnMetas.size(); i++) {
- ColumnMeta colMeta = columnMetas.get(i);
- if (!columns.contains(colMeta.field))
- continue;
- String colValue = buildColumnValue(colMeta, record);
- if (i == 0) {
- sb.append(colValue);
- } else {
- sb.append(",").append(colValue);
- }
- }
- sb.append(")");
+ sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
index f3bdbfbc31..fc0c002d91 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -5,18 +5,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
+ private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
private final Connection conn;
private TimestampPrecision precision;
+    private Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();
public SchemaManager(Connection conn) {
this.conn = conn;
@@ -124,14 +124,12 @@ public Map<String, List<ColumnMeta>> loadColumnMetas(List<String> tables) throws
}
}
} catch (SQLException e) {
- LOG.error(e.getMessage(), e);
+ e.printStackTrace();
}
colMeta.value = value;
});
- LOG.debug("load column metadata of " + table + ": " +
- columnMetaList.stream().map(ColumnMeta::toString).collect(Collectors.joining(",", "[", "]"))
- );
+ LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
ret.put(table, columnMetaList);
}
return ret;
@@ -145,9 +143,7 @@ private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
tableMeta.tags = rs.getInt("tags");
tableMeta.tables = rs.getInt("tables");
- if (LOG.isDebugEnabled()){
- LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
- }
+ LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@@ -174,4 +170,37 @@ private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQ
return columnMeta;
}
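+
+    /**
+     * builds (and caches per super table) a map from concatenated tag values to
+     * sub-table name, used to route records when "tbname" is not supplied
+     */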
+    public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
+ if (tags2tbnameMaps.containsKey(table))
+ return tags2tbnameMaps.get(table);
+        Map<String, String> tags2tbname = new HashMap<>();
+ try (Statement stmt = conn.createStatement()) {
+ // describe table
+            List<String> tags = new ArrayList<>();
+ ResultSet rs = stmt.executeQuery("describe " + table);
+ while (rs.next()) {
+ String note = rs.getString("Note");
+ if ("TAG".equals(note)) {
+ tags.add(rs.getString("Field"));
+ }
+ }
+            // select distinct t1, t2, tbname from stb
+ rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
+ while (rs.next()) {
+ ResultSet finalRs = rs;
+ String tagStr = tags.stream().map(t -> {
+ try {
+ return finalRs.getString(t);
+ } catch (SQLException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return "NULL";
+ }).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
+ String tbname = rs.getString("tbname");
+ tags2tbname.put(tagStr, tbname);
+ }
+ }
+ tags2tbnameMaps.put(table, tags2tbname);
+ return tags2tbname;
+ }
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
index e1f2bc291c..469449e63a 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java
@@ -4,10 +4,10 @@
public enum TDengineWriterErrorCode implements ErrorCode {
- REQUIRED_VALUE("TDengineWriter-00", "parameter value is missing"),
- ILLEGAL_VALUE("TDengineWriter-01", "invalid parameter value"),
- RUNTIME_EXCEPTION("TDengineWriter-02", "runtime exception"),
- TYPE_ERROR("TDengineWriter-03", "data type mapping error");
+ REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"),
+ ILLEGAL_VALUE("TDengineWriter-01", "值非法"),
+ RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"),
+ TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型");
private final String code;
private final String description;
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java
index b6932f6004..15f6b1bcd0 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java
@@ -2,7 +2,6 @@
import com.alibaba.datax.core.Engine;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
@@ -10,7 +9,6 @@
import java.util.Date;
import java.util.Random;
-@Ignore
public class DM2TDengineTest {
private String host1 = "192.168.0.72";
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
index 1034b74a88..46e601ad4a 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
@@ -5,10 +5,12 @@
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -19,7 +21,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-@Ignore
public class DefaultDataHandlerTest {
private static final String host = "192.168.1.93";
@@ -28,7 +29,7 @@ public class DefaultDataHandlerTest {
private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();
@Test
- public void writeSupTableBySQL() throws Exception {
+ public void writeSupTableBySQL() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@@ -58,7 +59,7 @@ public void writeSupTableBySQL() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -68,7 +69,7 @@ public void writeSupTableBySQL() throws Exception {
}
@Test
- public void writeSupTableBySQL_2() throws Exception {
+ public void writeSupTableBySQL_2() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@@ -96,7 +97,7 @@ public void writeSupTableBySQL_2() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -106,7 +107,7 @@ public void writeSupTableBySQL_2() throws Exception {
}
@Test
- public void writeSupTableBySchemaless() throws Exception {
+ public void writeSupTableBySchemaless() throws SQLException {
// given
createSupTable();
Configuration configuration = Configuration.from("{" +
@@ -136,7 +137,7 @@ public void writeSupTableBySchemaless() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
@@ -146,7 +147,7 @@ public void writeSupTableBySchemaless() throws Exception {
}
@Test
- public void writeSubTableWithTableName() throws Exception {
+ public void writeSubTableWithTableName() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@@ -175,7 +176,7 @@ public void writeSubTableWithTableName() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -185,7 +186,7 @@ public void writeSubTableWithTableName() throws Exception {
}
@Test
- public void writeSubTableWithoutTableName() throws Exception {
+ public void writeSubTableWithoutTableName() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@@ -214,7 +215,7 @@ public void writeSubTableWithoutTableName() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -224,7 +225,7 @@ public void writeSubTableWithoutTableName() throws Exception {
}
@Test
- public void writeNormalTable() throws Exception {
+ public void writeNormalTable() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@@ -253,7 +254,7 @@ public void writeNormalTable() throws Exception {
        Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
        Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java
new file mode 100644
index 0000000000..2356b6f808
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.core.Engine;
+import org.junit.Test;
+
+public class Mongo2TDengineTest {
+
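+    // end-to-end smoke test: runs the DataX engine against the mongo2t.json job,
+    // so it expects the MongoDB and TDengine instances referenced in that file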
+ @Test
+ public void case01() throws Throwable {
+
+ // when
+ String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mongo2t.json"};
+ System.setProperty("datax.home", "../target/datax/datax");
+ Engine.entry(params);
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java
index 8cbc2d90a5..4a662711f3 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java
@@ -2,14 +2,12 @@
import com.alibaba.datax.core.Engine;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
-@Ignore
public class Mysql2TDengineTest {
private static final String host1 = "192.168.56.105";
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java
index 0d212e8a9f..ad326f7e33 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java
@@ -2,12 +2,10 @@
import com.alibaba.datax.core.Engine;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
-@Ignore
public class Opentsdb2TDengineTest {
@Test
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
index fac1e0f7da..3708e6f967 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
@@ -1,6 +1,9 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -10,7 +13,6 @@
import java.util.List;
import java.util.Map;
-@Ignore
public class SchemaManagerTest {
private static Connection conn;
@@ -59,6 +61,23 @@ public void loadColumnMetas() {
Assert.assertEquals(4, stb1.size());
}
+ @Test
+ public void loadTagTableNameMap() throws SQLException {
+ // given
+ SchemaManager schemaManager = new SchemaManager(conn);
+ String table = "stb3";
+
+ // when
+        Map<String, String> tagTableMap = schemaManager.loadTagTableNameMap(table);
+
+ // then
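+        // keys are the tag values joined with the empty splitter: tags (1, 1.1, 'abc') -> "11.1abc"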
+ Assert.assertEquals(2, tagTableMap.keySet().size());
+ Assert.assertTrue(tagTableMap.containsKey("11.1abc"));
+ Assert.assertTrue(tagTableMap.containsKey("22.2defg"));
+ Assert.assertEquals("tb5", tagTableMap.get("11.1abc"));
+ Assert.assertEquals("tb6", tagTableMap.get("22.2defg"));
+ }
+
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
@@ -73,6 +92,9 @@ public static void beforeClass() throws SQLException {
stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)");
stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)");
stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
+ stmt.execute("create table stb3(ts timestamp, f1 int) tags(t1 int, t2 float, t3 nchar(32))");
+ stmt.execute("insert into tb5 using stb3 tags(1,1.1,'abc') values(now, 1)");
+ stmt.execute("insert into tb6 using stb3 tags(2,2.2,'defg') values(now, 2)");
}
}
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java
index 613d2aa999..e54bcbde02 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java
@@ -1,7 +1,7 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
@@ -9,7 +9,6 @@
import java.sql.SQLException;
import java.sql.Statement;
-@Ignore
public class Stream2TDengineTest {
private String host2 = "192.168.56.105";
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java
index 18efdc142e..9e9546332a 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java
@@ -2,14 +2,12 @@
import com.alibaba.datax.core.Engine;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
-@Ignore
public class TDengine2TDengineTest {
private static final String host1 = "192.168.56.105";
diff --git a/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
new file mode 100755
index 0000000000..f3dca7c13f
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+datax_home_dir=$(dirname $(readlink -f "$0"))
+
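+# recreate the target super table via the TDengine REST API
+# (the Basic token is base64 of "root:taosdata")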
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql
+
+rm -f ${datax_home_dir}/log/*
+rm -f ${datax_home_dir}/job/*.csv
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json
new file mode 100644
index 0000000000..625c38010c
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json
@@ -0,0 +1,106 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "txtfilereader",
+ "parameter": {
+ "path": [
+ "/root/workspace/tmp/a.txt"
+ ],
+ "encoding": "UTF-8",
+ "column": [
+ {
+ "index": 0,
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss.SSS"
+ },
+ {
+ "index": 1,
+ "type": "long"
+ },
+ {
+ "index": 2,
+ "type": "long"
+ },
+ {
+ "index": 3,
+ "type": "long"
+ },
+ {
+ "index": 4,
+ "type": "long"
+ },
+ {
+ "index": 5,
+ "type": "double"
+ },
+ {
+ "index": 6,
+ "type": "double"
+ },
+ {
+ "index": 7,
+ "type": "boolean"
+ },
+ {
+ "index": 8,
+ "type": "string"
+ },
+ {
+ "index": 9,
+ "type": "string"
+ },
+ {
+ "index": 10,
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss.SSS"
+ },
+ {
+ "index": 11,
+ "type": "string"
+ }
+ ],
+ "fieldDelimiter": ","
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "t1",
+ "tbname"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json
new file mode 100644
index 0000000000..d852e2e2c1
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json
@@ -0,0 +1,57 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "txtfilereader",
+ "parameter": {
+ "path": [
+ "/root/workspace/tmp/a.txt"
+ ],
+ "encoding": "UTF-8",
+ "column": [
+ "*"
+ ],
+ "fieldDelimiter": ","
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "t1",
+ "tbname"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json
new file mode 100644
index 0000000000..3e86bb8deb
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json
@@ -0,0 +1,62 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json
new file mode 100644
index 0000000000..183786bf82
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json
@@ -0,0 +1,62 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json
new file mode 100644
index 0000000000..d9285b23cd
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json
@@ -0,0 +1,63 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "where": "1=1",
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
new file mode 100755
index 0000000000..426c623338
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+set -e
+#set -x
+
+datax_home_dir=$(dirname $(readlink -f "$0"))
+table_name="stb1"
+update_key="ts"
+
+while getopts "hd:t:" arg; do
+ case $arg in
+ d)
+ datax_home_dir=$(echo $OPTARG)
+ ;;
+  t)
+ table_name=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]"
+ echo " -h help"
+ exit 0
+ ;;
+  ?) # unknown option
+    echo "unknown argument"
+ exit 1
+ ;;
+ esac
+done
+
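+# the timestamp of the last successful sync is kept in job/<table_name>.csv;
+# if that bookmark file is missing, a full synchronization is performed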
+if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
+ MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
+else
+ MAX_TIME="null"
+fi
+current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
+current_timestamp=$(date +%s)
+
+if [ "$MAX_TIME" != "null" ]; then
+ WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
+ sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
+ echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
+ python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1
+else
+ echo "full data synchronization, to '${current_datetime}'"
+ python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1
+fi
+
+if [[ $? -ne 0 ]]; then
+ echo "datax migration job falied"
+else
+ echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
+ echo "datax migration job success"
+fi
+
+rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json
+
+#while true; do ./dm2t_sync.sh; sleep 5s; done
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json
new file mode 100644
index 0000000000..341f6293ed
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json
@@ -0,0 +1,50 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "*"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb1"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db1"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "rdbmswriter",
+ "parameter": {
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
+ }
+ ],
+ "username": "TESTUSER",
+ "password": "test123456",
+ "table": "stb2",
+ "column": [
+ "*"
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json
new file mode 100644
index 0000000000..b2cf91e2ee
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json
@@ -0,0 +1,50 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "*"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb1"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "rdbmswriter",
+ "parameter": {
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
+ }
+ ],
+ "username": "TESTUSER",
+ "password": "test123456",
+ "table": "stb2",
+ "column": [
+ "*"
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/upload.sh b/tdenginewriter/src/test/resources/incremental_sync/upload.sh
new file mode 100755
index 0000000000..388d275b03
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/upload.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+scp t2dm-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp t2dm-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp csv2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp csv2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+
+
+scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
+scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/mongo2t.json b/tdenginewriter/src/test/resources/mongo2t.json
new file mode 100644
index 0000000000..902e6f7c74
--- /dev/null
+++ b/tdenginewriter/src/test/resources/mongo2t.json
@@ -0,0 +1,66 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "192.168.1.213:27017"
+ ],
+ "userName": "",
+ "userPassword": "",
+ "dbName": "testdb",
+ "collectionName": "monitor_data",
+ "column": [
+ {
+ "name": "ct",
+ "type": "date"
+ },
+ {
+ "name": "pv",
+ "type": "float"
+ },
+ {
+ "name": "tv",
+ "type": "float"
+ },
+ {
+ "name": "pid",
+ "type": "float"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "hmdata",
+ "column": [
+ "ts",
+ "pressure",
+ "temperature",
+ "position_id"
+ ],
+ "connection": [
+ {
+ "table": [
+ "pipeline_data"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.213:6041/mongo3040"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file