From 22c4f8cd18419a9c0cec9432fea3898c14104d37 Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 22 Mar 2022 13:50:43 +0800 Subject: [PATCH 1/7] [TS-1342]: record missing when batchWrite --- .../datax/plugin/writer/tdenginewriter/DefaultDataHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 7854afeece..f2d865097b 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -89,6 +89,7 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { recordBatch.add(record); } else { try { + recordBatch.add(record); affectedRows = writeBatch(conn, recordBatch); } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); From 1856f044b7730933f5dc98d73800851f4b0abd07 Mon Sep 17 00:00:00 2001 From: Zhiyu Yang <69311263+zyyang-taosdata@users.noreply.github.com> Date: Tue, 22 Mar 2022 14:20:42 +0800 Subject: [PATCH 2/7] Update tdenginewriter-CN.md --- tdenginewriter/doc/tdenginewriter-CN.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tdenginewriter/doc/tdenginewriter-CN.md b/tdenginewriter/doc/tdenginewriter-CN.md index 54cb9b13dc..3d115fb79a 100644 --- a/tdenginewriter/doc/tdenginewriter-CN.md +++ b/tdenginewriter/doc/tdenginewriter-CN.md @@ -175,8 +175,8 @@ datax中的数据类型,可以映射到TDengine的数据类型 | TDengine到TDengine | [超级表到子表](../src/test/resources/t2t-3.json) | | TDengine到TDengine | [普通表到普通表](../src/test/resources/t2t-4.json) | | RDBMS到TDengine | [普通表到超级表,指定tbname](../src/test/resources/dm2t-1.json) | -| RDBMS到TDengine | [普通表到超级表,不指定tbname](../src/test/resources/dm2t-2.json) | -| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-3.json) | +| RDBMS到TDengine | [普通表到超级表,不指定tbname](../src/test/resources/dm2t-3.json) | +| RDBMS到TDengine | [普通表到子表](../src/test/resources/dm2t-2.json) | | RDBMS到TDengine | [普通表到普通表](../src/test/resources/dm2t-4.json) | | OpenTSDB到TDengine | [metric到普通表](../src/test/resources/o2t-1.json) | From 7ca8038318e2bde17bc0e7e273520852a4107b22 Mon Sep 17 00:00:00 2001 From: ShiChao <13263325202@163.com> Date: Wed, 23 Mar 2022 19:23:11 +0800 Subject: [PATCH 3/7] [TS-1358]: fix affected row incorrect --- .../plugin/writer/tdenginewriter/DefaultDataHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index f2d865097b..30ba7e234d 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -90,10 +90,10 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { } else { try { recordBatch.add(record); - affectedRows = writeBatch(conn, recordBatch); + affectedRows += writeBatch(conn, recordBatch); } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } @@ -102,10 +102,10 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { if (!recordBatch.isEmpty()) { try { - affectedRows = writeBatch(conn, recordBatch); + affectedRows += writeBatch(conn, recordBatch); } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } From 81391b60730cc5b030581d3eb9434f4e7a39216b Mon Sep 17 00:00:00 2001 From: ShiChao <13263325202@163.com> Date: Wed, 23 Mar 2022 19:26:13 +0800 Subject: [PATCH 4/7] [TS-1358]: fix affected row incorrect --- .../plugin/writer/tdenginewriter/DefaultDataHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index f2d865097b..30ba7e234d 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -90,10 +90,10 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { } else { try { recordBatch.add(record); - affectedRows = writeBatch(conn, recordBatch); + affectedRows += writeBatch(conn, recordBatch); } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } @@ -102,10 +102,10 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { if (!recordBatch.isEmpty()) { try { - affectedRows = writeBatch(conn, recordBatch); + affectedRows += writeBatch(conn, recordBatch); } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); - affectedRows = writeEachRow(conn, recordBatch); + affectedRows += writeEachRow(conn, recordBatch); } recordBatch.clear(); } From beaf7aa620e11ca3577c03fb58bf43f67c8b57d6 Mon Sep 17 00:00:00 2001 From: zyyang Date: Wed, 15 Jun 2022 10:57:10 +0800 Subject: [PATCH 5/7] feat: migrate mongo's data to tdengine without tbname specified #TS-1538 #TS-1539 --- tdenginereader/pom.xml | 14 +-- tdenginewriter/pom.xml | 31 ++--- .../tdenginewriter/DefaultDataHandler.java | 105 ++++++++++++++--- .../writer/tdenginewriter/SchemaManager.java | 42 ++++++- .../DefaultDataHandlerTest.java | 13 +-- .../tdenginewriter/Mongo2TDengineTest.java | 16 +++ .../tdenginewriter/SchemaManagerTest.java | 21 +++- .../resources/incremental_sync/clean_env.sh | 9 ++ .../resources/incremental_sync/csv2t-jni.json | 106 ++++++++++++++++++ .../incremental_sync/csv2t-restful.json | 57 ++++++++++ .../resources/incremental_sync/dm2t-jni.json | 62 ++++++++++ .../incremental_sync/dm2t-restful.json | 62 ++++++++++ .../incremental_sync/dm2t-update.json | 63 +++++++++++ .../resources/incremental_sync/dm2t_sync.sh | 57 ++++++++++ .../resources/incremental_sync/t2dm-jni.json | 50 +++++++++ .../incremental_sync/t2dm-restful.json | 50 +++++++++ .../test/resources/incremental_sync/upload.sh | 13 +++ .../src/test/resources/mongo2t.json | 66 +++++++++++ 18 files changed, 774 insertions(+), 63 deletions(-) create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java create mode 100755 tdenginewriter/src/test/resources/incremental_sync/clean_env.sh create mode 100644 tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json create mode 100644 tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json create mode 100644 tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json create mode 100644 tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json create mode 100644 tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json create mode 100755 tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh create mode 100644 tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json create mode 100644 tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json create mode 100755 tdenginewriter/src/test/resources/incremental_sync/upload.sh create mode 100644 tdenginewriter/src/test/resources/mongo2t.json diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml index 319152f82d..075a2789c0 100644 --- a/tdenginereader/pom.xml +++ b/tdenginereader/pom.xml @@ -29,12 +29,6 @@ - - com.alibaba - fastjson - 1.2.78 - - com.alibaba.datax.tdenginewriter tdenginewriter @@ -45,13 +39,7 @@ com.taosdata.jdbc taos-jdbcdriver - 2.0.37 - - - com.alibaba - fastjson - - + 2.0.39 diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index 791a4bdcb6..a7564e6bad 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -20,22 +20,10 @@ - - com.alibaba - fastjson - 1.2.78 - - com.taosdata.jdbc taos-jdbcdriver - 2.0.37 - - - com.alibaba - fastjson - - + 2.0.39 @@ -74,15 +62,16 @@ 5.1.49 test + - - - - - - - - + + + + + + + + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 30ba7e234d..84adaf72e0 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -17,19 +17,20 @@ import java.util.*; import java.util.Date; import java.util.stream.Collectors; +import java.util.stream.IntStream; public class DefaultDataHandler implements DataHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + static { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); } catch (ClassNotFoundException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } } - private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); - private final TaskPluginCollector taskPluginCollector; private String username; private String password; @@ -47,15 +48,15 @@ public void setTableMetas(Map tableMetas) { this.tableMetas = tableMetas; } - public void setColumnMetas(Map> columnMetas) { - this.columnMetas = columnMetas; + public void setTbnameColumnMetasMap(Map> tbnameColumnMetasMap) { + this.tbnameColumnMetasMap = tbnameColumnMetasMap; } public void setSchemaManager(SchemaManager schemaManager) { this.schemaManager = schemaManager; } - private Map> columnMetas; + private Map> tbnameColumnMetasMap; public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); @@ -73,14 +74,13 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { int count = 0; int affectedRows = 0; - try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); // prepare table_name -> table_meta this.schemaManager = new SchemaManager(conn); this.tableMetas = schemaManager.loadTableMeta(tables); // prepare table_name -> column_meta - this.columnMetas = schemaManager.loadColumnMetas(tables); + this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); List recordBatch = new ArrayList<>(); Record record; @@ -152,10 +152,12 @@ public int writeBatch(Connection conn, List recordBatch) throws SQLExcep TableMeta tableMeta = tableMetas.get(table); switch (tableMeta.tableType) { case SUP_TABLE: { - if (columns.contains("tbname")) + if (columns.contains("tbname")) { affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch); - else - affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch); + } else { + Map tag2Tbname = schemaManager.loadTagTableNameMap(table); + affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname); + } } break; case SUB_TABLE: @@ -169,13 +171,82 @@ public int writeBatch(Connection conn, List recordBatch) throws SQLExcep return affectedRows; } + private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch, Map tag2Tbname) throws SQLException { + List columnMetas = tbnameColumnMetasMap.get(table); + List subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname); + List subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname); + + int affectedRows = 0; + Map> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname); + + List subTables = new ArrayList<>(subTableRecordsMap.keySet()); + this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables)); + + for (String subTable : subTableRecordsMap.keySet()) { + List subTableRecords = subTableRecordsMap.get(subTable); + affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords); + } + if (!subTableNotExist.isEmpty()) + affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist); + return affectedRows; + } + + private List filterSubTableExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) { + return recordBatch.stream().filter(record -> { + String tagStr = getTagString(columnMetas, record); + return tag2Tbname.containsKey(tagStr); + }).collect(Collectors.toList()); + } + + private List filterSubTableNotExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) { + return recordBatch.stream().filter(record -> { + String tagStr = getTagString(columnMetas, record); + return !tag2Tbname.containsKey(tagStr); + }).collect(Collectors.toList()); + } + + private Map> splitRecords(List subTableExist, List columnMetas, Map tag2Tbname) { + Map> ret = new HashMap<>(); + for (Record record : subTableExist) { + String tagstr = getTagString(columnMetas, record); + String tbname = tag2Tbname.get(tagstr); + if (ret.containsKey(tbname)) { + ret.get(tbname).add(record); + } else { + List list = new ArrayList<>(); + list.add(record); + ret.put(tbname, list); + } + } + return ret; + } + + private String getTagString(List columnMetas, Record record) { + return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> { + ColumnMeta columnMeta = columnMetas.get(colIndex); + if (columnMeta.isTag) { + Column column = record.getColumn(colIndex); + switch (columnMeta.type) { + case "TINYINT": + case "SMALLINT": + case "INT": + case "BIGINT": + return column.asLong().toString(); + default: + return column.asString(); + } + } + return ""; + }).collect(Collectors.joining()); + } + /** * insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.columnMetas.get(table); + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { @@ -257,7 +328,7 @@ private int writeBatchToSupTableBySchemaless(Connection conn, String table, List int count = 0; TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); - List columnMetaList = this.columnMetas.get(table); + List columnMetaList = this.tbnameColumnMetasMap.get(table); ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get(); List lines = new ArrayList<>(); @@ -394,7 +465,7 @@ private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) { * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.columnMetas.get(table); + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(table).append(" ") @@ -464,18 +535,18 @@ private boolean equals(Column column, ColumnMeta colMeta) { * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.columnMetas.get(table); + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(table) .append(" ") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { + .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { return colMeta.field; }).collect(Collectors.joining(",", "(", ")"))) .append(" values "); for (Record record : recordBatch) { - sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { + sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { return buildColumnValue(colMeta, record); }).collect(Collectors.joining(",", "(", ")"))); } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index c48b7942dc..fc0c002d91 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -5,17 +5,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.*; +import java.util.stream.Collectors; public class SchemaManager { private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); +// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_"; + private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = ""; private final Connection conn; private TimestampPrecision precision; + private Map> tags2tbnameMaps = new HashMap<>(); public SchemaManager(Connection conn) { this.conn = conn; @@ -169,4 +170,37 @@ private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQ return columnMeta; } + public Map loadTagTableNameMap(String table) throws SQLException { + if (tags2tbnameMaps.containsKey(table)) + return tags2tbnameMaps.get(table); + Map tags2tbname = new HashMap<>(); + try (Statement stmt = conn.createStatement()) { + // describe table + List tags = new ArrayList<>(); + ResultSet rs = stmt.executeQuery("describe " + table); + while (rs.next()) { + String note = rs.getString("Note"); + if ("TAG".equals(note)) { + tags.add(rs.getString("Field")); + } + } + // select distinct tbname, t1, t2 from stb + rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table); + while (rs.next()) { + ResultSet finalRs = rs; + String tagStr = tags.stream().map(t -> { + try { + return finalRs.getString(t); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + return "NULL"; + }).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER)); + String tbname = rs.getString("tbname"); + tags2tbname.put(tagStr, tbname); + } + } + tags2tbnameMaps.put(table, tags2tbname); + return tags2tbname; + } } diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java index 8840aa28fb..46e601ad4a 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java @@ -5,7 +5,6 @@ import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.plugin.TaskPluginCollector; -import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.transport.record.DefaultRecord; import org.junit.AfterClass; @@ -60,7 +59,7 @@ public void writeSupTableBySQL() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -98,7 +97,7 @@ public void writeSupTableBySQL_2() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -138,7 +137,7 @@ public void writeSupTableBySchemaless() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(connection, recordList); @@ -177,7 +176,7 @@ public void writeSubTableWithTableName() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -216,7 +215,7 @@ public void writeSubTableWithoutTableName() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -255,7 +254,7 @@ public void writeNormalTable() throws SQLException { Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); handler.setTableMetas(tableMetas); - handler.setColumnMetas(columnMetas); + handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java new file mode 100644 index 0000000000..2356b6f808 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Test; + +public class Mongo2TDengineTest { + + @Test + public void case01() throws Throwable { + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mongo2t.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java index 6df0de1da5..3708e6f967 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java @@ -1,6 +1,5 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; -import com.alibaba.fastjson.util.TypeUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -62,6 +61,23 @@ public void loadColumnMetas() { Assert.assertEquals(4, stb1.size()); } + @Test + public void loadTagTableNameMap() throws SQLException { + // given + SchemaManager schemaManager = new SchemaManager(conn); + String table = "stb3"; + + // when + Map tagTableMap = schemaManager.loadTagTableNameMap(table); + + // then + Assert.assertEquals(2, tagTableMap.keySet().size()); + Assert.assertTrue(tagTableMap.containsKey("11.1abc")); + Assert.assertTrue(tagTableMap.containsKey("22.2defg")); + Assert.assertEquals("tb5", tagTableMap.get("11.1abc")); + Assert.assertEquals("tb6", tagTableMap.get("22.2defg")); + } + @BeforeClass public static void beforeClass() throws SQLException { conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata"); @@ -76,6 +92,9 @@ public static void beforeClass() throws SQLException { stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)"); stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)"); stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)"); + stmt.execute("create table stb3(ts timestamp, f1 int) tags(t1 int, t2 float, t3 nchar(32))"); + stmt.execute("insert into tb5 using stb3 tags(1,1.1,'abc') values(now, 1)"); + stmt.execute("insert into tb6 using stb3 tags(2,2.2,'defg') values(now, 2)"); } } diff --git a/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh new file mode 100755 index 0000000000..f3dca7c13f --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +datax_home_dir=$(dirname $(readlink -f "$0")) + +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql + +rm -f ${datax_home_dir}/log/* +rm -f ${datax_home_dir}/job/*.csv \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json new file mode 100644 index 0000000000..625c38010c --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json @@ -0,0 +1,106 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": [ + "/root/workspace/tmp/a.txt" + ], + "encoding": "UTF-8", + "column": [ + { + "index": 0, + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss.SSS" + }, + { + "index": 1, + "type": "long" + }, + { + "index": 2, + "type": "long" + }, + { + "index": 3, + "type": "long" + }, + { + "index": 4, + "type": "long" + }, + { + "index": 5, + "type": "double" + }, + { + "index": 6, + "type": "double" + }, + { + "index": 7, + "type": "boolean" + }, + { + "index": 8, + "type": "string" + }, + { + "index": 9, + "type": "string" + }, + { + "index": 10, + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss.SSS" + }, + { + "index": 11, + "type": "string" + } + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "tbname" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json new file mode 100644 index 0000000000..d852e2e2c1 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json @@ -0,0 +1,57 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": [ + "/root/workspace/tmp/a.txt" + ], + "encoding": "UTF-8", + "column": [ + "*" + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "tbname" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json new file mode 100644 index 0000000000..3e86bb8deb --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json @@ -0,0 +1,62 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "TESTUSER", + "password": "test123456", + "connection": [ + { + "querySql": [ + "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;" + ], + "jdbcUrl": [ + "jdbc:dm://192.168.0.72:5236" + ] + } + ], + "fetchSize": 1024 + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json new file mode 100644 index 0000000000..183786bf82 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json @@ -0,0 +1,62 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "TESTUSER", + "password": "test123456", + "connection": [ + { + "querySql": [ + "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;" + ], + "jdbcUrl": [ + "jdbc:dm://192.168.0.72:5236" + ] + } + ], + "fetchSize": 1024 + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json new file mode 100644 index 0000000000..d9285b23cd --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json @@ -0,0 +1,63 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "TESTUSER", + "password": "test123456", + "connection": [ + { + "querySql": [ + "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1" + ], + "jdbcUrl": [ + "jdbc:dm://192.168.0.72:5236" + ] + } + ], + "where": "1=1", + "fetchSize": 1024 + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh new file mode 100755 index 0000000000..426c623338 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -e +#set -x + +datax_home_dir=$(dirname $(readlink -f "$0")) +table_name="stb1" +update_key="ts" + +while getopts "hd:t:" arg; do + case $arg in + d) + datax_home_dir=$(echo $OPTARG) + ;; + v) + table_name=$(echo $OPTARG) + ;; + h) + echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]" + echo " -h help" + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then + MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv) +else + MAX_TIME="null" +fi +current_datetime=$(date +"%Y-%m-%d %H:%M:%S") +current_timestamp=$(date +%s) + +if [ "$MAX_TIME" != "null" ]; then + WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'" + sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json + echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'" + python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1 +else + echo "full data synchronization, to '${current_datetime}'" + python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1 +fi + +if [[ $? -ne 0 ]]; then + echo "datax migration job falied" +else + echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv + echo "datax migration job success" +fi + +rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json + +#while true; do ./dm2t_sync.sh; sleep 5s; done \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json new file mode 100644 index 0000000000..341f6293ed --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json @@ -0,0 +1,50 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "*" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db1" + } + ] + } + }, + "writer": { + "name": "rdbmswriter", + "parameter": { + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:dm://192.168.0.72:5236" + } + ], + "username": "TESTUSER", + "password": "test123456", + "table": "stb2", + "column": [ + "*" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json new file mode 100644 index 0000000000..b2cf91e2ee --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json @@ -0,0 +1,50 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "*" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1" + } + ] + } + }, + "writer": { + "name": "rdbmswriter", + "parameter": { + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:dm://192.168.0.72:5236" + } + ], + "username": "TESTUSER", + "password": "test123456", + "table": "stb2", + "column": [ + "*" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/upload.sh b/tdenginewriter/src/test/resources/incremental_sync/upload.sh new file mode 100755 index 0000000000..388d275b03 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/upload.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +scp t2dm-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp t2dm-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp dm2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp dm2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp csv2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp csv2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job + + +scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax +scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/mongo2t.json b/tdenginewriter/src/test/resources/mongo2t.json new file mode 100644 index 0000000000..902e6f7c74 --- /dev/null +++ b/tdenginewriter/src/test/resources/mongo2t.json @@ -0,0 +1,66 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mongodbreader", + "parameter": { + "address": [ + "192.168.1.213:27017" + ], + "userName": "", + "userPassword": "", + "dbName": "testdb", + "collectionName": "monitor_data", + "column": [ + { + "name": "ct", + "type": "date" + }, + { + "name": "pv", + "type": "float" + }, + { + "name": "tv", + "type": "float" + }, + { + "name": "pid", + "type": "float" + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "hmdata", + "column": [ + "ts", + "pressure", + "temperature", + "position_id" + ], + "connection": [ + { + "table": [ + "pipeline_data" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.213:6041/mongo3040" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file From 2de4e5513798c37b6ea0c1e05fba5aa6fc28e5a8 Mon Sep 17 00:00:00 2001 From: zyyang Date: Thu, 16 Jun 2022 13:06:23 +0800 Subject: [PATCH 6/7] handle null data --- .../datax/plugin/writer/tdenginewriter/DefaultDataHandler.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 84adaf72e0..27ade38278 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -307,6 +307,8 @@ private String buildColumnValue(ColumnMeta colMeta, Record record) { if (colMeta.type.equals("TIMESTAMP")) return "\"" + column.asString() + "\""; String value = column.asString(); + if (value == null) + return "NULL"; return "\'" + Utils.escapeSingleQuota(value) + "\'"; case NULL: case BAD: From bef624414ab6f20d4b18290b040b446b2579019f Mon Sep 17 00:00:00 2001 From: zyyang Date: Tue, 28 Jun 2022 15:43:53 +0800 Subject: [PATCH 7/7] merge taosdata/master --- .../datax/plugin/reader/TDengineReader.java | 3 +- .../writer/tdenginewriter/Constants.java | 4 +- .../tdenginewriter/DefaultDataHandler.java | 165 ++++++------------ .../writer/tdenginewriter/SchemaManager.java | 10 +- .../TDengineWriterErrorCode.java | 8 +- .../tdenginewriter/DM2TDengineTest.java | 2 - .../DefaultDataHandlerTest.java | 18 +- .../tdenginewriter/Mysql2TDengineTest.java | 2 - .../tdenginewriter/Opentsdb2TDengineTest.java | 2 - .../tdenginewriter/SchemaManagerTest.java | 6 +- .../tdenginewriter/Stream2TDengineTest.java | 3 +- .../tdenginewriter/TDengine2TDengineTest.java | 2 - 12 files changed, 79 insertions(+), 146 deletions(-) diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java index 379777941d..4ec42d9e39 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -93,7 +93,7 @@ public void init() { } if (start >= end) throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, - "The parameter " + Key.BEGIN_DATETIME + ": " + beginDatetime + " should be less than the parameter " + Key.END_DATETIME + ": " + endDatetime + "."); + "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); } @@ -119,6 +119,7 @@ public List split(int adviceNumber) { } } + LOG.info("Configuration: {}", configurations); return configurations; } } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java index e0445219e8..d62c8f3273 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java @@ -1,6 +1,8 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; public class Constants { - public static final int DEFAULT_BATCH_SIZE = 1000; + public static final String DEFAULT_USERNAME = "root"; + public static final String DEFAULT_PASSWORD = "taosdata"; + public static final int DEFAULT_BATCH_SIZE = 1; public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false; } \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 8eb7466ab0..27ade38278 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -22,6 +22,15 @@ public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + static { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + } catch (ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + } + } + private final TaskPluginCollector taskPluginCollector; private String username; private String password; @@ -49,17 +58,9 @@ public void setSchemaManager(SchemaManager schemaManager) { private Map> tbnameColumnMetasMap; - static { - try { - Class.forName("com.taosdata.jdbc.TSDBDriver"); - Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); - } catch (ClassNotFoundException ignored) { - } - } - public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { - this.username = configuration.getString(Key.USERNAME); - this.password = configuration.getString(Key.PASSWORD); + this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); + this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); this.jdbcUrl = configuration.getString(Key.JDBC_URL); this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE); this.tables = configuration.getList(Key.TABLE, String.class); @@ -75,13 +76,11 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); - if (schemaManager == null) { - // prepare table_name -> table_meta - this.schemaManager = new SchemaManager(conn); - this.tableMetas = schemaManager.loadTableMeta(tables); - // prepare table_name -> column_meta - this.columnMetas = schemaManager.loadColumnMetas(tables); - } + // prepare table_name -> table_meta + this.schemaManager = new SchemaManager(conn); + this.tableMetas = schemaManager.loadTableMeta(tables); + // prepare table_name -> column_meta + this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); List recordBatch = new ArrayList<>(); Record record; @@ -92,7 +91,7 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { try { recordBatch.add(record); affectedRows += writeBatch(conn, recordBatch); - } catch (Exception e) { + } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); affectedRows += writeEachRow(conn, recordBatch); } @@ -104,7 +103,7 @@ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) { if (!recordBatch.isEmpty()) { try { affectedRows += writeBatch(conn, recordBatch); - } catch (Exception e) { + } catch (SQLException e) { LOG.warn("use one row insert. because:" + e.getMessage()); affectedRows += writeEachRow(conn, recordBatch); } @@ -128,8 +127,8 @@ private int writeEachRow(Connection conn, List recordBatch) { recordList.add(record); try { affectedRows += writeBatch(conn, recordList); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + } catch (SQLException e) { + LOG.error(e.getMessage()); this.taskPluginCollector.collectDirtyRecord(record, e); } } @@ -147,7 +146,7 @@ private int writeEachRow(Connection conn, List recordBatch) { * 3. 对于tb,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2) * 4. 对于t,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2) */ - public int writeBatch(Connection conn, List recordBatch) throws Exception { + public int writeBatch(Connection conn, List recordBatch) throws SQLException { int affectedRows = 0; for (String table : tables) { TableMeta tableMeta = tableMetas.get(table); @@ -246,62 +245,31 @@ private String getTagString(List columnMetas, Record record) { * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ - private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws Exception { - List columnMetas = this.columnMetas.get(table); + private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { sb.append(" ").append(record.getColumn(indexOf("tbname")).asString()) .append(" using ").append(table) - .append(" tags"); -// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { -// return colMeta.isTag; -// }).map(colMeta -> { -// return buildColumnValue(colMeta, record); -// }).collect(Collectors.joining(",", "(", ")"))); - sb.append("("); - for (int i = 0; i < columns.size(); i++) { - ColumnMeta colMeta = columnMetas.get(i); - if (!columns.contains(colMeta.field)) - continue; - if (!colMeta.isTag) - continue; - String tagValue = buildColumnValue(colMeta, record); - if (i == 0) { - sb.append(tagValue); - } else { - sb.append(",").append(tagValue); - } - } - sb.append(")"); - - sb.append(" ").append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + .append(" tags") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return colMeta.isTag; + }).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(",", "(", ")"))) + .append(" ") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { return !colMeta.isTag; }).map(colMeta -> { return colMeta.field; }).collect(Collectors.joining(",", "(", ")"))) - .append(" values"); - -// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { -// return !colMeta.isTag; -// }).map(colMeta -> { -// return buildColumnValue(colMeta, record); -// }).collect(Collectors.joining(",", "(", ")"))); - sb.append("("); - for (int i = 0; i < columnMetas.size(); i++) { - ColumnMeta colMeta = columnMetas.get(i); - if (!columns.contains(colMeta.field)) - continue; - if (colMeta.isTag) - continue; - String colValue = buildColumnValue(colMeta, record); - if (i == 0) { - sb.append(colValue); - } else { - sb.append(",").append(colValue); - } - } - sb.append(")"); + .append(" values") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag; + }).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(",", "(", ")"))); } String sql = sb.toString(); @@ -317,11 +285,10 @@ private int executeUpdate(Connection conn, String sql) throws SQLException { return count; } - private String buildColumnValue(ColumnMeta colMeta, Record record) throws Exception { + private String buildColumnValue(ColumnMeta colMeta, Record record) { Column column = record.getColumn(indexOf(colMeta.field)); TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); - Column.Type type = column.getType(); - switch (type) { + switch (column.getType()) { case DATE: { Date value = column.asDate(); switch (timestampPrecision) { @@ -350,9 +317,8 @@ private String buildColumnValue(ColumnMeta colMeta, Record record) throws Except case DOUBLE: case INT: case LONG: - column.asString(); default: - throw new Exception("invalid column type: " + type); + return column.asString(); } } @@ -500,8 +466,8 @@ private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) { * else * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ - private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws Exception { - List columnMetas = this.columnMetas.get(table); + private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(table).append(" ") @@ -527,25 +493,11 @@ private int writeBatchToSubTable(Connection conn, String table, List rec if (ignoreTagsUnmatched && !tagsAllMatch) continue; -// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { -// return !colMeta.isTag; -// }).map(colMeta -> { -// return buildColumnValue(colMeta, record); -// }).collect(Collectors.joining(", ", "(", ") "))); - sb.append("("); - for (int i = 0; i < columnMetas.size(); i++) { - ColumnMeta colMeta = columnMetas.get(i); - if (colMeta.isTag) - continue; - String colValue = buildColumnValue(colMeta, record); - if (i == 0) { - sb.append(colValue); - } else { - sb.append(",").append(colValue); - } - } - sb.append(")"); - + sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag; + }).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(", ", "(", ") "))); validRecords++; } @@ -584,8 +536,8 @@ private boolean equals(Column column, ColumnMeta colMeta) { * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ - private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws Exception { - List columnMetas = this.columnMetas.get(table); + private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { + List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(table) @@ -596,22 +548,9 @@ private int writeBatchToNormalTable(Connection conn, String table, List .append(" values "); for (Record record : recordBatch) { -// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { -// return buildColumnValue(colMeta, record); -// }).collect(Collectors.joining(",", "(", ")"))); - sb.append("("); - for (int i = 0; i < columnMetas.size(); i++) { - ColumnMeta colMeta = columnMetas.get(i); - if (!columns.contains(colMeta.field)) - continue; - String colValue = buildColumnValue(colMeta, record); - if (i == 0) { - sb.append(colValue); - } else { - sb.append(",").append(colValue); - } - } - sb.append(")"); + sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(",", "(", ")"))); } String sql = sb.toString(); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 2e77354022..fc0c002d91 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -124,14 +124,12 @@ public Map> loadColumnMetas(List tables) throws } } } catch (SQLException e) { - LOG.error(e.getMessage(), e); + e.printStackTrace(); } colMeta.value = value; }); - LOG.debug("load column metadata of " + table + ": " + - columnMetaList.stream().map(ColumnMeta::toString).collect(Collectors.joining(",", "[", "]")) - ); + LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray())); ret.put(table, columnMetaList); } return ret; @@ -145,9 +143,7 @@ private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { tableMeta.tags = rs.getInt("tags"); tableMeta.tables = rs.getInt("tables"); - if (LOG.isDebugEnabled()){ - LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); - } + LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); return tableMeta; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java index e1f2bc291c..469449e63a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriterErrorCode.java @@ -4,10 +4,10 @@ public enum TDengineWriterErrorCode implements ErrorCode { - REQUIRED_VALUE("TDengineWriter-00", "parameter value is missing"), - ILLEGAL_VALUE("TDengineWriter-01", "invalid parameter value"), - RUNTIME_EXCEPTION("TDengineWriter-02", "runtime exception"), - TYPE_ERROR("TDengineWriter-03", "data type mapping error"); + REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"), + ILLEGAL_VALUE("TDengineWriter-01", "值非法"), + RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"), + TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型"); private final String code; private final String description; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java index b6932f6004..15f6b1bcd0 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java @@ -2,7 +2,6 @@ import com.alibaba.datax.core.Engine; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.sql.*; @@ -10,7 +9,6 @@ import java.util.Date; import java.util.Random; -@Ignore public class DM2TDengineTest { private String host1 = "192.168.0.72"; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java index ad5537ba26..46e601ad4a 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java @@ -7,7 +7,10 @@ import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.transport.record.DefaultRecord; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; @@ -18,7 +21,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -@Ignore public class DefaultDataHandlerTest { private static final String host = "192.168.1.93"; @@ -27,7 +29,7 @@ public class DefaultDataHandlerTest { private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector(); @Test - public void writeSupTableBySQL() throws Exception { + public void writeSupTableBySQL() throws SQLException { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -67,7 +69,7 @@ public void writeSupTableBySQL() throws Exception { } @Test - public void writeSupTableBySQL_2() throws Exception { + public void writeSupTableBySQL_2() throws SQLException { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -105,7 +107,7 @@ public void writeSupTableBySQL_2() throws Exception { } @Test - public void writeSupTableBySchemaless() throws Exception { + public void writeSupTableBySchemaless() throws SQLException { // given createSupTable(); Configuration configuration = Configuration.from("{" + @@ -145,7 +147,7 @@ public void writeSupTableBySchemaless() throws Exception { } @Test - public void writeSubTableWithTableName() throws Exception { + public void writeSubTableWithTableName() throws SQLException { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -184,7 +186,7 @@ public void writeSubTableWithTableName() throws Exception { } @Test - public void writeSubTableWithoutTableName() throws Exception { + public void writeSubTableWithoutTableName() throws SQLException { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + @@ -223,7 +225,7 @@ public void writeSubTableWithoutTableName() throws Exception { } @Test - public void writeNormalTable() throws Exception { + public void writeNormalTable() throws SQLException { // given createSupAndSubTable(); Configuration configuration = Configuration.from("{" + diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java index 8cbc2d90a5..4a662711f3 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java @@ -2,14 +2,12 @@ import com.alibaba.datax.core.Engine; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.sql.*; import java.text.SimpleDateFormat; import java.util.Random; -@Ignore public class Mysql2TDengineTest { private static final String host1 = "192.168.56.105"; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java index 0d212e8a9f..ad326f7e33 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java @@ -2,12 +2,10 @@ import com.alibaba.datax.core.Engine; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.sql.*; -@Ignore public class Opentsdb2TDengineTest { @Test diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java index 51d17d7ba1..3708e6f967 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java @@ -1,6 +1,9 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; import java.sql.Connection; import java.sql.DriverManager; @@ -10,7 +13,6 @@ import java.util.List; import java.util.Map; -@Ignore public class SchemaManagerTest { private static Connection conn; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java index 613d2aa999..e54bcbde02 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Stream2TDengineTest.java @@ -1,7 +1,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.core.Engine; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; import java.sql.Connection; @@ -9,7 +9,6 @@ import java.sql.SQLException; import java.sql.Statement; -@Ignore public class Stream2TDengineTest { private String host2 = "192.168.56.105"; diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java index 18efdc142e..9e9546332a 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java @@ -2,14 +2,12 @@ import com.alibaba.datax.core.Engine; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.sql.*; import java.text.SimpleDateFormat; import java.util.Random; -@Ignore public class TDengine2TDengineTest { private static final String host1 = "192.168.56.105";