From 5b3b78dd634a088dc72cdb06f8d92a23ac6f8a56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Wed, 22 Nov 2023 13:38:10 +0100 Subject: [PATCH] Add initial implementation --- .gitignore | 4 + CHANGELOG.md | 5 + LICENSE | 201 +++++++++ pom.xml | 417 ++++++++++++++++++ .../dialect/ElasticsearchDialect.java | 108 +++++ .../dialect/ElasticsearchDialectFactory.java | 35 ++ .../dialect/ElasticsearchRowConverter.java | 57 +++ ....connector.jdbc.dialect.JdbcDialectFactory | 16 + .../ElasticsearchDialectTypeTest.java | 44 ++ .../ElasticsearchPreparedStatementTest.java | 57 +++ .../elasticsearch/ElasticsearchTestBase.java | 35 ++ ...ElasticsearchDynamicTableSourceITCase.java | 409 +++++++++++++++++ .../ElasticsearchBulkBuilder.java | 77 ++++ .../elasticsearch/ElasticsearchDatabase.java | 90 ++++ .../elasticsearch/ElasticsearchImages.java | 24 + .../ElasticsearchIndexSchemaBuilder.java | 253 +++++++++++ .../elasticsearch/ElasticsearchMetadata.java | 93 ++++ .../ElasticsearchRestClient.java | 136 ++++++ .../jdbc/dialect/JdbcDialectTypeTest.java | 133 ++++++ .../jdbc/testutils/DatabaseExtension.java | 181 ++++++++ .../jdbc/testutils/DatabaseMetadata.java | 67 +++ .../jdbc/testutils/DatabaseTest.java | 31 ++ .../jdbc/testutils/JdbcITCaseBase.java | 35 ++ .../jdbc/testutils/TableManaged.java | 34 ++ .../functions/JdbcResultSetBuilder.java | 30 ++ .../jdbc/testutils/tables/TableBase.java | 294 ++++++++++++ .../jdbc/testutils/tables/TableBuilder.java | 54 +++ .../testutils/tables/TableBuilderTest.java | 119 +++++ .../jdbc/testutils/tables/TableField.java | 82 ++++ .../jdbc/testutils/tables/TableRow.java | 121 +++++ .../tables/templates/BooksTable.java | 177 ++++++++ 31 files changed, 3419 insertions(+) create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 LICENSE create mode 100644 pom.xml create mode 100644 src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java create mode 100644 src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java create mode 100644 src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java create mode 100644 src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchDialectTypeTest.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchPreparedStatementTest.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java create mode 100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java create mode 
100644 src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseTest.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/JdbcITCaseBase.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/TableManaged.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/functions/JdbcResultSetBuilder.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilderTest.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableField.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java create mode 100644 src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f69be90 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.idea +**/target/ + +*.iml diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d4f7ccb --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## [Unreleased] + +- Initial implementation of Elasticsearch SQL Dialect for flink-connector-jdbc diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..93cf079 --- /dev/null +++ b/pom.xml @@ -0,0 +1,417 @@ + + + 4.0.0 + + com.getindata + flink-connector-jdbc-elasticsearch-dialect + 0.1.0-SNAPSHOT + + flink-connector-jdbc-elasticsearch-dialect + Elasticsearch SQL Dialect for flink-connector-jdbc. 
+ https://github.com/getindata/flink-connector-jdbc-elasticsearch-dialect + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + GetInData + office@getindata.com + GetInData + https://getindata.com + + + + + scm:git:git://github.com/getindata/flink-connector-jdbc-elasticsearch-dialect + scm:git:ssh://github.com/getindata/flink-connector-jdbc-elasticsearch-dialect + + https://github.com/getindata/flink-connector-jdbc-elasticsearch-dialect/tree/main + + + + 8 + 8 + 2.12 + UTF-8 + + 8.11.1 + 1.17.1 + 3.1.1-1.17 + 2.15.2 + + + **/*Test.* + 2048m + 1024m + + 2 + 4 + -XX:+UseG1GC -Xms256m + + 3.23.1 + 5.9.1 + 2.21.0 + 1.18.2 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + ${maven.compiler.source} + ${maven.compiler.target} + + false + + + -Xpkginfo:always + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M8 + + + + false + + 0${surefire.forkNumber} + + $${surefire.forkNumber} + US + en + ${project.basedir} + true + + -Xms256m -Xmx2048m -XX:+UseG1GC + + + + + default-test + test + + test + + + + ${test.unit.pattern} + + ${flink.forkCountUnitTest} + ${flink.surefire.baseArgLine} -Xmx${flink.XmxUnitTest} + + + + + integration-tests + integration-test + + test + + + + **/*.* + + + ${test.unit.pattern} + + ${flink.forkCountITCase} + ${flink.surefire.baseArgLine} -Xmx${flink.XmxITCase} + false + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.6 + + + package + + shade + + + + + com.getindata:flink-connector-jdbc-elasticsearch-dialect + + + + + META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.3.0 + + + parse-version + + parse-version + + + + + + + + + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-connector-jdbc + ${flink-connector-jdbc.version} + + + + + org.elasticsearch.plugin + x-pack-sql-jdbc + ${elasticsearch.version} + provided + + + + + org.junit + junit-bom + ${junit.version} + pom + import + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + org.junit.vintage + junit-vintage-engine + ${junit.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + + org.mockito + mockito-core + ${mockito.version} + jar + test + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + org.apache.flink + flink-streaming-java + ${flink.version} + test-jar + test + + + org.apache.flink + flink-core + ${flink.version} + test + test-jar + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + org.apache.flink + flink-table-api-scala-bridge_${scala.binary.version} + ${flink.version} + test + + + + + org.testcontainers + elasticsearch + ${testcontainers.version} + test + + + org.elasticsearch.client + elasticsearch-rest-client + ${elasticsearch.version} + test + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + test + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + test + + + + + + bump-patch + + + bumpPatch + + + + + + org.codehaus.mojo + versions-maven-plugin + + + + + set + + 
validate + + + ${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.nextIncrementalVersion}-SNAPSHOT + + + + + + + + + + bump-minor + + + bumpMinor + + + + + + org.codehaus.mojo + versions-maven-plugin + + + + + set + + validate + + + ${parsedVersion.majorVersion}.${parsedVersion.nextMinorVersion}.0-SNAPSHOT + + + + + + + + + + bump-major + + + bumpMajor + + + + + + org.codehaus.mojo + versions-maven-plugin + + + + + set + + validate + + ${parsedVersion.nextMajorVersion}.0.0-SNAPSHOT + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java new file mode 100644 index 0000000..c791a3a --- /dev/null +++ b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect; + +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; + +public class ElasticsearchDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to Elastic docs: + // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html + private static final int MIN_TIMESTAMP_PRECISION = 0; + private static final int MAX_TIMESTAMP_PRECISION = 9; + + @Override + public String dialectName() { + return "Elasticsearch"; + } + + @Override + public Optional defaultDriverName() { + return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver"); + } + + @Override + public Set supportedTypes() { + // The list of types supported by Elastic SQL. 
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html + return EnumSet.of( + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.DATE, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.VARCHAR); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public JdbcRowConverter getRowConverter(RowType rowType) { + return new ElasticsearchRowConverter(rowType); + } + + @Override + public String getLimitClause(long limit) { + return "LIMIT " + limit; + } + + @Override + public String quoteIdentifier(String identifier) { + return '"' + identifier + '"'; + } + + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + throw new UnsupportedOperationException("Upsert is not supported."); + } + + @Override + public String getInsertIntoStatement(String tableName, String[] fieldNames) { + throw new UnsupportedOperationException("Insert into is not supported."); + } + + @Override + public String getUpdateStatement( + String tableName, String[] fieldNames, String[] conditionFields) { + throw new UnsupportedOperationException("Update is not supported."); + } + + @Override + public String getDeleteStatement(String tableName, String[] conditionFields) { + throw new UnsupportedOperationException("Delete is not supported."); + } +} diff --git a/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java new file mode 100644 index 0000000..d11302d --- /dev/null +++ b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect; + +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; + +public class ElasticsearchDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:"); + } + + @Override + public JdbcDialect create() { + return new ElasticsearchDialect(); + } +} \ No newline at end of file diff --git a/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java new file mode 100644 index 0000000..f90708c --- /dev/null +++ b/src/main/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect; + +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.sql.Timestamp; + +/** + * Runtime converter responsible for converting between JDBC objects and Flink internal objects for + * Elasticsearch. + */ +public class ElasticsearchRowConverter extends AbstractJdbcRowConverter { + private static final long serialVersionUID = 1L; + + public ElasticsearchRowConverter(RowType rowType) { + super(rowType); + } + + @Override + public String converterName() { + return "Elasticsearch"; + } + + @Override + protected JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case TINYINT: + case DOUBLE: + case FLOAT: + return val -> val; + case DATE: + return val -> + (int) (((Timestamp) val).toLocalDateTime().toLocalDate().toEpochDay()); + default: + return super.createInternalConverter(type); + } + } +} \ No newline at end of file diff --git a/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory new file mode 100644 index 0000000..35cb683 --- /dev/null +++ b/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialectFactory diff --git a/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchDialectTypeTest.java b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchDialectTypeTest.java new file mode 100644 index 0000000..6ff4f78 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchDialectTypeTest.java @@ -0,0 +1,44 @@ +package com.getindata.flink.connector.jdbc.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest; + +import java.util.Arrays; +import java.util.List; + +/** The Elasticsearch params for {@link JdbcDialectTypeTest}. */ +public class ElasticsearchDialectTypeTest extends JdbcDialectTypeTest { + + @Override + protected String testDialect() { + return "elasticsearch"; + } + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("VARCHAR"), + createTestItem("BOOLEAN"), + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("INTEGER"), + createTestItem("BIGINT"), + createTestItem("FLOAT"), + createTestItem("DOUBLE"), + createTestItem("DATE"), + createTestItem("TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("VARBINARY"), + + // Not valid data + createTestItem("CHAR", "The Elasticsearch dialect doesn't support type: CHAR(1)."), + createTestItem( + "BINARY", "The Elasticsearch dialect doesn't support type: BINARY(1)."), + createTestItem("TIME", "The Elasticsearch dialect doesn't support type: TIME(0)."), + createTestItem( + "VARBINARY(10)", + "The Elasticsearch dialect doesn't support type: VARBINARY(10)."), + createTestItem( + "DECIMAL(10, 4)", + "The Elasticsearch dialect doesn't support type: DECIMAL(10, 4).")); + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchPreparedStatementTest.java b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchPreparedStatementTest.java new file mode 100644 index 0000000..0c4c6a6 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchPreparedStatementTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the statements generated by the Elasticsearch dialect. */ +public class ElasticsearchPreparedStatementTest { + + private final JdbcDialect dialect = + JdbcDialectLoader.load( + "jdbc:elasticsearch://localhost:9200/test", getClass().getClassLoader()); + + private final String[] fieldNames = + new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; + private final String[] keyFields = new String[] {"id", "__field_3__"}; + private final String tableName = "tbl"; + + @Test + void testRowExistsStatement() { + String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); + assertThat(rowExistStmt) + .isEqualTo( + "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + } + + @Test + void testSelectStatement() { + String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); + assertThat(selectStmt) + .isEqualTo( + "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" " + + "FROM \"tbl\" " + + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__"); + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java new file mode 100644 index 0000000..567569f --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch; + +import com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseTest; + +import org.junit.jupiter.api.extension.ExtendWith; + +/** Base class for Elasticsearch testing.
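+ * Exposes the {@link DatabaseMetadata} of the shared {@link ElasticsearchDatabase} container started by the extension.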
*/ +@ExtendWith(ElasticsearchDatabase.class) +public interface ElasticsearchTestBase extends DatabaseTest { + + @Override + default DatabaseMetadata getMetadata() { + return ElasticsearchDatabase.getMetadata(); + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java new file mode 100644 index 0000000..d715c76 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.databases.elasticsearch.table; + + +import com.getindata.flink.connector.jdbc.databases.elasticsearch.ElasticsearchTestBase; +import com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialect; +import com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata; +import com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchRestClient; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager; +import org.apache.flink.table.test.lookup.cache.LookupCacheAssert; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchBulkBuilder.createBulkContent; +import static 
com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchIndexSchemaBuilder.buildIndexSchema; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * The Table Source ITCase for {@link ElasticsearchDialect}. + */ +public class ElasticsearchDynamicTableSourceITCase extends AbstractTestBase + implements ElasticsearchTestBase { + + private ElasticsearchRestClient client; + private TableEnvironment tEnv; + private final TableRow inputTable = createInputTable(); + + @BeforeEach + void beforeEach() throws Exception { + client = new ElasticsearchRestClient((ElasticsearchMetadata) getMetadata()); + + client.createIndex(inputTable.getTableName(), buildIndexSchema(inputTable)); + client.addDataBulk(inputTable.getTableName(), createBulkContent(inputTable, getTestData())); + + tEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment()); + } + + @AfterEach + void afterEach() throws Exception { + if (client != null) { + client.deleteIndex(inputTable.getTableName()); + } + } + + @Test + void testJdbcSource() { + String testTable = "testTable"; + tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable)); + + List<Row> collected = executeQuery("SELECT * FROM " + testTable); + + assertThat(collected).containsExactlyInAnyOrderElementsOf(getTestData()); + } + + @Test + void testProject() { + String testTable = "testTable"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + testTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='2'", + "'scan.partition.lower-bound'='0'", + "'scan.partition.upper-bound'='100'"))); + + String fields = String.join(",", Arrays.copyOfRange(inputTable.getTableFields(), 0, 3)); + List<Row> collected = executeQuery(String.format("SELECT %s FROM %s", fields, testTable)); + + List<Row> expected = + getTestData().stream() + .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(2))) + .collect(Collectors.toList()); + + assertThat(collected).containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + public void testLimit() { + String testTable = "testTable"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + testTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='2'", + "'scan.partition.lower-bound'='1'", + "'scan.partition.upper-bound'='2'"))); + + List<Row> collected = executeQuery("SELECT * FROM " + testTable + " LIMIT 1"); + + assertThat(collected).hasSize(1); + assertThat(getTestData()) + .as("The actual output is not a subset of the expected set.") + .containsAll(collected); + } + + @Test + public void testFilter() { + String testTable = "testTable"; + tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable)); + + // create a partitioned table to ensure no regression + String partitionedTable = "PARTITIONED_TABLE"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + partitionedTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='1'", + "'scan.partition.lower-bound'='1'", + "'scan.partition.upper-bound'='1'"))); + + // we create a VIEW here to test column remapping, i.e.
would filter push-down still work if we + // create a view that depends on our source table + tEnv.executeSql( + String.format( + "CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s )", + Arrays.stream(inputTable.getTableFields()) + .filter(f -> !f.equals("id")) + .collect(Collectors.joining(",")), + testTable)); + + Row onlyRow1 = + getTestData().stream() + .filter(row -> row.getFieldAs(0).equals(1L)) + .findAny() + .orElseThrow(NullPointerException::new); + + Row onlyRow2 = + getTestData().stream() + .filter(row -> row.getFieldAs(0).equals(2L)) + .findAny() + .orElseThrow(NullPointerException::new); + + List<Row> twoRows = getTestData(); + + // test simple filter + assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1")) + .containsExactly(onlyRow1); + + // test TIMESTAMP filter + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'")) + .containsExactly(onlyRow1); + + // test the IN operator + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND double_col IN (100.1234, 101.1234)")) + .containsExactly(onlyRow1); + + // test mixing the AND and OR operators + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND double_col = 100.1234 OR double_col = 101.1234")) + .containsExactlyInAnyOrderElementsOf(twoRows); + + // test mixing AND/OR with parentheses, and swapping the operands of the equality expression + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND double_col = 100.1234) OR double_col = 101.1234")) + .containsExactly(onlyRow2); + + // test greater-than, just to make sure we didn't break anything that we cannot push down + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND double_col > 100 OR double_col = 101.123")) + .containsExactly(onlyRow2); + + // One more test of parentheses + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (double_col = 100.1234 OR double_col = 102.1234)")) + .isEmpty(); + + assertThat( + executeQuery( + "SELECT * FROM " + + partitionedTable + + " WHERE id = 2 AND double_col > 100 OR double_col = 101.123")) + .isEmpty(); + + assertThat( + executeQuery( + "SELECT * FROM " + + partitionedTable + + " WHERE 1 = id AND double_col IN (100.1234, 101.1234)")) + .containsExactly(onlyRow1); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoin(Caching caching) { + // Create JDBC lookup table + List<String> cachingOptions = Collections.emptyList(); + if (caching.equals(Caching.ENABLE_CACHE)) { + cachingOptions = + Arrays.asList( + "'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'"); + } + tEnv.executeSql( + inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", cachingOptions)); + + // Create and prepare a value source + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.of(1L, "Alice"), + Row.of(1L, "Alice"), + Row.of(2L, "Bob"), + Row.of(3L, "Charlie"))); + tEnv.executeSql( + String.format( + "CREATE TABLE value_source ( " + + " `id` BIGINT, " + + " `name` STRING, " + + " `proctime` AS PROCTIME()" + + ") WITH (" + + " 'connector' = 'values', " + + " 'data-id' = '%s'" + + ")", + dataId)); + + if (caching == Caching.ENABLE_CACHE) { + LookupCacheManager.keepCacheOnRelease(true); + } + + // Execute lookup join + try { + List<Row> collected = + executeQuery( + "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id");
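+ // Only source rows with a matching id in the index survive the inner lookup join: the two + // 'Alice' rows (id 1) and the 'Bob' row (id 2) join, while 'Charlie' (id 3) has no match.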
assertThat(collected).hasSize(3); + + List<Row> expected = + Arrays.asList( + Row.of( + 1L, + "Alice", + 1L, + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 100.1234d), + Row.of( + 1L, + "Alice", + 1L, + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 100.1234d), + Row.of( + 2L, + "Bob", + 2L, + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + 101.1234d)); + + assertThat(collected) + .as("The actual output is not a subset of the expected set") + .containsAll(expected); + + if (caching == Caching.ENABLE_CACHE) { + validateCachedValues(); + } + } finally { + if (caching == Caching.ENABLE_CACHE) { + LookupCacheManager.getInstance().checkAllReleased(); + LookupCacheManager.getInstance().clear(); + LookupCacheManager.keepCacheOnRelease(false); + } + } + } + + private void validateCachedValues() { + // Validate cache + Map<String, LookupCacheManager.RefCountedCache> managedCaches = + LookupCacheManager.getInstance().getManagedCaches(); + assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1); + LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache(); + // JDBC supports projection push-down, so the cached rows have been projected + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + 100.1234d, + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2020-01-01T15:35:00.123456"))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L, + 101.1234d, + TimestampData.fromLocalDateTime( + LocalDateTime.parse("2020-01-01T15:36:01.123456"))); + + RowData key3 = GenericRowData.of(3L); + + Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>(); + expectedEntries.put(key1, Collections.singletonList(value1)); + expectedEntries.put(key2, Collections.singletonList(value2)); + expectedEntries.put(key3, Collections.emptyList()); + + LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries); + } + + private enum Caching { + ENABLE_CACHE, + DISABLE_CACHE + } + + private List<Row> executeQuery(String query) { + return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect()); + } + + private TableRow createInputTable() { + return tableRow( + "jdbc_dynamic_table_source", + field("id", DataTypes.BIGINT().notNull()), + field("double_col", DataTypes.DOUBLE()), + field("timestamp6_col", DataTypes.TIMESTAMP(6)), + field("boolean_col", DataTypes.BOOLEAN()), + field("tinyint_col", DataTypes.TINYINT()), + field("smallint_col", DataTypes.SMALLINT()), + field("integer_col", DataTypes.INT()), + field("bigint_col", DataTypes.BIGINT()), + field("float_col", DataTypes.FLOAT()), + field("varchar15_col", DataTypes.VARCHAR(15)), + field("date_col", DataTypes.DATE())); + } + + protected List<Row> getTestData() { + return Arrays.asList( + Row.of( + 1L, + 100.1234d, + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + true, + (byte) -12, + (short) 31_987, + 1_147_483_647, + 8_223_372_036_854_775_807L, + 70.5f, + "some-text", + LocalDate.parse("2020-01-01")), + Row.of( + 2L, + 101.1234d, + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + false, + (byte) 23, + (short) -29_987, + -2_047_483_647, + -1_223_372_036_854_775_807L, + -123.5f, + "other-text", + LocalDate.parse("2020-01-02"))); + } +} diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java new file mode 100644 index 0000000..633596c --- /dev/null +++
b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.types.Row; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.lang.String.format; + +/** Creates content for an Elastic Bulk API call. */ +public class ElasticsearchBulkBuilder { + + private static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + + public static String createBulkContent(TableRow schema, List<Row> data) + throws JsonProcessingException { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < data.size(); ++i) { + builder.append( + format( + "{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}\n", + schema.getTableName(), i + 1)); + builder.append(rowToJson(schema, data.get(i))).append('\n'); + } + return builder.toString(); + } + + private static String rowToJson(TableRow schema, Row data) throws JsonProcessingException { + int fieldCount = schema.getTableDataFields().length; + Map<String, Object> fieldMap = new HashMap<>(fieldCount); + for (int i = 0; i < fieldCount; ++i) { + fieldMap.put(schema.getTableFields()[i], adjustValueIfNeeded(data.getField(i))); + } + return OBJECT_MAPPER.writeValueAsString(fieldMap); + } + + private static Object adjustValueIfNeeded(Object object) { + if (object instanceof LocalDateTime) { + return ((LocalDateTime) object) + .atZone(ZoneId.systemDefault()) + .withZoneSameInstant(ZoneId.of("UTC")) + .toLocalDateTime(); + } else { + return object; + } + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java new file mode 100644 index 0000000..e955086 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.util.FlinkRuntimeException; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import java.time.Duration; + +import static com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.PASSWORD; +import static com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.USERNAME; + + +/** + * Elasticsearch database for testing. + */ +public class ElasticsearchDatabase extends DatabaseExtension implements ElasticsearchImages { + + private static final ElasticsearchContainer CONTAINER = + new ElasticsearchContainer(ELASTICSEARCH_8) + .waitingFor( + Wait.forHttp("/_license") + .withBasicCredentials(USERNAME, PASSWORD) + .withReadTimeout(Duration.ofSeconds(5)) + .withStartupTimeout(Duration.ofMinutes(5))); + + private static ElasticsearchMetadata metadata; + private static ElasticsearchRestClient client; + + public static ElasticsearchMetadata getMetadata() { + if (!CONTAINER.isRunning()) { + throw new FlinkRuntimeException("Container is stopped."); + } + if (metadata == null) { + metadata = new ElasticsearchMetadata(CONTAINER); + } + return metadata; + } + + private static ElasticsearchRestClient getClient() { + if (!CONTAINER.isRunning()) { + throw new FlinkRuntimeException("Container is stopped."); + } + if (client == null) { + client = new ElasticsearchRestClient(getMetadata()); + } + return client; + } + + @Override + protected DatabaseMetadata startDatabase() throws Exception { + CONTAINER.withEnv("xpack.security.enabled", "true"); + CONTAINER.withEnv("ELASTIC_PASSWORD", PASSWORD); + CONTAINER.withEnv("ES_JAVA_OPTS", "-Xms1g -Xmx1g"); + CONTAINER.start(); + + // JDBC plugin is available only in Platinum and Enterprise licenses or in trial. + if (!getClient().trialEnabled()) { + getClient().enableTrial(); + } + + return getMetadata(); + } + + @Override + protected void stopDatabase() throws Exception { + CONTAINER.stop(); + metadata = null; + client = null; + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java new file mode 100644 index 0000000..f7606f9 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch; + +/** Elasticsearch docker images. */ +public interface ElasticsearchImages { + + String ELASTICSEARCH_8 = "docker.elastic.co/elasticsearch/elasticsearch:8.11.1"; +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java new file mode 100644 index 0000000..2a68b68 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataTypeVisitor; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.KeyValueDataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import static java.lang.String.format; +import static java.util.Arrays.stream; +import static java.util.stream.Collectors.joining; + +/** Creates content for Elastic Index API call. */ +public class ElasticsearchIndexSchemaBuilder { + + private static final ElasticsearchDataTypeMapper MAPPER = new ElasticsearchDataTypeMapper(); + + public static String buildIndexSchema(TableRow tableRow) { + String fields = + stream(tableRow.getTableDataFields()) + .map( + field -> + format( + "\"%s\": %s", + field.getName(), + field.getDataType().accept(MAPPER))) + .collect(joining(", ")); + return "{\"settings\": {\"number_of_shards\": 1}, \"mappings\": {\"properties\": {" + + fields + + "}}}"; + } + + /** Maps Flink types to Elasticsearch types. 
 */
+    private static class ElasticsearchDataTypeMapper
+            implements DataTypeVisitor<String>, LogicalTypeVisitor<String> {
+
+        @Override
+        public String visit(AtomicDataType atomicDataType) {
+            return atomicDataType.getLogicalType().accept(this);
+        }
+
+        @Override
+        public String visit(CollectionDataType collectionDataType) {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+
+        @Override
+        public String visit(FieldsDataType fieldsDataType) {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+
+        @Override
+        public String visit(KeyValueDataType keyValueDataType) {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+
+        @Override
+        public String visit(CharType charType) {
+            throw new IllegalArgumentException("CharType is not supported.");
+        }
+
+        @Override
+        public String visit(VarCharType varCharType) {
+            return "{\"type\": \"text\"}";
+        }
+
+        @Override
+        public String visit(BooleanType booleanType) {
+            return "{\"type\": \"boolean\"}";
+        }
+
+        @Override
+        public String visit(BinaryType binaryType) {
+            return "{\"type\": \"binary\"}";
+        }
+
+        @Override
+        public String visit(VarBinaryType varBinaryType) {
+            return "{\"type\": \"binary\"}";
+        }
+
+        @Override
+        public String visit(DecimalType decimalType) {
+            throw new IllegalArgumentException("DecimalType is not supported.");
+        }
+
+        @Override
+        public String visit(TinyIntType tinyIntType) {
+            return "{\"type\": \"byte\"}";
+        }
+
+        @Override
+        public String visit(SmallIntType smallIntType) {
+            return "{\"type\": \"short\"}";
+        }
+
+        @Override
+        public String visit(IntType intType) {
+            return "{\"type\": \"integer\"}";
+        }
+
+        @Override
+        public String visit(BigIntType bigIntType) {
+            return "{\"type\": \"long\"}";
+        }
+
+        @Override
+        public String visit(FloatType floatType) {
+            return "{\"type\": \"float\"}";
+        }
+
+        @Override
+        public String visit(DoubleType doubleType) {
+            return "{\"type\": \"double\"}";
+        }
+
+        @Override
+        public String visit(DateType dateType) {
+            return "{\"type\": \"date\", \"format\": \"strict_date||basic_date\"}";
+        }
+
+        @Override
+        public String visit(TimeType timeType) {
+            throw new IllegalArgumentException("TimeType is not supported.");
+        }
+
+        @Override
+        public String visit(TimestampType timestampType) {
+            return timestampType.getPrecision() <= 3
+                    ?
"{\"type\": \"date\"}" + : "{\"type\": \"date_nanos\"}"; + } + + @Override + public String visit(ZonedTimestampType zonedTimestampType) { + throw new IllegalArgumentException("ZonedTimestampType is not supported."); + } + + @Override + public String visit(LocalZonedTimestampType localZonedTimestampType) { + throw new IllegalArgumentException("LocalZonedTimestampType is not supported."); + } + + @Override + public String visit(YearMonthIntervalType yearMonthIntervalType) { + throw new IllegalArgumentException("YearMonthIntervalType is not supported."); + } + + @Override + public String visit(DayTimeIntervalType dayTimeIntervalType) { + throw new IllegalArgumentException("DayTimeIntervalType is not supported."); + } + + @Override + public String visit(ArrayType arrayType) { + throw new IllegalArgumentException("ArrayType is not supported."); + } + + @Override + public String visit(MultisetType multisetType) { + throw new IllegalArgumentException("MultisetType is not supported."); + } + + @Override + public String visit(MapType mapType) { + throw new IllegalArgumentException("MapType is not supported."); + } + + @Override + public String visit(RowType rowType) { + throw new IllegalArgumentException("RowType is not supported."); + } + + @Override + public String visit(DistinctType distinctType) { + throw new IllegalArgumentException("DistinctType is not supported."); + } + + @Override + public String visit(StructuredType structuredType) { + throw new IllegalArgumentException("StructuredType is not supported."); + } + + @Override + public String visit(NullType nullType) { + throw new IllegalArgumentException("NullType is not supported."); + } + + @Override + public String visit(RawType rawType) { + throw new IllegalArgumentException("RawType is not supported."); + } + + @Override + public String visit(SymbolType symbolType) { + throw new IllegalArgumentException("SymbolType is not supported."); + } + + @Override + public String visit(LogicalType other) { + return other.accept(this); + } + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java new file mode 100644 index 0000000..93783c8 --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch; + +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; + +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +import javax.sql.XADataSource; + +/** Elasticsearch metadata. */ +public class ElasticsearchMetadata implements DatabaseMetadata { + + static final int ELASTIC_PORT = 9200; + static final String USERNAME = "elastic"; + static final String PASSWORD = "password"; + + private final String username; + private final String password; + private final String jdbcUrl; + private final String driver; + private final String version; + private final String containerHost; + private final int containerPort; + + public ElasticsearchMetadata(ElasticsearchContainer container) { + this.containerHost = container.getHost(); + this.containerPort = container.getMappedPort(ELASTIC_PORT); + this.username = USERNAME; + this.password = PASSWORD; + this.jdbcUrl = "jdbc:elasticsearch://" + containerHost + ":" + containerPort; + this.driver = "org.elasticsearch.xpack.sql.jdbc.EsDriver"; + this.version = container.getDockerImageName(); + } + + @Override + public String getJdbcUrl() { + return this.jdbcUrl; + } + + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword()); + } + + @Override + public String getUsername() { + return this.username; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public XADataSource buildXaDataSource() { + throw new UnsupportedOperationException(); + } + + @Override + public String getDriverClass() { + return this.driver; + } + + @Override + public String getVersion() { + return this.version; + } + + public String getContainerHost() { + return containerHost; + } + + public int getContainerPort() { + return containerPort; + } +} \ No newline at end of file diff --git a/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java new file mode 100644 index 0000000..efe1f5b --- /dev/null +++ b/src/test/java/com/getindata/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
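For reference, the values this metadata yields; the mapped host/port below are assumed examples:

// Hypothetical values, assuming the container maps port 9200 to localhost:32768.
// getJdbcUrl()                -> "jdbc:elasticsearch://localhost:32768"
// getJdbcUrlWithCredentials() -> "jdbc:elasticsearch://localhost:32768&user=elastic&password=password"
// getDriverClass()            -> "org.elasticsearch.xpack.sql.jdbc.EsDriver"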
+ */
+
+package com.getindata.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+/** Elasticsearch REST API client. */
+public class ElasticsearchRestClient {
+
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    private final RestClient restClient;
+
+    public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
+        this(
+                metadata.getContainerHost(),
+                metadata.getContainerPort(),
+                metadata.getUsername(),
+                metadata.getPassword());
+    }
+
+    public ElasticsearchRestClient(String host, int port, String username, String password) {
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+        this.restClient =
+                RestClient.builder(new HttpHost(host, port, "http"))
+                        .setHttpClientConfigCallback(
+                                builder ->
+                                        builder.setDefaultCredentialsProvider(credentialsProvider))
+                        .build();
+    }
+
+    public boolean trialEnabled() throws Exception {
+        Request request = new Request("GET", "/_license");
+        ElasticLicenseResponse response = executeRequest(request, ElasticLicenseResponse.class);
+        return response != null
+                && response.license.status.equals("active")
+                && response.license.type.equals("trial");
+    }
+
+    public void enableTrial() throws Exception {
+        executeRequest(new Request("POST", "/_license/start_trial?acknowledge=true"));
+    }
+
+    public void createIndex(String indexName, String indexDefinition) throws Exception {
+        Request request = new Request("PUT", format("/%s/", indexName));
+        request.setJsonEntity(indexDefinition);
+        executeRequest(request);
+    }
+
+    public void deleteIndex(String indexName) throws Exception {
+        executeRequest(new Request("DELETE", format("/%s/", indexName)));
+    }
+
+    public void addDataBulk(String indexName, String content) throws Exception {
+        Request request = new Request("PUT", format("/%s/_bulk?refresh=true", indexName));
+        request.setJsonEntity(content);
+        executeRequest(request);
+    }
+
+    private <T> T executeRequest(Request request, Class<T> outputClass) throws IOException {
+        org.elasticsearch.client.Response response = restClient.performRequest(request);
+        Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+        return OBJECT_MAPPER.readValue(EntityUtils.toString(response.getEntity()), outputClass);
+    }
+
+    private void executeRequest(Request request) throws IOException {
+        org.elasticsearch.client.Response response = restClient.performRequest(request);
+        Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+    }
+
+    private static class ElasticLicenseResponse {
+        private static class License {
+            private String status;
+            private String type;
+
+            public String getStatus() {
+                return status;
+            }
+
+            public void setStatus(String status) {
+                this.status = status;
+            }
+
+            public String getType() {
+                return
type; + } + + public void setType(String type) { + this.type = type; + } + } + + private License license; + + public License getLicense() { + return license; + } + + public void setLicense(License license) { + this.license = license; + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java b/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java new file mode 100644 index 0000000..58de5c8 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectTypeTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.dialect; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for all DataTypes and Dialects of JDBC connector. 
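Taken together with the builders above, a sketch of how a test might provision an index through this client before reading it back over JDBC; the `books` table row and `rows` list are assumed to exist:

// Hypothetical usage sketch, not part of the patch.
ElasticsearchRestClient client =
        new ElasticsearchRestClient(ElasticsearchDatabase.getMetadata());
if (!client.trialEnabled()) {
    client.enableTrial(); // the Elasticsearch JDBC endpoint needs a trial/Platinum license
}
client.createIndex("books", ElasticsearchIndexSchemaBuilder.buildIndexSchema(books));
client.addDataBulk("books", ElasticsearchBulkBuilder.createBulkContent(books, rows));
// ... run the JDBC-side assertions, then:
client.deleteIndex("books");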
 */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class JdbcDialectTypeTest {
+
+    private static final String DDL_FORMAT =
+            "CREATE TABLE T (f0 %s)"
+                    + " WITH ("
+                    + " 'connector'='jdbc',"
+                    + " 'url'='jdbc:%s:memory:test',"
+                    + " 'table-name'='myTable'"
+                    + ")";
+
+    protected String testDialect() {
+        return "derby";
+    }
+
+    protected List<TestItem> testData() {
+        return Arrays.asList(
+                createTestItem("CHAR"),
+                createTestItem("VARCHAR"),
+                createTestItem("BOOLEAN"),
+                createTestItem("TINYINT"),
+                createTestItem("SMALLINT"),
+                createTestItem("INTEGER"),
+                createTestItem("BIGINT"),
+                createTestItem("FLOAT"),
+                createTestItem("DOUBLE"),
+                createTestItem("DECIMAL(10, 4)"),
+                createTestItem("DATE"),
+                createTestItem("TIME"),
+                createTestItem("TIMESTAMP(3)"),
+                createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+                createTestItem("TIMESTAMP(9) WITHOUT TIME ZONE"),
+                createTestItem("VARBINARY"),
+
+                // Not valid data
+                createTestItem("BINARY", "The Derby dialect doesn't support type: BINARY(1)."),
+                createTestItem(
+                        "VARBINARY(10)", "The Derby dialect doesn't support type: VARBINARY(10)."),
+                createTestItem(
+                        "TIMESTAMP_LTZ(3)",
+                        "The Derby dialect doesn't support type: TIMESTAMP_LTZ(3)."),
+                createTestItem(
+                        "DECIMAL(38, 18)",
+                        "The precision of field 'f0' is out of the DECIMAL precision range [1, 31] supported by Derby dialect."));
+    }
+
+    protected TestItem createTestItem(String dataType) {
+        return TestItem.of(testDialect(), dataType);
+    }
+
+    protected TestItem createTestItem(String dataType, String expectError) {
+        return TestItem.of(testDialect(), dataType, expectError);
+    }
+
+    @ParameterizedTest
+    @MethodSource("testData")
+    void testDataTypeValidate(TestItem testItem) {
+        String sqlDDL = String.format(DDL_FORMAT, testItem.dataTypeExpr, testItem.dialect);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        tEnv.executeSql(sqlDDL);
+
+        if (testItem.expectError != null) {
+            assertThatThrownBy(() -> tEnv.sqlQuery("SELECT * FROM T"))
+                    .satisfies(anyCauseMatches(testItem.expectError));
+        } else {
+            tEnv.sqlQuery("SELECT * FROM T");
+        }
+    }
+
+    // ~ Inner Class
+    /** Test item for parameterized test. */
+    public static class TestItem {
+        private final String dialect;
+        private final String dataTypeExpr;
+        private final String expectError;
+
+        private TestItem(String dialect, String dataTypeExpr, @Nullable String expectError) {
+            this.dialect = dialect;
+            this.dataTypeExpr = dataTypeExpr;
+            this.expectError = expectError;
+        }
+
+        static TestItem of(String dialect, String dataTypeExpr) {
+            return new TestItem(dialect, dataTypeExpr, null);
+        }
+
+        static TestItem of(String dialect, String dataTypeExpr, String expectError) {
+            return new TestItem(dialect, dataTypeExpr, expectError);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("Dialect: %s, DataType: %s", dialect, dataTypeExpr);
+        }
+    }
+}
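Filled in, the DDL template above expands as follows for the default derby dialect and a VARCHAR column (one sketch instance of DDL_FORMAT):

// CREATE TABLE T (f0 VARCHAR) WITH (
//     'connector'='jdbc',
//     'url'='jdbc:derby:memory:test',
//     'table-name'='myTable'
// )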
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
new file mode 100644
index 0000000..ad50d92
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils;
+
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+
+import java.lang.reflect.Constructor;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.junit.platform.commons.support.AnnotationSupport.findRepeatableAnnotations;
+
+/** Database extension for testing. */
+public abstract class DatabaseExtension
+        implements BeforeAllCallback,
+                AfterAllCallback,
+                BeforeEachCallback,
+                AfterEachCallback,
+                ExtensionContext.Store.CloseableResource {
+
+    /**
+     * Database lifecycle for testing. The goal is that all database containers are created only
+     * once.
+     */
+    public enum Lifecycle {
+        /** Database will be instantiated only once. */
+        PER_EXECUTION,
+        /** Database will be instantiated once per test class. */
+        PER_CLASS
+    }
+
+    protected abstract DatabaseMetadata startDatabase() throws Exception;
+
+    protected abstract void stopDatabase() throws Exception;
+
+    private final String uniqueKey = this.getClass().getSimpleName();
+
+    protected Lifecycle getLifecycle() {
+        return Lifecycle.PER_EXECUTION;
+    }
+
+    private ExtensionContext.Store getStore(ExtensionContext context) {
+        return context.getRoot().getStore(Namespace.GLOBAL);
+    }
+
+    private DatabaseTest getDatabaseBaseTest(Class<?> clazz) throws Exception {
+        DatabaseTest dbClazz = null;
+        for (Constructor<?> c : clazz.getDeclaredConstructors()) {
+            c.setAccessible(true);
+            dbClazz = (DatabaseTest) c.newInstance();
+        }
+        return dbClazz;
+    }
+
+    private void getManagedTables(
+            ExtensionContext context,
+            BiConsumerWithException<TableManaged, Connection, SQLException> execute) {
+        context.getTestClass()
+                .filter(DatabaseTest.class::isAssignableFrom)
+                .ifPresent(
+                        clazz -> {
+                            DatabaseMetadata metadata =
+                                    getStore(context).get(uniqueKey, DatabaseMetadata.class);
+                            if (metadata != null) {
+                                try (Connection conn = metadata.getConnection()) {
+                                    for (TableManaged table :
+                                            getDatabaseBaseTest(clazz).getManagedTables()) {
+                                        execute.accept(table, conn);
+                                    }
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        });
+    }
+
+    private boolean ignoreTestDatabase(ExtensionContext context) {
+        Set<String> dbExtensions = retrieveDatabaseExtensions(context);
+
+        if (dbExtensions.size() > 1) {
+            return uniqueKey.equals("DerbyDatabase") && dbExtensions.contains(uniqueKey);
+        }
+        return false;
+    }
+
+    @Override
+    public final void beforeAll(ExtensionContext context) throws Exception {
+        if (ignoreTestDatabase(context)) {
+            return;
+        }
+
+        if (getStore(context).get(uniqueKey) == null) {
+            getStore(context).put(uniqueKey, startDatabase());
+        }
+
+        getManagedTables(context, TableManaged::createTable);
+    }
+
+    @Override
+    public final void beforeEach(ExtensionContext context) throws Exception {}
+
+    @Override
+    public final void afterEach(ExtensionContext context) throws Exception {
+        if (ignoreTestDatabase(context)) {
+            return;
+        }
+        getManagedTables(context, TableManaged::deleteTable);
+    }
+
+    @Override
+    public final void afterAll(ExtensionContext context) throws Exception {
+        if (ignoreTestDatabase(context)) {
+            return;
+        }
+        getManagedTables(context, TableManaged::dropTable);
+        if (Lifecycle.PER_CLASS == getLifecycle()) {
+            stopDatabase();
+            getStore(context).remove(uniqueKey, DatabaseMetadata.class);
+        }
+    }
+
+    @Override
+    public final void close() throws Throwable {
+        if (Lifecycle.PER_EXECUTION == getLifecycle()) {
+            stopDatabase();
+        }
+    }
+
+    private Set<String> retrieveDatabaseExtensions(final ExtensionContext context) {
+
+        BiFunction<ExtensionContext, Set<String>, Set<String>> retrieveExtensions =
+                new BiFunction<ExtensionContext, Set<String>, Set<String>>() {
+
+                    @Override
+                    public Set<String> apply(ExtensionContext context, Set<String> acc) {
+                        Set<String> current = new HashSet<>(acc);
+                        current.addAll(
+                                findRepeatableAnnotations(context.getElement(), ExtendWith.class)
+                                        .stream()
+                                        .flatMap(extendWith -> Arrays.stream(extendWith.value()))
+                                        .filter(DatabaseExtension.class::isAssignableFrom)
+                                        .map(Class::getSimpleName)
+                                        .collect(Collectors.toSet()));
+
+                        return context.getParent()
+                                .map(extensionContext -> apply(extensionContext, current))
+                                .orElse(current);
+                    }
+                };
+
+        return retrieveExtensions.apply(context, new HashSet<>());
+    }
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java
new file mode 100644
index 0000000..9e57a71
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseMetadata.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils;
+
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import javax.sql.XADataSource;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+
+/** Describes a database: driver, schema and urls. */
+public interface DatabaseMetadata extends Serializable {
+
+    String getJdbcUrl();
+
+    String getJdbcUrlWithCredentials();
+
+    String getUsername();
+
+    String getPassword();
+
+    XADataSource buildXaDataSource();
+
+    String getDriverClass();
+
+    String getVersion();
+
+    default SerializableSupplier<XADataSource> getXaSourceSupplier() {
+        return this::buildXaDataSource;
+    }
+
+    default JdbcConnectionOptions getConnectionOptions() {
+        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                .withDriverName(getDriverClass())
+                .withUrl(getJdbcUrl())
+                .withUsername(getUsername())
+                .withPassword(getPassword())
+                .build();
+    }
+
+    default Connection getConnection() {
+        try {
+            Class.forName(getDriverClass());
+            return DriverManager.getConnection(getJdbcUrl(), getUsername(), getPassword());
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e);
+        }
+    }
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseTest.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseTest.java
new file mode 100644
index 0000000..d0f26fa
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Base interface for tests that depend on a database. */
+public interface DatabaseTest {
+
+    DatabaseMetadata getMetadata();
+
+    default List<TableManaged> getManagedTables() {
+        return Collections.emptyList();
+    }
+}
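A sketch of how a test obtains a plain JDBC connection through this contract; the concrete database class is assumed:

// Hypothetical sketch, not part of the patch.
DatabaseMetadata metadata = ElasticsearchDatabase.getMetadata();
try (Connection conn = metadata.getConnection()) {
    // getConnection() loads getDriverClass() via Class.forName and then
    // connects with the configured URL, username and password.
}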
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/JdbcITCaseBase.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/JdbcITCaseBase.java
new file mode 100644
index 0000000..c3b9832
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/JdbcITCaseBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** A base for ITCase implementations. */
+public interface JdbcITCaseBase {
+
+    @RegisterExtension
+    MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+}
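Putting the pieces together, an integration test would typically combine JdbcITCaseBase (mini cluster) with DatabaseTest (database and managed tables); class and table names below are assumed:

// Hypothetical composition, not part of the patch.
@ExtendWith(ElasticsearchDatabase.class)
class ElasticsearchSourceITCase implements JdbcITCaseBase, DatabaseTest {

    @Override
    public DatabaseMetadata getMetadata() {
        return ElasticsearchDatabase.getMetadata();
    }

    // Tables returned from getManagedTables() are created before the tests
    // and cleaned up between and after them by DatabaseExtension.
}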
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/TableManaged.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/TableManaged.java
new file mode 100644
index 0000000..dd43a09
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/TableManaged.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/** Table that can be managed by {@link DatabaseExtension}. */
+public interface TableManaged {
+
+    String getTableName();
+
+    void createTable(Connection conn) throws SQLException;
+
+    void deleteTable(Connection conn) throws SQLException;
+
+    void dropTable(Connection conn) throws SQLException;
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/functions/JdbcResultSetBuilder.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/functions/JdbcResultSetBuilder.java
new file mode 100644
index 0000000..d6734be
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/functions/JdbcResultSetBuilder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.functions;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+/** ResultSet builder. * */
+@FunctionalInterface
+public interface JdbcResultSetBuilder<T> extends Serializable {
+    List<T> accept(ResultSet rs) throws SQLException;
+}
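For illustration, a result-set builder for a single-column query can be written as a lambda; the column shape is assumed:

// Hypothetical sketch, not part of the patch: collects the first column as strings.
JdbcResultSetBuilder<String> names =
        rs -> {
            List<String> result = new ArrayList<>();
            while (rs.next()) {
                result.add(rs.getString(1));
            }
            return result;
        };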
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
new file mode 100644
index 0000000..d8cbd79
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.tables;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Base table operations. * */
+public abstract class TableBase<T> implements TableManaged {
+
+    private final String name;
+    private final TableField[] fields;
+
+    protected TableBase(String name, TableField[] fields) {
+        Preconditions.checkArgument(name != null && !name.isEmpty(), "Table name must be defined");
+        Preconditions.checkArgument(
+                fields != null && fields.length != 0, "Table fields must be defined");
+        this.name = name;
+        this.fields = fields;
+    }
+
+    protected abstract JdbcResultSetBuilder<T> getResultSetBuilder();
+
+    public String getTableName() {
+        return name;
+    }
+
+    private Stream<TableField> getStreamFields() {
+        return Arrays.stream(this.fields);
+    }
+
+    private Stream<String> getStreamFieldNames() {
+        return getStreamFields().map(TableField::getName);
+    }
+
+    private Stream<DataType> getStreamDataTypes() {
+        return getStreamFields().map(TableField::getDataType);
+    }
+
+    public String[] getTableFields() {
+        return getStreamFieldNames().toArray(String[]::new);
+    }
+
+    public DataTypes.Field[] getTableDataFields() {
+        return getStreamFields()
+                .map(field -> DataTypes.FIELD(field.getName(), field.getDataType()))
+                .toArray(DataTypes.Field[]::new);
+    }
+
+    public DataType[] getTableDataTypes() {
+        return getStreamDataTypes().toArray(DataType[]::new);
+    }
+
+    public RowTypeInfo getTableRowTypeInfo() {
+        TypeInformation<?>[] typesArray =
+                getStreamDataTypes()
+                        .map(TypeConversions::fromDataTypeToLegacyInfo)
+                        .toArray(TypeInformation[]::new);
+        String[] fieldsArray = getTableFields();
+        return new RowTypeInfo(typesArray, fieldsArray);
+    }
+
+    public RowType getTableRowType() {
+        LogicalType[] typesArray =
+                getStreamDataTypes().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        String[] fieldsArray = getTableFields();
+        return RowType.of(typesArray, fieldsArray);
+    }
+
+    public int[] getTableTypes() {
+        return getStreamDataTypes()
+                .map(DataType::getLogicalType)
+                .map(LogicalType::getTypeRoot)
+                .map(JdbcTypeUtil::logicalTypeToSqlType)
+                .mapToInt(x -> x)
+                .toArray();
+    }
+
+    public Schema getTableSchema() {
+        Schema.Builder schema = Schema.newBuilder();
+        getStreamFields().forEach(field -> schema.column(field.getName(), field.getDataType()));
+
+        String pkFields =
+                getStreamFields()
+                        .filter(TableField::isPkField)
+                        .map(TableField::getName)
+                        .collect(Collectors.joining(", "));
+        schema.primaryKeyNamed("PRIMARY", pkFields);
+
+        return schema.build();
+    }
+
+    public ResolvedSchema getTableResolvedSchema() {
+        return ResolvedSchema.of(
+                getStreamFields()
+                        .map(field -> Column.physical(field.getName(), field.getDataType()))
+                        .collect(Collectors.toList()));
+    }
+
+    public String getCreateQuery() {
+        String pkFields =
+                getStreamFields()
+                        .filter(TableField::isPkField)
+                        .map(TableField::getName)
+                        .collect(Collectors.joining(", "));
+        return String.format(
+                "CREATE TABLE %s (%s%s)",
+                name,
+                getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
+                pkFields.isEmpty() ? "" : String.format(", PRIMARY KEY (%s)", pkFields));
+    }
+
+    public String getCreateQueryForFlink(DatabaseMetadata metadata, String newName) {
+        return getCreateQueryForFlink(metadata, newName, Collections.emptyList());
+    }
+
+    public String getCreateQueryForFlink(
+            DatabaseMetadata metadata, String newName, List<String> withParams) {
+        return getCreateQueryForFlink(
+                metadata, newName, Arrays.asList(getTableFields()), withParams);
+    }
+
+    public String getCreateQueryForFlink(
+            DatabaseMetadata metadata,
+            String newName,
+            List<String> newFields,
+            List<String> withParams) {
+
+        Map<String, TableField> fieldsMap =
+                getStreamFields().collect(Collectors.toMap(TableField::getName, f -> f));
+
+        String fields =
+                newFields.stream()
+                        .map(fieldsMap::get)
+                        .map(field -> String.format("%s %s", field.getName(), field.getDataType()))
+                        .collect(Collectors.joining(", "));
+        String pkFields =
+                getStreamFields()
+                        .filter(TableField::isPkField)
+                        .map(TableField::getName)
+                        .collect(Collectors.joining(", "));
+
+        String primaryKey =
+                (pkFields.isEmpty())
+                        ? ""
+                        : String.format(", PRIMARY KEY (%s) NOT ENFORCED", pkFields);
+
+        List<String> params = new ArrayList<>();
+        params.add("'connector'='jdbc'");
+        params.add(String.format("'table-name'='%s'", getTableName()));
+        params.add(String.format("'url'='%s'", metadata.getJdbcUrl()));
+        params.add(String.format("'username'='%s'", metadata.getUsername()));
+        params.add(String.format("'password'='%s'", metadata.getPassword()));
+        params.addAll(withParams);
+
+        return String.format(
+                "CREATE TABLE %s (%s%s) WITH (%s)",
+                newName, fields, primaryKey, String.join(", ", params));
+    }
+
+    private String getInsertIntoQuery(String... values) {
+        return String.format(
+                "INSERT INTO %s (%s) VALUES %s",
+                name,
+                getStreamFieldNames().collect(Collectors.joining(", ")),
+                Arrays.stream(values)
+                        .map(v -> String.format("(%s)", v))
+                        .collect(Collectors.joining(",")));
+    }
+
+    public String getInsertIntoQuery() {
+        return getInsertIntoQuery(
+                getStreamFieldNames().map(x -> "?").collect(Collectors.joining(", ")));
+    }
+
+    public String getSelectAllQuery() {
+        return String.format(
+                "SELECT %s FROM %s", getStreamFieldNames().collect(Collectors.joining(", ")), name);
+    }
+
+    protected String getDeleteFromQuery() {
+        return String.format("DELETE FROM %s", name);
+    }
+
+    public String getDropTableQuery() {
+        return String.format("DROP TABLE %s", name);
+    }
+
+    public void createTable(Connection conn) throws SQLException {
+        executeUpdate(conn, getCreateQuery());
+    }
+
+    public void insertIntoTableValues(Connection conn, String... values) throws SQLException {
+        executeUpdate(conn, getInsertIntoQuery(values));
+    }
+
+    public List<T> selectAllTable(DatabaseMetadata metadata) throws SQLException {
+        try (Connection conn = metadata.getConnection()) {
+            return selectAllTable(conn);
+        }
+    }
+
+    public List<T> selectAllTable(Connection conn) throws SQLException {
+        return executeStatement(conn, getSelectAllQuery(), getResultSetBuilder());
+    }
+
+    public void deleteTable(Connection conn) throws SQLException {
+        executeUpdate(conn, getDeleteFromQuery());
+    }
+
+    public void dropTable(Connection conn) throws SQLException {
+        executeUpdate(conn, getDropTableQuery());
+    }
+
+    protected void executeUpdate(Connection conn, String sql) throws SQLException {
+        try (Statement st = conn.createStatement()) {
+            st.executeUpdate(sql);
+        }
+    }
+
+    protected List<T> executeStatement(
+            Connection conn, String sql, JdbcResultSetBuilder<T> rsGetter) throws SQLException {
+        try (Statement st = conn.createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            return rsGetter.accept(rs);
+        }
+    }
+
+    protected int[] executeStatement(
+            Connection conn, String sql, JdbcStatementBuilder<T> psSetter, List<T> values)
+            throws SQLException {
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            for (T value : values) {
+                psSetter.accept(ps, value);
+                ps.addBatch();
+            }
+            return ps.executeBatch();
+        }
+    }
+
+    protected <V> V getNullable(ResultSet rs, FunctionWithException<ResultSet, V, SQLException> get)
+            throws SQLException {
+        V value = get.apply(rs);
+        return getNullable(rs, value);
+    }
+
+    protected <V> V getNullable(ResultSet rs, V value) throws SQLException {
+        return rs.wasNull() ? null : value;
+    }
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
new file mode 100644
index 0000000..6a8f80d
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.tables;
+
+import org.apache.flink.table.types.DataType;
+
+/** Table builder. * */
+public final class TableBuilder {
+
+    public static TableRow tableRow(String name, TableField...
fields) { + return new TableRow(name, fields); + } + + public static TableField field(String name, DataType dataType) { + return field(name, null, dataType); + } + + public static TableField field(String name, TableField.DbType dbType, DataType dataType) { + return createField(name, dbType, dataType, false); + } + + public static TableField pkField(String name, DataType dataType) { + return pkField(name, null, dataType); + } + + public static TableField pkField(String name, TableField.DbType dbType, DataType dataType) { + return createField(name, dbType, dataType, true); + } + + public static TableField.DbType dbType(String type) { + return new TableField.DbType(type); + } + + private static TableField createField( + String name, TableField.DbType dbType, DataType dataType, boolean pkField) { + return new TableField(name, dataType, dbType, pkField); + } +} diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilderTest.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilderTest.java new file mode 100644 index 0000000..6f57463 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBuilderTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
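A sketch of defining a table with these builders, including a database-specific column type; the table and column names are assumed:

// Hypothetical sketch, not part of the patch.
TableRow books =
        TableBuilder.tableRow(
                "books",
                TableBuilder.pkField("id", DataTypes.INT().notNull()),
                TableBuilder.field("price", TableBuilder.dbType("REAL"), DataTypes.FLOAT()));
// books.getCreateQuery() ->
// "CREATE TABLE books (id INT NOT NULL, price REAL, PRIMARY KEY (id))"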
+ */ + +package org.apache.flink.connector.jdbc.testutils.tables; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TableBuilderTest { + + TableBase table = + tableRow( + "test", + pkField("id", DataTypes.INT().notNull()), + field("name", DataTypes.VARCHAR(10))); + + @Test + void testTableCreationFails() { + assertThrows( + IllegalArgumentException.class, () -> tableRow(""), "Table name must be defined"); + assertThrows( + IllegalArgumentException.class, + () -> tableRow("test"), + "Table fields must be defined"); + } + + @Test + void testTableCreationOk() { + assertEquals("test", table.getTableName()); + assertEquals(2, table.getTableFields().length); + assertEquals("id", table.getTableFields()[0]); + assertEquals("name", table.getTableFields()[1]); + } + + @Test + void testQueryCreation() { + String expected = "CREATE TABLE test (id INT NOT NULL, name VARCHAR(10), PRIMARY KEY (id))"; + assertEquals(expected, table.getCreateQuery()); + } + + @Test + void testQueryCreationWithDbType() { + TableRow table = + tableRow( + "test", + pkField("id", dbType("DOUBLE").notNull(), DataTypes.FLOAT().notNull()), + field("type", dbType("REAL"), DataTypes.FLOAT())); + String expected = "CREATE TABLE test (id DOUBLE NOT NULL, type REAL, PRIMARY KEY (id))"; + assertEquals(expected, table.getCreateQuery()); + } + + @Test + void testQueryInsertInto() { + String expected = "INSERT INTO test (id, name) VALUES (?, ?)"; + assertEquals(expected, table.getInsertIntoQuery()); + } + + @Test + void testQuerySelectAll() { + String expected = "SELECT id, name FROM test"; + assertEquals(expected, table.getSelectAllQuery()); + } + + @Test + void testQueryDeleteFrom() { + String expected = "DELETE FROM test"; + assertEquals(expected, table.getDeleteFromQuery()); + } + + @Test + void testQueryDropTable() { + String expected = "DROP TABLE test"; + assertEquals(expected, table.getDropTableQuery()); + } + + @Test + void testRowTypeInfo() { + RowTypeInfo expected = + new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + assertEquals(expected, table.getTableRowTypeInfo()); + } + + @Test + void testRowType() { + RowType expected = + RowType.of( + new LogicalType[] {new IntType(false), new VarCharType(10)}, + new String[] {"id", "name"}); + + assertEquals(expected, table.getTableRowType()); + } +} diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableField.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableField.java new file mode 100644 index 0000000..97e7721 --- /dev/null +++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableField.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.testutils.tables; + +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +/** Table field. * */ +public class TableField { + private final String name; + private final DbType dbType; + private final DataType dataType; + private final boolean pkField; + + protected TableField(String name, DataType dataType, DbType dbType, boolean pkField) { + Preconditions.checkNotNull(name, "Column name can not be null."); + Preconditions.checkNotNull(dataType, "Column data type can not be null."); + this.name = name; + this.dataType = dataType; + this.dbType = dbType; + this.pkField = pkField; + } + + public String getName() { + return this.name; + } + + public DataType getDataType() { + return this.dataType; + } + + public boolean isPkField() { + return pkField; + } + + public String asString() { + String fieldType = + (this.dbType != null) ? this.dbType.toString() : this.dataType.toString(); + return String.format("%s %s", this.name, fieldType); + } + + @Override + public String toString() { + return asString(); + } + + /** Field definition for database. */ + public static class DbType { + private final String type; + private Boolean nullable = true; + + public DbType(String type) { + this.type = type; + } + + public DbType notNull() { + this.nullable = false; + return this; + } + + @Override + public String toString() { + return String.format("%s%s", this.type, this.nullable ? "" : " NOT NULL"); + } + } +} diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java new file mode 100644 index 0000000..87308fb --- /dev/null +++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java
new file mode 100644
index 0000000..87308fb
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.tables;
+
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
+import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Row table. */
+public class TableRow extends TableBase<Row> {
+
+    protected TableRow(String name, TableField[] fields) {
+        super(name, fields);
+    }
+
+    protected JdbcResultSetBuilder<Row> getResultSetBuilder() {
+        return (rs) -> {
+            List<Row> result = new ArrayList<>();
+            DataTypes.Field[] fields = getTableDataFields();
+            while (rs.next()) {
+                Row row = new Row(fields.length);
+                for (int i = 0; i < fields.length; i++) {
+                    Object dbValue;
+                    // Time-like types need the dedicated JDBC getters; everything else
+                    // can be fetched with the Flink conversion class directly.
+                    Class<?> conversionClass = fields[i].getDataType().getConversionClass();
+                    if (conversionClass.equals(LocalTime.class)) {
+                        dbValue = rs.getTime(i + 1);
+                    } else if (conversionClass.equals(LocalDate.class)) {
+                        dbValue = rs.getDate(i + 1);
+                    } else if (conversionClass.equals(LocalDateTime.class)) {
+                        dbValue = rs.getTimestamp(i + 1);
+                    } else {
+                        dbValue = rs.getObject(i + 1, conversionClass);
+                    }
+                    row.setField(i, getNullable(rs, dbValue));
+                }
+                result.add(row);
+            }
+            return result;
+        };
+    }
+
+    private final JdbcStatementBuilder<Row> statementBuilder =
+            (ps, row) -> {
+                DataTypes.Field[] fields = getTableDataFields();
+                for (int i = 0; i < row.getArity(); i++) {
+                    DataType type = fields[i].getDataType();
+                    int dbType =
+                            JdbcTypeUtil.logicalTypeToSqlType(type.getLogicalType().getTypeRoot());
+                    if (row.getField(i) == null) {
+                        ps.setNull(i + 1, dbType);
+                    } else {
+                        if (type.getConversionClass().equals(LocalTime.class)) {
+                            Time time = Time.valueOf(row.<LocalTime>getFieldAs(i));
+                            ps.setTime(i + 1, time);
+                        } else if (type.getConversionClass().equals(LocalDate.class)) {
+                            ps.setDate(i + 1, Date.valueOf(row.<LocalDate>getFieldAs(i)));
+                        } else if (type.getConversionClass().equals(LocalDateTime.class)) {
+                            ps.setTimestamp(
+                                    i + 1, Timestamp.valueOf(row.<LocalDateTime>getFieldAs(i)));
+                        } else {
+                            ps.setObject(i + 1, row.getField(i));
+                        }
+                    }
+                }
+            };
+
+    public void insertIntoTableValues(Connection conn, List<Row> values) throws SQLException {
+        executeStatement(conn, getInsertIntoQuery(), statementBuilder, values);
+    }
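+
+    // A minimal round-trip sketch (hypothetical single-column table and test
+    // metadata, not part of this class):
+    //
+    //   TableRow table = tableRow("test", pkField("id", DataTypes.INT().notNull()));
+    //   try (Connection conn = metadata.getConnection()) {
+    //       table.insertIntoTableValues(conn, Collections.singletonList(Row.of(1)));
+    //   }
+    //   table.checkContent(metadata, Row.of(1)); // comparison below is order-insensitive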
+
+    public void checkContent(DatabaseMetadata metadata, Row... content) throws SQLException {
+        try (Connection dbConn = metadata.getConnection()) {
+            String[] results =
+                    selectAllTable(dbConn).stream()
+                            .map(Row::toString)
+                            .sorted()
+                            .toArray(String[]::new);
+
+            assertThat(results)
+                    .isEqualTo(
+                            Arrays.stream(content)
+                                    .map(Row::toString)
+                                    .sorted()
+                                    .toArray(String[]::new));
+        }
+    }
+}
diff --git a/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java
new file mode 100644
index 0000000..a4a12ef
--- /dev/null
+++ b/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/templates/BooksTable.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.tables.templates;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
+import org.apache.flink.connector.jdbc.testutils.tables.TableBase;
+import org.apache.flink.connector.jdbc.testutils.tables.TableField;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.String.format;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
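+
+// A minimal usage sketch ("books" is an assumed table name; the rendered DDL
+// follows the dbType-override rule exercised in TableBuilderTest):
+//
+//   BooksTable books = new BooksTable("books");
+//   books.getCreateQuery();
+//   // -> "CREATE TABLE books (id INT NOT NULL, title VARCHAR(50), author VARCHAR(50),
+//   //     price FLOAT, qty INT, PRIMARY KEY (id))"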
+
+/** Book table template. */
+public class BooksTable extends TableBase<BooksTable.BookEntry> implements TableManaged {
+
+    private final JdbcStatementBuilder<BookEntry> statementBuilder =
+            (ps, book) -> {
+                ps.setInt(1, book.id);
+                ps.setString(2, book.title);
+                ps.setString(3, book.author);
+                if (book.price == null) {
+                    ps.setNull(4, Types.DOUBLE);
+                } else {
+                    ps.setDouble(4, book.price);
+                }
+                ps.setInt(5, book.qty);
+            };
+
+    private final JdbcResultSetBuilder<BookEntry> resultSetBuilder =
+            (rs) -> {
+                List<BookEntry> result = new ArrayList<>();
+                while (rs.next()) {
+                    result.add(
+                            new BookEntry(
+                                    getNullable(rs, r -> r.getInt(1)),
+                                    getNullable(rs, r -> r.getString(2)),
+                                    getNullable(rs, r -> r.getString(3)),
+                                    getNullable(rs, r -> r.getDouble(4)),
+                                    getNullable(rs, r -> r.getInt(5))));
+                }
+                return result;
+            };
+
+    public BooksTable(String name) {
+        super(
+                name,
+                Arrays.asList(
+                                pkField("id", INT().notNull()),
+                                field("title", VARCHAR(50)),
+                                field("author", VARCHAR(50)),
+                                field("price", dbType("FLOAT"), DOUBLE()),
+                                field("qty", INT()))
+                        .toArray(new TableField[0]));
+    }
+
+    public String getSelectByIdBetweenQuery() {
+        return format("%s WHERE id BETWEEN ? AND ?", getSelectAllQuery());
+    }
+
+    public String getSelectByAuthorQuery() {
+        return format("%s WHERE author = ?", getSelectAllQuery());
+    }
+
+    public String getSelectAllNoQuantityQuery() {
+        return format("%s WHERE QTY < 0", getSelectAllQuery());
+    }
+
+    // Test-data helpers kept commented out: the referenced BooksStore is not part
+    // of this patch.
+    // public List getTestData() {
+    //     return Arrays.asList(BooksStore.TEST_DATA);
+    // }
+    // public void insertTableTestData(Connection conn) throws SQLException {
+    //     executeStatement(conn, getInsertIntoQuery(), statementBuilder, getTestData());
+    // }
+
+    public JdbcStatementBuilder<BookEntry> getStatementBuilder() {
+        return statementBuilder;
+    }
+
+    @Override
+    protected JdbcResultSetBuilder<BookEntry> getResultSetBuilder() {
+        return resultSetBuilder;
+    }
+
+    public List<BookEntry> selectAllTable(Connection conn) throws SQLException {
+        return executeStatement(conn, getSelectAllQuery(), resultSetBuilder);
+    }
+
+    /** Book table entry. */
+    public static class BookEntry implements Serializable {
+        public final Integer id;
+        public final String title;
+        public final String author;
+        public final Double price;
+        public final Integer qty;
+
+        public BookEntry(Integer id, String title, String author, Double price, Integer qty) {
+            this.id = id;
+            this.title = title;
+            this.author = author;
+            this.price = price;
+            this.qty = qty;
+        }
+
+        @Override
+        public String toString() {
+            return new ToStringBuilder(this)
+                    .append("id", id)
+                    .append("title", title)
+                    .append("author", author)
+                    .append("price", price)
+                    .append("qty", qty)
+                    .toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            BookEntry bookEntry = (BookEntry) o;
+            return new EqualsBuilder()
+                    .append(id, bookEntry.id)
+                    .append(title, bookEntry.title)
+                    .append(author, bookEntry.author)
+                    .append(price, bookEntry.price)
+                    .append(qty, bookEntry.qty)
+                    .isEquals();
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder(17, 37)
+                    .append(id)
+                    .append(title)
+                    .append(author)
+                    .append(price)
+                    .append(qty)
+                    .toHashCode();
+        }
+    }
+}