diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index af586d5ecb5..952521509d7 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -132,6 +132,7 @@ there are some reference value for params above. | GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | | StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -200,5 +201,7 @@ sink { ### next version +- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) +- [Feature] Support Teradata JDBC Sink ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) - [Feature] Support Sqlite JDBC Sink ([3089](https://github.com/apache/incubator-seatunnel/pull/3089)) -- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) \ No newline at end of file +- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378)) diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index e6c1c08a92e..742ba48fd2f 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -109,6 +109,7 @@ there are some reference value for params above. | starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | | db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | | tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | ## Example @@ -159,4 +160,5 @@ parallel: - [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220)) - [Feature] Support Sqlite JDBC Source ([3089](https://github.com/apache/incubator-seatunnel/pull/3089)) - [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309)) +- [Feature] Support Teradata JDBC Source ([3362](https://github.com/apache/incubator-seatunnel/pull/3362)) - [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478)) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index b7f40e417a2..6857e9687c6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -40,6 +40,7 @@ db2jcc4 3.39.3.0 5.13.9 + 17.20.00.12 @@ -98,6 +99,14 @@ ${tablestore.version} provided + + + com.teradata.jdbc + terajdbc4 + ${teradata.version} + provided + + @@ -147,6 +156,10 @@ com.aliyun.openservices tablestore-jdbc + + com.teradata.jdbc + terajdbc4 + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java new file mode 100644 index 00000000000..d824d2ef01f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialect.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.util.Optional; + +public class TeradataDialect implements JdbcDialect { + + @Override + public String dialectName() { + return "Teradata"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new TeradataJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new TeradataTypeMapper(); + } + + @Override + public Optional getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java new file mode 100644 index 00000000000..70a4492868f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +@AutoService(JdbcDialectFactory.class) +public class TeradataDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:teradata:"); + } + + @Override + public JdbcDialect create() { + return new TeradataDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java new file mode 100644 index 00000000000..8e5543eed06 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataJdbcRowConverter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +public class TeradataJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "Teradata"; + } + +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java new file mode 100644 index 00000000000..97cd9940d79 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataTypeMapper.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class TeradataTypeMapper implements JdbcDialectTypeMapper { + + // ============================data types===================== + + // -------------------------number---------------------------- + private static final String TERADATA_BYTEINT = "BYTEINT"; + private static final String TERADATA_SMALLINT = "SMALLINT"; + private static final String TERADATA_INTEGER = "INTEGER"; + private static final String TERADATA_BIGINT = "BIGINT"; + private static final String TERADATA_FLOAT = "FLOAT"; + private static final String TERADATA_DECIMAL = "DECIMAL"; + + // -------------------------string---------------------------- + private static final String TERADATA_CHAR = "CHAR"; + private static final String TERADATA_VARCHAR = "VARCHAR"; + private static final String TERADATA_CLOB = "CLOB"; + + + // ---------------------------binary--------------------------- + private static final String TERADATA_BYTE = "BYTE"; + private static final String TERADATA_VARBYTE = "VARBYTE"; + + // ------------------------------time------------------------- + private static final String TERADATA_DATE = "DATE"; + private static final String TERADATA_TIME = "TIME"; + private static final String TERADATA_TIMESTAMP = "TIMESTAMP"; + + // ------------------------------blob------------------------- + private static final String TERADATA_BLOB = "BLOB"; + + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String teradataType = metadata.getColumnTypeName(colIndex).toUpperCase(); + switch (teradataType) { + case TERADATA_BYTEINT: + return BasicType.BYTE_TYPE; + case TERADATA_SMALLINT: + return BasicType.SHORT_TYPE; + case TERADATA_INTEGER: + return BasicType.INT_TYPE; + case TERADATA_BIGINT: + return BasicType.LONG_TYPE; + case TERADATA_FLOAT: + return BasicType.FLOAT_TYPE; + case TERADATA_DECIMAL: + return new DecimalType(metadata.getPrecision(colIndex), metadata.getScale(colIndex)); + case TERADATA_CHAR: + case TERADATA_VARCHAR: + case TERADATA_CLOB: + return BasicType.STRING_TYPE; + case TERADATA_BYTE: + case TERADATA_VARBYTE: + case TERADATA_BLOB: + return PrimitiveByteArrayType.INSTANCE; + case TERADATA_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TERADATA_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case TERADATA_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support TERADATA type '%s' on column '%s' yet.", + teradataType, jdbcColumnName)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml index 9504bb182f5..db554daf3c1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml @@ -103,6 +103,11 @@ postgresql test + + com.teradata.jdbc + terajdbc4 + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java new file mode 100644 index 00000000000..df07b8b193d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcTeradataIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import com.teradata.jdbc.TeraDataSource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.sql.Connection; +import java.sql.Statement; + +@Disabled("Disabled because it needs user's personal teradata account to run this test!") +public class JdbcTeradataIT extends TestSuiteBase implements TestResource { + private static final String HOST = "1.2.3.4"; + private static final String PORT = "1025"; + private static final String USERNAME = "dbc"; + private static final String PASSWORD = "dbc"; + private static final String DATABASE = "test"; + private static final String SINK_TABLE = "sink_table"; + private static final String TERADATA_DRIVER_JAR = "https://repo1.maven.org/maven2/com/teradata/jdbc/terajdbc4/17.20.00.12/terajdbc4-17.20.00.12.jar"; + private final ContainerExtendedFactory extendedFactory = container -> { + container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + TERADATA_DRIVER_JAR); + }; + + private Connection connection; + + @TestTemplate + public void testTeradata(TestContainer container) throws Exception { + container.executeExtraCommands(extendedFactory); + Container.ExecResult execResult = container.executeJob("/jdbc_teradata_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + clearSinkTable(); + } + + private void clearSinkTable() { + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("delete from %s", SINK_TABLE)); + } catch (Exception e) { + throw new RuntimeException("Test teradata server failed!", e); + } + } + + @BeforeAll + @Override + public void startUp() throws Exception { + TeraDataSource teraDataSource = new TeraDataSource(); + teraDataSource.setDSName(HOST); + teraDataSource.setDbsPort(PORT); + teraDataSource.setUser(USERNAME); + teraDataSource.setPassword(PASSWORD); + teraDataSource.setDATABASE(DATABASE); + this.connection = teraDataSource.getConnection(); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (connection != null) { + this.connection.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf new file mode 100644 index 00000000000..7408228285b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_teradata_source_and_sink.conf @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.teradata.jdbc.TeraDriver + url = "jdbc:teradata://1.2.3.4/DBS_PORT=1025,DATABASE=test,TYPE=FASTEXPORT" + user = "dbc" + password = "dbc" + query = """ + select id, + c_byteint, + c_smallint, + c_integer, + c_bigint, + c_float, + c_decimal, + c_char, + c_varchar, + c_byte, + c_varbyte, + c_date, + c_timestamp + from source_table; + """ + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +sink { + Jdbc { + driver = com.teradata.jdbc.TeraDriver + url = "jdbc:teradata://1.2.3.4/DBS_PORT=1025,DATABASE=test,TYPE=FASTLOAD" + user = "dbc" + password = "dbc" + auto_commit = false + query = """ + insert into sink_table(id, + c_byteint, + c_smallint, + c_integer, + c_bigint, + c_float, + c_decimal, + c_char, + c_varchar, + c_byte, + c_varbyte, + c_date, + c_timestamp) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +""" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}