From 5f2ac2932cc829d6011a7c1f020222f757e780e6 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Sat, 22 May 2021 09:45:31 -0700 Subject: [PATCH 01/31] Add a Protected Getter on the JDBCFactory in JDBCRecordHandler, so children classes can access it --- .../connectors/athena/jdbc/manager/JdbcRecordHandler.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcRecordHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcRecordHandler.java index 6529d4c147..e2994c41dc 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcRecordHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcRecordHandler.java @@ -114,6 +114,11 @@ protected JdbcRecordHandler(final AmazonS3 amazonS3, final AWSSecretsManager sec this.databaseConnectionConfig = Validate.notNull(databaseConnectionConfig, "databaseConnectionConfig must not be null"); } + protected JdbcConnectionFactory getJdbcConnectionFactory() + { + return jdbcConnectionFactory; + } + private JdbcCredentialProvider getCredentialProvider() { final String secretName = this.databaseConnectionConfig.getSecret(); From 5f85f9f236fd49d314ce6063c7c115f0582ac61a Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Sat, 22 May 2021 09:45:54 -0700 Subject: [PATCH 02/31] Add an intermediate constructor for more flexibility --- .../connectors/athena/jdbc/mysql/MySqlMetadataHandler.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlMetadataHandler.java index 7d72ecc931..1cbfc5abe3 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlMetadataHandler.java +++ 
b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlMetadataHandler.java @@ -86,7 +86,12 @@ public MySqlMetadataHandler() */ public MySqlMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig) { - super(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig, JDBC_PROPERTIES)); + this(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig, JDBC_PROPERTIES)); + } + + public MySqlMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig, final JdbcConnectionFactory jdbcConnectionFactory) + { + super(databaseConnectionConfig, jdbcConnectionFactory); } @VisibleForTesting From 9fd5019e948de4231a2f6b392166dd5dbc39be74 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Sat, 22 May 2021 09:46:29 -0700 Subject: [PATCH 03/31] Make MySqlQueryStringBuilder constructor public. so that piece of code can be reused in a bigger scope --- .../connectors/athena/jdbc/mysql/MySqlQueryStringBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlQueryStringBuilder.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlQueryStringBuilder.java index ee4e395759..66fab44ef4 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlQueryStringBuilder.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlQueryStringBuilder.java @@ -34,7 +34,7 @@ public class MySqlQueryStringBuilder extends JdbcSplitQueryBuilder { - MySqlQueryStringBuilder(final String quoteCharacters) + public MySqlQueryStringBuilder(final String quoteCharacters) { super(quoteCharacters); } From 645a2cd5b55b6be2b7ddbc1296a802471a883473 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Sat, 22 May 2021 09:47:19 -0700 Subject: [PATCH 04/31] Add an intermediate constructor for more flexibility --- 
.../connectors/athena/jdbc/mysql/MySqlRecordHandler.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlRecordHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlRecordHandler.java index 740eee6ccf..533a473ff2 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlRecordHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/mysql/MySqlRecordHandler.java @@ -67,9 +67,14 @@ public MySqlRecordHandler() } public MySqlRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig) + { + this(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig, MySqlMetadataHandler.JDBC_PROPERTIES)); + } + + public MySqlRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig, final JdbcConnectionFactory jdbcConnectionFactory) { this(databaseConnectionConfig, AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(), - new GenericJdbcConnectionFactory(databaseConnectionConfig, MySqlMetadataHandler.JDBC_PROPERTIES), new MySqlQueryStringBuilder(MYSQL_QUOTE_CHARACTER)); + jdbcConnectionFactory, new MySqlQueryStringBuilder(MYSQL_QUOTE_CHARACTER)); } @VisibleForTesting From 347ac93fa67078bf081705450ff2951964f7fd84 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 16 Jun 2021 13:51:34 -0700 Subject: [PATCH 05/31] add snowflake drivers --- athena-jdbc/pom.xml | 5 +++++ .../jdbc/connection/GenericJdbcConnectionFactory.java | 6 +++++- .../athena/jdbc/connection/JdbcConnectionFactory.java | 7 ++++--- .../amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java | 1 + 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/athena-jdbc/pom.xml b/athena-jdbc/pom.xml index b4f7db44c7..6ac35636b1 100644 --- a/athena-jdbc/pom.xml +++ b/athena-jdbc/pom.xml @@ -27,6 +27,11 @@ + + 
net.snowflake + snowflake-jdbc + 3.13.4 + com.amazonaws athena-federation-integ-test diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/GenericJdbcConnectionFactory.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/GenericJdbcConnectionFactory.java index 37dad4d22b..5a646826c0 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/GenericJdbcConnectionFactory.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/GenericJdbcConnectionFactory.java @@ -56,13 +56,17 @@ public class GenericJdbcConnectionFactory private static final String REDSHIFT_DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver"; private static final int REDSHIFT_DEFAULT_PORT = 5439; + private static final String SNOWFLAKE_DRIVER_CLASS = "net.snowflake.client.jdbc.SnowflakeDriver"; + private static final int SNOWFLAKE_DEFAULT_PORT = 443; + private static final String SECRET_NAME_PATTERN_STRING = "(\\$\\{[a-zA-Z0-9/_+=.@-]+})"; public static final Pattern SECRET_NAME_PATTERN = Pattern.compile(SECRET_NAME_PATTERN_STRING); private static final ImmutableMap CONNECTION_INFO = ImmutableMap.of( DatabaseEngine.MYSQL, new DatabaseConnectionInfo(MYSQL_DRIVER_CLASS, MYSQL_DEFAULT_PORT), DatabaseEngine.POSTGRES, new DatabaseConnectionInfo(POSTGRESQL_DRIVER_CLASS, POSTGRESQL_DEFAULT_PORT), - DatabaseEngine.REDSHIFT, new DatabaseConnectionInfo(REDSHIFT_DRIVER_CLASS, REDSHIFT_DEFAULT_PORT)); + DatabaseEngine.REDSHIFT, new DatabaseConnectionInfo(REDSHIFT_DRIVER_CLASS, REDSHIFT_DEFAULT_PORT), + DatabaseEngine.SNOWFLAKE, new DatabaseConnectionInfo(SNOWFLAKE_DRIVER_CLASS, SNOWFLAKE_DEFAULT_PORT)); private final DatabaseConnectionConfig databaseConnectionConfig; private final Properties jdbcProperties; diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/JdbcConnectionFactory.java 
b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/JdbcConnectionFactory.java index 36502327b4..55694742bc 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/JdbcConnectionFactory.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/connection/JdbcConnectionFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,7 +41,8 @@ enum DatabaseEngine { MYSQL("mysql"), POSTGRES("postgres"), - REDSHIFT("redshift"); + REDSHIFT("redshift"), + SNOWFLAKE("snowflake"); private final String dbName; diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index 81ed8e5dcf..84ec498a59 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -101,6 +101,7 @@ private static JdbcMetadataHandler createJdbcMetadataHandler(final DatabaseConne case MYSQL: return new MySqlMetadataHandler(databaseConnectionConfig); case REDSHIFT: + case SNOWFLAKE: case POSTGRES: return new PostGreSqlMetadataHandler(databaseConnectionConfig); default: From bf65adbc2b7b662d48f915485824369289e27bc6 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 16 Jun 2021 14:20:26 -0700 Subject: [PATCH 06/31] add missing case --- .../com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index 84ec498a59..b0996569e8 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -151,6 +151,7 @@ private static JdbcRecordHandler createJdbcRecordHandler(final DatabaseConnectio case MYSQL: return new MySqlRecordHandler(databaseConnectionConfig); case REDSHIFT: + case SNOWFLAKE: case POSTGRES: return new PostGreSqlRecordHandler(databaseConnectionConfig); default: From ed92b0bca47f0a633dc5243b7f467c73794274a5 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 22 Jun 2021 14:58:23 -0700 Subject: [PATCH 07/31] Use MySQL Metadatahandler + Recordhandler for Snowflake --- .../amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index b0996569e8..1c56097cf6 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -98,10 +98,10 @@ public static Map createJdbcMetadataHandlerMap(fina private static JdbcMetadataHandler createJdbcMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig) { switch (databaseConnectionConfig.getType()) { + case SNOWFLAKE: case MYSQL: return new MySqlMetadataHandler(databaseConnectionConfig); case REDSHIFT: - case SNOWFLAKE: case POSTGRES: return new PostGreSqlMetadataHandler(databaseConnectionConfig); default: @@ -148,10 +148,10 @@ public static Map createJdbcRecordHandlerMap(final Ma private static JdbcRecordHandler createJdbcRecordHandler(final 
DatabaseConnectionConfig databaseConnectionConfig) { switch (databaseConnectionConfig.getType()) { + case SNOWFLAKE: case MYSQL: return new MySqlRecordHandler(databaseConnectionConfig); case REDSHIFT: - case SNOWFLAKE: case POSTGRES: return new PostGreSqlRecordHandler(databaseConnectionConfig); default: From bfb54cf512bed1663a1635472be71d39ce65da37 Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 24 Jun 2021 16:35:49 -0700 Subject: [PATCH 08/31] add logger to ResultSet --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 2aceb971d9..9696d06eb3 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -270,6 +270,12 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand throws SQLException { String escape = metadata.getSearchStringEscape(); + Logger.warn( + '[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}', + catalogName, + escapeNamePattern(tableHandle.getSchemaName(), escape), + escapeNamePattern(tableHandle.getTableName(), escape), + ) return metadata.getColumns( catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), From 85772423ccd906c7d310d85f8c806215e4fe510c Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 24 Jun 2021 16:57:17 -0700 Subject: [PATCH 09/31] remove extra comma --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java 
b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 9696d06eb3..8458349846 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -274,7 +274,7 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand '[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}', catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), - escapeNamePattern(tableHandle.getTableName(), escape), + escapeNamePattern(tableHandle.getTableName(), escape) ) return metadata.getColumns( catalogName, From c521cb908fb81116a98bc466761fae3466eb578d Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 24 Jun 2021 17:15:08 -0700 Subject: [PATCH 10/31] forgot semicolon --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 8458349846..e5943c0409 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -275,7 +275,7 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), escapeNamePattern(tableHandle.getTableName(), escape) - ) + ); return metadata.getColumns( catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), From 0de25a0c919a542b8ae1d98e2b6e078dea0f9dea Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 24 Jun 2021 17:39:23 -0700 Subject: [PATCH 11/31] use double quotes, not single --- 
.../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index e5943c0409..0332d5fa7f 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -271,7 +271,7 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand { String escape = metadata.getSearchStringEscape(); Logger.warn( - '[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}', + "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), escapeNamePattern(tableHandle.getTableName(), escape) From 28e31d7b1de3bd0fe19a2507cf70cceed5f30bb5 Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 24 Jun 2021 17:47:46 -0700 Subject: [PATCH 12/31] constant LOGGER instead of class" --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 0332d5fa7f..50c678a507 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -270,7 +270,7 @@ private ResultSet getColumns(final String catalogName, final TableName tableHand throws SQLException { String escape = metadata.getSearchStringEscape(); - Logger.warn( + LOGGER.warn( "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: 
{}", catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), From 0631334790fbbd2ef031e61d9e99db77cb457c84 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 15:14:20 -0700 Subject: [PATCH 13/31] try adding a snowflake metadata handler --- .../athena/jdbc/manager/JDBCUtil.java | 3 +- .../snowflake/SnowflakeMetadataHandler.java | 262 ++++++++++++++++++ 2 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index b0996569e8..51848d4495 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -100,8 +100,9 @@ private static JdbcMetadataHandler createJdbcMetadataHandler(final DatabaseConne switch (databaseConnectionConfig.getType()) { case MYSQL: return new MySqlMetadataHandler(databaseConnectionConfig); - case REDSHIFT: case SNOWFLAKE: + return new SnowflakeMetadataHandler(databaseConnectionConfig); + case REDSHIFT: case POSTGRES: return new PostGreSqlMetadataHandler(databaseConnectionConfig); default: diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java new file mode 100644 index 0000000000..37fac7179f --- /dev/null +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -0,0 +1,262 @@ +/*- + * #%L + * athena-jdbc + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package com.amazonaws.connectors.athena.jdbc.snowflake; + +import com.amazonaws.athena.connector.lambda.QueryStatusChecker; +import com.amazonaws.athena.connector.lambda.data.Block; +import com.amazonaws.athena.connector.lambda.data.BlockAllocator; +import com.amazonaws.athena.connector.lambda.data.BlockWriter; +import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; +import com.amazonaws.athena.connector.lambda.domain.Split; +import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; +import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; +import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; +import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig; +import com.amazonaws.connectors.athena.jdbc.connection.GenericJdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil; +import com.amazonaws.connectors.athena.jdbc.manager.JdbcMetadataHandler; +import com.amazonaws.connectors.athena.jdbc.manager.PreparedStatementBuilder; +import com.amazonaws.services.athena.AmazonAthena; +import com.amazonaws.services.secretsmanager.AWSSecretsManager; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.types.Types; +import 
org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Handles metadata for Snowflake. User must have access to `schemata`, `tables`, `columns`, `partitions` tables in + * information_schema. + */ +public class SnowflakeMetadataHandler + extends JdbcMetadataHandler +{ + static final Map JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA"); + static final String GET_PARTITIONS_QUERY = "SELECT nmsp_child.nspname AS child_schema, child.relname AS child FROM pg_inherits JOIN pg_class parent " + + "ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace nmsp_parent " + + "ON nmsp_parent.oid = parent.relnamespace JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace where nmsp_parent.nspname = ? " + + "AND parent.relname = ?"; + static final String BLOCK_PARTITION_COLUMN_NAME = "partition_name"; + static final String BLOCK_PARTITION_SCHEMA_COLUMN_NAME = "partition_schema_name"; + static final String ALL_PARTITIONS = "*"; + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeMetadataHandler.class); + private static final String PARTITION_SCHEMA_NAME = "child_schema"; + private static final String PARTITION_NAME = "child"; + private static final int MAX_SPLITS_PER_REQUEST = 1000_000; + + /** + * Instantiates handler to be used by Lambda function directly. + * + * Recommend using {@link com.amazonaws.connectors.athena.jdbc.MultiplexingJdbcCompositeHandler} instead. 
+ */ + public SnowflakeMetadataHandler() + { + this(JDBCUtil.getSingleDatabaseConfigFromEnv(JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE)); + } + + public SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig) + { + super(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig, JDBC_PROPERTIES)); + } + + @VisibleForTesting + protected SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig, final AWSSecretsManager secretsManager, + final AmazonAthena athena, final JdbcConnectionFactory jdbcConnectionFactory) + { + super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory); + } + + @Override + public Schema getPartitionSchema(final String catalogName) + { + SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder() + .addField(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, Types.MinorType.VARCHAR.getType()) + .addField(BLOCK_PARTITION_COLUMN_NAME, Types.MinorType.VARCHAR.getType()); + return schemaBuilder.build(); + } + + @Override + public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) + { + LOGGER.info("{}: Catalog {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(), + getTableLayoutRequest.getTableName().getTableName()); + try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) { + List parameters = Arrays.asList(getTableLayoutRequest.getTableName().getSchemaName(), + getTableLayoutRequest.getTableName().getTableName()); + try (PreparedStatement preparedStatement = new PreparedStatementBuilder().withConnection(connection).withQuery(GET_PARTITIONS_QUERY).withParameters(parameters).build(); + ResultSet resultSet = preparedStatement.executeQuery()) { + // Return a single partition if no partitions defined + if (!resultSet.next()) { + blockWriter.writeRows((Block block, int rowNum) -> { + 
block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); + block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); + //we wrote 1 row so we return 1 + return 1; + }); + } + else { + do { + final String partitionSchemaName = resultSet.getString(PARTITION_SCHEMA_NAME); + final String partitionName = resultSet.getString(PARTITION_NAME); + + // 1. Returns all partitions of table, we are not supporting constraints push down to filter partitions. + // 2. This API is not paginated, we could use order by and limit clause with offsets here. + blockWriter.writeRows((Block block, int rowNum) -> { + block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, partitionSchemaName); + block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, partitionName); + //we wrote 1 row so we return 1 + return 1; + }); + } + while (resultSet.next()); + } + } + } + catch (SQLException sqlException) { + throw new RuntimeException(sqlException.getErrorCode() + ": " + sqlException.getMessage(), sqlException); + } + } + + @Override + public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) + { + LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + int partitionContd = decodeContinuationToken(getSplitsRequest); + Set splits = new HashSet<>(); + Block partitions = getSplitsRequest.getPartitions(); + + boolean splitterUsed = false; + if (partitions.getRowCount() == 1) { + FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); + partitionsSchemaFieldReader.setPosition(0); + FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); + partitionsFieldReader.setPosition(0); + + if (ALL_PARTITIONS.equals(partitionsSchemaFieldReader.readText().toString()) && ALL_PARTITIONS.equals(partitionsFieldReader.readText().toString())) { + for 
(String splitClause : getSplitClauses(getSplitsRequest.getTableName())) { + //Every split must have a unique location if we wish to spill to avoid failures + SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); + + Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) + .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) + .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(splitClause)); + + splits.add(splitBuilder.build()); + + if (splits.size() >= MAX_SPLITS_PER_REQUEST) { + throw new RuntimeException("Max splits supported with splitter " + MAX_SPLITS_PER_REQUEST); + } + + splitterUsed = true; + } + } + } + + if (!splitterUsed) { + for (int curPartition = partitionContd; curPartition < partitions.getRowCount(); curPartition++) { + FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); + partitionsSchemaFieldReader.setPosition(curPartition); + FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); + partitionsFieldReader.setPosition(curPartition); + + //Every split must have a unique location if we wish to spill to avoid failures + SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); + + LOGGER.info("{}: Input partition is {}", getSplitsRequest.getQueryId(), String.valueOf(partitionsFieldReader.readText())); + Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) + .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) + .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(partitionsFieldReader.readText())); + + splits.add(splitBuilder.build()); + + if (splits.size() >= MAX_SPLITS_PER_REQUEST) { + //We exceeded the number of split we want to return in a single request, return and provide a continuation token. 
+ return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, encodeContinuationToken(curPartition + 1)); + } + } + } + + return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null); + } + + private int decodeContinuationToken(GetSplitsRequest request) + { + if (request.hasContinuationToken()) { + return Integer.valueOf(request.getContinuationToken()); + } + + //No continuation token present + return 0; + } + + private String encodeContinuationToken(int partition) + { + return String.valueOf(partition); + } + + /** + * Converts an ARRAY column's TYPE_NAME (provided by the jdbc metadata) to an ArrowType. + * @param typeName The column's TYPE_NAME (e.g. _int4, _text, _float8, etc...) + * @param precision Used for BigDecimal ArrowType + * @param scale Used for BigDecimal ArrowType + * @return ArrowType equivalent of the fieldType. + */ + @Override + protected ArrowType getArrayArrowTypeFromTypeName(String typeName, int precision, int scale) + { + switch(typeName) { + case "_bool": + return Types.MinorType.BIT.getType(); + case "_int2": + return Types.MinorType.SMALLINT.getType(); + case "_int4": + return Types.MinorType.INT.getType(); + case "_int8": + return Types.MinorType.BIGINT.getType(); + case "_float4": + return Types.MinorType.FLOAT4.getType(); + case "_float8": + return Types.MinorType.FLOAT8.getType(); + case "_date": + return Types.MinorType.DATEDAY.getType(); + case "_timestamp": + return Types.MinorType.DATEMILLI.getType(); + case "_numeric": + return new ArrowType.Decimal(precision, scale); + default: + return super.getArrayArrowTypeFromTypeName(typeName, precision, scale); + } + } +} From f6fd9f54ffd64054769379a90aceab88f4c0ce6c Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 15:19:46 -0700 Subject: [PATCH 14/31] import snowflake handler --- .../com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index 51848d4495..de8e27f42c 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -26,6 +26,7 @@ import com.amazonaws.connectors.athena.jdbc.mysql.MySqlRecordHandler; import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlMetadataHandler; import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlRecordHandler; +import com.amazonaws.connectors.athena.jdbc.snowflake.SnowflakeMetadataHandler; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.Validate; From 116f929e73434f516384ae9dffb7f423cb08bde8 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 15:42:48 -0700 Subject: [PATCH 15/31] try overriding ResultSet in child Snowflake --- .../jdbc/manager/JdbcMetadataHandler.java | 8 +------- .../snowflake/SnowflakeMetadataHandler.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 50c678a507..074a8c0482 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -266,16 +266,10 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema } } - private ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata) + protected ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata) throws SQLException { 
String escape = metadata.getSearchStringEscape(); - LOGGER.warn( - "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", - catalogName, - escapeNamePattern(tableHandle.getSchemaName(), escape), - escapeNamePattern(tableHandle.getTableName(), escape) - ); return metadata.getColumns( catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 37fac7179f..a45240204a 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -98,6 +98,24 @@ protected SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnec super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory); } + @Override + protected ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata) + throws SQLException + { + String escape = metadata.getSearchStringEscape(); + LOGGER.warn( + "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", + catalogName, + escapeNamePattern(tableHandle.getSchemaName(), escape), + escapeNamePattern(tableHandle.getTableName(), escape) + ); + return metadata.getColumns( + catalogName, + escapeNamePattern(tableHandle.getSchemaName(), escape), + escapeNamePattern(tableHandle.getTableName(), escape), + null); + } + @Override public Schema getPartitionSchema(final String catalogName) { From 8ddd852fe9c917bbaf341a948db66e5abb8cddf5 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 16:06:14 -0700 Subject: [PATCH 16/31] add imports --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index a45240204a..b70c2b5354 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -26,6 +26,7 @@ import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; import com.amazonaws.athena.connector.lambda.domain.Split; import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; +import com.amazonaws.athena.connector.lambda.domain.TableName; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; @@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; From 92fc4ae18ecdcb091ca203f59ffac1d3086f55c5 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 16:13:12 -0700 Subject: [PATCH 17/31] try a different table name --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index b70c2b5354..78e2be0971 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -25,8 +25,8 @@ import 
com.amazonaws.athena.connector.lambda.data.BlockWriter; import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; import com.amazonaws.athena.connector.lambda.domain.Split; -import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; import com.amazonaws.athena.connector.lambda.domain.TableName; +import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; From 6f2b3a936fe9ce7bc8700a63d127efd3a8dc9563 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 25 Jun 2021 16:34:13 -0700 Subject: [PATCH 18/31] try to upcase strings --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 78e2be0971..5df21a53a7 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -100,6 +100,12 @@ protected SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnec super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory); } + @Override + protected String escapeNamePattern(final String name, final String escape) + { + return super.escapeNamePattern(name, escape).toUpperCase(); + } + @Override protected ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata) throws SQLException From 7cc03d22bd2b05d72e01d08b6633b24015d5d359 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 6 Jul 2021 15:29:41 -0700 Subject: [PATCH 
19/31] Fix --- .../com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index ab06dc4ba1..892b1ce12d 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -99,7 +99,6 @@ public static Map createJdbcMetadataHandlerMap(fina private static JdbcMetadataHandler createJdbcMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig) { switch (databaseConnectionConfig.getType()) { - case SNOWFLAKE: case MYSQL: return new MySqlMetadataHandler(databaseConnectionConfig); case SNOWFLAKE: From 2046bb692dc4b96458b1fb59aebfa54a44326176 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 6 Jul 2021 16:21:25 -0700 Subject: [PATCH 20/31] No partition, no splits for now --- .../snowflake/SnowflakeMetadataHandler.java | 180 ++++++++++-------- 1 file changed, 99 insertions(+), 81 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 5df21a53a7..f96aa58087 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -136,105 +136,123 @@ public Schema getPartitionSchema(final String catalogName) @Override public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) { - LOGGER.info("{}: Catalog {}, table {}", getTableLayoutRequest.getQueryId(), 
getTableLayoutRequest.getTableName().getSchemaName(), - getTableLayoutRequest.getTableName().getTableName()); - try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) { - List parameters = Arrays.asList(getTableLayoutRequest.getTableName().getSchemaName(), - getTableLayoutRequest.getTableName().getTableName()); - try (PreparedStatement preparedStatement = new PreparedStatementBuilder().withConnection(connection).withQuery(GET_PARTITIONS_QUERY).withParameters(parameters).build(); - ResultSet resultSet = preparedStatement.executeQuery()) { - // Return a single partition if no partitions defined - if (!resultSet.next()) { - blockWriter.writeRows((Block block, int rowNum) -> { - block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); - block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); - //we wrote 1 row so we return 1 - return 1; - }); - } - else { - do { - final String partitionSchemaName = resultSet.getString(PARTITION_SCHEMA_NAME); - final String partitionName = resultSet.getString(PARTITION_NAME); - - // 1. Returns all partitions of table, we are not supporting constraints push down to filter partitions. - // 2. This API is not paginated, we could use order by and limit clause with offsets here. 
- blockWriter.writeRows((Block block, int rowNum) -> { - block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, partitionSchemaName); - block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, partitionName); - //we wrote 1 row so we return 1 - return 1; - }); - } - while (resultSet.next()); - } - } - } - catch (SQLException sqlException) { - throw new RuntimeException(sqlException.getErrorCode() + ": " + sqlException.getMessage(), sqlException); - } + // blockWriter.writeRows((Block block, int rowNum) -> { + // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); + // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); + // //we wrote 1 row so we return 1 + // return 1; + // }); + + + // LOGGER.info("{}: Catalog {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(), + // getTableLayoutRequest.getTableName().getTableName()); + // try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) { + // List parameters = Arrays.asList(getTableLayoutRequest.getTableName().getSchemaName(), + // getTableLayoutRequest.getTableName().getTableName()); + // try (PreparedStatement preparedStatement = new PreparedStatementBuilder().withConnection(connection).withQuery(GET_PARTITIONS_QUERY).withParameters(parameters).build(); + // ResultSet resultSet = preparedStatement.executeQuery()) { + // // Return a single partition if no partitions defined + // if (!resultSet.next()) { + // blockWriter.writeRows((Block block, int rowNum) -> { + // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); + // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); + // //we wrote 1 row so we return 1 + // return 1; + // }); + // } + // else { + // do { + // final String partitionSchemaName = resultSet.getString(PARTITION_SCHEMA_NAME); + // final String partitionName = resultSet.getString(PARTITION_NAME); + + // // 1. 
Returns all partitions of table, we are not supporting constraints push down to filter partitions. + // // 2. This API is not paginated, we could use order by and limit clause with offsets here. + // blockWriter.writeRows((Block block, int rowNum) -> { + // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, partitionSchemaName); + // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, partitionName); + // //we wrote 1 row so we return 1 + // return 1; + // }); + // } + // while (resultSet.next()); + // } + // } + // } + // catch (SQLException sqlException) { + // throw new RuntimeException(sqlException.getErrorCode() + ": " + sqlException.getMessage(), sqlException); + // } } @Override public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { - LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); - int partitionContd = decodeContinuationToken(getSplitsRequest); + logger.info("doGetSplits: enter - " + getSplitsRequest); + + String catalogName = getSplitsRequest.getCatalogName(); Set splits = new HashSet<>(); - Block partitions = getSplitsRequest.getPartitions(); - boolean splitterUsed = false; - if (partitions.getRowCount() == 1) { - FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); - partitionsSchemaFieldReader.setPosition(0); - FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); - partitionsFieldReader.setPosition(0); + Split split = Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey()).build(); + splits.add(split); - if (ALL_PARTITIONS.equals(partitionsSchemaFieldReader.readText().toString()) && ALL_PARTITIONS.equals(partitionsFieldReader.readText().toString())) { - for (String splitClause : getSplitClauses(getSplitsRequest.getTableName())) { - //Every split must have a unique 
location if we wish to spill to avoid failures - SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); + logger.info("doGetSplits: exit - " + splits.size()); + return new GetSplitsResponse(catalogName, splits); + // LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + // int partitionContd = decodeContinuationToken(getSplitsRequest); + // Set splits = new HashSet<>(); + // Block partitions = getSplitsRequest.getPartitions(); - Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) - .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) - .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(splitClause)); + // boolean splitterUsed = false; + // if (partitions.getRowCount() == 1) { + // FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); + // partitionsSchemaFieldReader.setPosition(0); + // FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); + // partitionsFieldReader.setPosition(0); - splits.add(splitBuilder.build()); + // if (ALL_PARTITIONS.equals(partitionsSchemaFieldReader.readText().toString()) && ALL_PARTITIONS.equals(partitionsFieldReader.readText().toString())) { + // for (String splitClause : getSplitClauses(getSplitsRequest.getTableName())) { + // //Every split must have a unique location if we wish to spill to avoid failures + // SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); - if (splits.size() >= MAX_SPLITS_PER_REQUEST) { - throw new RuntimeException("Max splits supported with splitter " + MAX_SPLITS_PER_REQUEST); - } + // Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) + // .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) + // .add(BLOCK_PARTITION_COLUMN_NAME, 
String.valueOf(splitClause)); - splitterUsed = true; - } - } - } + // splits.add(splitBuilder.build()); - if (!splitterUsed) { - for (int curPartition = partitionContd; curPartition < partitions.getRowCount(); curPartition++) { - FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); - partitionsSchemaFieldReader.setPosition(curPartition); - FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); - partitionsFieldReader.setPosition(curPartition); + // if (splits.size() >= MAX_SPLITS_PER_REQUEST) { + // throw new RuntimeException("Max splits supported with splitter " + MAX_SPLITS_PER_REQUEST); + // } - //Every split must have a unique location if we wish to spill to avoid failures - SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); + // splitterUsed = true; + // } + // } + // } - LOGGER.info("{}: Input partition is {}", getSplitsRequest.getQueryId(), String.valueOf(partitionsFieldReader.readText())); - Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) - .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) - .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(partitionsFieldReader.readText())); + // if (!splitterUsed) { + // for (int curPartition = partitionContd; curPartition < partitions.getRowCount(); curPartition++) { + // FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); + // partitionsSchemaFieldReader.setPosition(curPartition); + // FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); + // partitionsFieldReader.setPosition(curPartition); - splits.add(splitBuilder.build()); + // //Every split must have a unique location if we wish to spill to avoid failures + // SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); - if (splits.size() >= MAX_SPLITS_PER_REQUEST) { - //We exceeded the 
number of split we want to return in a single request, return and provide a continuation token. - return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, encodeContinuationToken(curPartition + 1)); - } - } - } + // LOGGER.info("{}: Input partition is {}", getSplitsRequest.getQueryId(), String.valueOf(partitionsFieldReader.readText())); + // Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) + // .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) + // .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(partitionsFieldReader.readText())); + + // splits.add(splitBuilder.build()); + + // if (splits.size() >= MAX_SPLITS_PER_REQUEST) { + // //We exceeded the number of split we want to return in a single request, return and provide a continuation token. + // return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, encodeContinuationToken(curPartition + 1)); + // } + // } + // } - return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null); + // return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null); } private int decodeContinuationToken(GetSplitsRequest request) From f5141dfb9839d94d5c14b5d967f6e5a7aac1c118 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 6 Jul 2021 16:27:34 -0700 Subject: [PATCH 21/31] Fix --- .../snowflake/SnowflakeMetadataHandler.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index f96aa58087..65e7000ec7 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -20,13 +20,13 @@ package 
com.amazonaws.connectors.athena.jdbc.snowflake; import com.amazonaws.athena.connector.lambda.QueryStatusChecker; -import com.amazonaws.athena.connector.lambda.data.Block; +// import com.amazonaws.athena.connector.lambda.data.Block; import com.amazonaws.athena.connector.lambda.data.BlockAllocator; import com.amazonaws.athena.connector.lambda.data.BlockWriter; import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; import com.amazonaws.athena.connector.lambda.domain.Split; import com.amazonaws.athena.connector.lambda.domain.TableName; -import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; +// import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; @@ -35,26 +35,26 @@ import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil; import com.amazonaws.connectors.athena.jdbc.manager.JdbcMetadataHandler; -import com.amazonaws.connectors.athena.jdbc.manager.PreparedStatementBuilder; +// import com.amazonaws.connectors.athena.jdbc.manager.PreparedStatementBuilder; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import org.apache.arrow.vector.complex.reader.FieldReader; +// import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; +// import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; +// 
import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Arrays; +// import java.util.Arrays; import java.util.HashSet; -import java.util.List; +// import java.util.List; import java.util.Map; import java.util.Set; @@ -143,7 +143,6 @@ public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutReq // return 1; // }); - // LOGGER.info("{}: Catalog {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(), // getTableLayoutRequest.getTableName().getTableName()); // try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) { From 3756f11ebcb781aa398ba01d6a812167010f111b Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 6 Jul 2021 16:31:58 -0700 Subject: [PATCH 22/31] Fix --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 65e7000ec7..7a7cf56d41 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -20,13 +20,11 @@ package com.amazonaws.connectors.athena.jdbc.snowflake; import com.amazonaws.athena.connector.lambda.QueryStatusChecker; -// import com.amazonaws.athena.connector.lambda.data.Block; import com.amazonaws.athena.connector.lambda.data.BlockAllocator; import com.amazonaws.athena.connector.lambda.data.BlockWriter; import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; import com.amazonaws.athena.connector.lambda.domain.Split; import com.amazonaws.athena.connector.lambda.domain.TableName; -// import 
com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; @@ -35,26 +33,20 @@ import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil; import com.amazonaws.connectors.athena.jdbc.manager.JdbcMetadataHandler; -// import com.amazonaws.connectors.athena.jdbc.manager.PreparedStatementBuilder; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -// import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// import java.sql.Connection; import java.sql.DatabaseMetaData; -// import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -// import java.util.Arrays; import java.util.HashSet; -// import java.util.List; import java.util.Map; import java.util.Set; From 2f5c3d1be9e17b7da59246ae1f866ff00beaf832 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 6 Jul 2021 16:36:09 -0700 Subject: [PATCH 23/31] Update lib --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 7a7cf56d41..89d312aefa 100644 --- 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -177,7 +177,7 @@ public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutReq @Override public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { - logger.info("doGetSplits: enter - " + getSplitsRequest); + LOGGER.info("doGetSplits: enter - " + getSplitsRequest); String catalogName = getSplitsRequest.getCatalogName(); Set splits = new HashSet<>(); @@ -185,7 +185,7 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq Split split = Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey()).build(); splits.add(split); - logger.info("doGetSplits: exit - " + splits.size()); + LOGGER.info("doGetSplits: exit - " + splits.size()); return new GetSplitsResponse(catalogName, splits); // LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); // int partitionContd = decodeContinuationToken(getSplitsRequest); From 9c1ac87fcac16f3a80eef21bdbd22c56f58da0fa Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Wed, 7 Jul 2021 11:28:45 -0700 Subject: [PATCH 24/31] Add logger --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 1 + .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 074a8c0482..d13f039081 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ 
b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -235,6 +235,7 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema resultSet.getInt("COLUMN_SIZE"), resultSet.getInt("DECIMAL_DIGITS")); String columnName = resultSet.getString("COLUMN_NAME"); + LOGGER.info("[SNOWFLAKE DEBUG] catalogName: {}, columnType: {}", columnName, columnType.getClass().getSimpleName()); if (columnType != null && SupportedTypes.isSupported(columnType)) { if (columnType instanceof ArrowType.List) { schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName( diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 89d312aefa..701324d49d 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -103,7 +103,7 @@ protected ResultSet getColumns(final String catalogName, final TableName tableHa throws SQLException { String escape = metadata.getSearchStringEscape(); - LOGGER.warn( + LOGGER.info( "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), From 71a47be8c01044d7f47d8526f82fd7f17170e36d Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Wed, 7 Jul 2021 11:36:07 -0700 Subject: [PATCH 25/31] Add logger --- .../connectors/athena/jdbc/manager/JdbcMetadataHandler.java | 2 +- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 
d13f039081..682f9577e2 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -235,7 +235,7 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema resultSet.getInt("COLUMN_SIZE"), resultSet.getInt("DECIMAL_DIGITS")); String columnName = resultSet.getString("COLUMN_NAME"); - LOGGER.info("[SNOWFLAKE DEBUG] catalogName: {}, columnType: {}", columnName, columnType.getClass().getSimpleName()); + LOGGER.warn("[SNOWFLAKE DEBUG] catalogName: {}", columnName); if (columnType != null && SupportedTypes.isSupported(columnType)) { if (columnType instanceof ArrowType.List) { schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName( diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 701324d49d..89d312aefa 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -103,7 +103,7 @@ protected ResultSet getColumns(final String catalogName, final TableName tableHa throws SQLException { String escape = metadata.getSearchStringEscape(); - LOGGER.info( + LOGGER.warn( "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), From c8596b9a6561b8474e257e9bb753dce421956704 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Wed, 7 Jul 2021 13:50:00 -0700 Subject: [PATCH 26/31] Change partition --- .../athena/jdbc/manager/JDBCUtil.java | 4 +-- .../snowflake/SnowflakeMetadataHandler.java | 25 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index 892b1ce12d..70a4dd1ce9 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -150,11 +150,11 @@ public static Map createJdbcRecordHandlerMap(final Ma private static JdbcRecordHandler createJdbcRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig) { switch (databaseConnectionConfig.getType()) { - case SNOWFLAKE: case MYSQL: return new MySqlRecordHandler(databaseConnectionConfig); - case REDSHIFT: case POSTGRES: + case REDSHIFT: + case SNOWFLAKE: return new PostGreSqlRecordHandler(databaseConnectionConfig); default: throw new RuntimeException("Mux: Unhandled database engine " + databaseConnectionConfig.getType()); diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 89d312aefa..6525445cb4 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -58,17 +58,17 @@ public class SnowflakeMetadataHandler extends JdbcMetadataHandler { static final Map JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA"); - static final String GET_PARTITIONS_QUERY = "SELECT nmsp_child.nspname AS child_schema, child.relname AS child FROM pg_inherits JOIN pg_class parent " + - "ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace nmsp_parent " + - "ON nmsp_parent.oid = parent.relnamespace JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace where 
nmsp_parent.nspname = ? " + - "AND parent.relname = ?"; - static final String BLOCK_PARTITION_COLUMN_NAME = "partition_name"; - static final String BLOCK_PARTITION_SCHEMA_COLUMN_NAME = "partition_schema_name"; - static final String ALL_PARTITIONS = "*"; + // static final String GET_PARTITIONS_QUERY = "SELECT nmsp_child.nspname AS child_schema, child.relname AS child FROM pg_inherits JOIN pg_class parent " + + // "ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace nmsp_parent " + + // "ON nmsp_parent.oid = parent.relnamespace JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace where nmsp_parent.nspname = ? " + + // "AND parent.relname = ?"; + // static final String BLOCK_PARTITION_COLUMN_NAME = "partition_name"; + // static final String BLOCK_PARTITION_SCHEMA_COLUMN_NAME = "partition_schema_name"; + // static final String ALL_PARTITIONS = "*"; private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeMetadataHandler.class); - private static final String PARTITION_SCHEMA_NAME = "child_schema"; - private static final String PARTITION_NAME = "child"; - private static final int MAX_SPLITS_PER_REQUEST = 1000_000; + // private static final String PARTITION_SCHEMA_NAME = "child_schema"; + // private static final String PARTITION_NAME = "child"; + // private static final int MAX_SPLITS_PER_REQUEST = 1000_000; /** * Instantiates handler to be used by Lambda function directly. 
@@ -119,9 +119,8 @@ protected ResultSet getColumns(final String catalogName, final TableName tableHa @Override public Schema getPartitionSchema(final String catalogName) { - SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder() - .addField(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, Types.MinorType.VARCHAR.getType()) - .addField(BLOCK_PARTITION_COLUMN_NAME, Types.MinorType.VARCHAR.getType()); + LOGGER.warn("[SNOWFLAKE DEBUG] getPartitionSchema "); + SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); return schemaBuilder.build(); } From 3ad61e14994234a0a78904cd5dc976bab4eebd92 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Wed, 7 Jul 2021 15:41:50 -0700 Subject: [PATCH 27/31] Fix Snowflake handlers --- .../athena/jdbc/manager/JDBCUtil.java | 4 +- .../SnowflakeQueryStringBuilder.java | 77 +++++++++++++++ .../snowflake/SnowflakeRecordHandler.java | 93 +++++++++++++++++++ 3 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java create mode 100644 athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandler.java diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java index 70a4dd1ce9..a6524f4a09 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JDBCUtil.java @@ -27,6 +27,7 @@ import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlMetadataHandler; import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlRecordHandler; import com.amazonaws.connectors.athena.jdbc.snowflake.SnowflakeMetadataHandler; +import com.amazonaws.connectors.athena.jdbc.snowflake.SnowflakeRecordHandler; import com.google.common.collect.ImmutableMap; import 
org.apache.commons.lang3.Validate; @@ -154,8 +155,9 @@ private static JdbcRecordHandler createJdbcRecordHandler(final DatabaseConnectio return new MySqlRecordHandler(databaseConnectionConfig); case POSTGRES: case REDSHIFT: - case SNOWFLAKE: return new PostGreSqlRecordHandler(databaseConnectionConfig); + case SNOWFLAKE: + return new SnowflakeRecordHandler(databaseConnectionConfig); default: throw new RuntimeException("Mux: Unhandled database engine " + databaseConnectionConfig.getType()); } diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java new file mode 100644 index 0000000000..2a60ea5a50 --- /dev/null +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java @@ -0,0 +1,77 @@ +/*- + * #%L + * athena-jdbc + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package com.amazonaws.connectors.athena.jdbc.snowflake; + +import com.amazonaws.athena.connector.lambda.domain.Split; +import com.amazonaws.connectors.athena.jdbc.manager.JdbcSplitQueryBuilder; +import com.google.common.base.Strings; + +import java.util.Collections; +import java.util.List; + +/** + * Extends {@link JdbcSplitQueryBuilder} and implements PostGreSql specific SQL clauses for split. 
+ * + * PostGreSql partitions through child tables that can be used in a FROM clause. + */ +public class SnowflakeQueryStringBuilder + extends JdbcSplitQueryBuilder +{ + SnowflakeQueryStringBuilder(final String quoteCharacters) + { + super(quoteCharacters); + } + + @Override + protected String getFromClauseWithSplit(String catalog, String schema, String table, Split split) + { + StringBuilder tableName = new StringBuilder(); + if (!Strings.isNullOrEmpty(catalog)) { + tableName.append(quote(catalog)).append('.'); + } + if (!Strings.isNullOrEmpty(schema)) { + tableName.append(quote(schema)).append('.'); + } + tableName.append(quote(table)); + + return String.format(" FROM %s ", tableName); + + // String partitionSchemaName = split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_SCHEMA_COLUMN_NAME); + // String partitionName = split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME); + + // if (PostGreSqlMetadataHandler.ALL_PARTITIONS.equals(partitionSchemaName) || PostGreSqlMetadataHandler.ALL_PARTITIONS.equals(partitionName)) { + // // No partitions + // return String.format(" FROM %s ", tableName); + // } + + // return String.format(" FROM %s.%s ", quote(partitionSchemaName), quote(partitionName)); + } + + @Override + protected List getPartitionWhereClauses(final Split split) + { + // if (split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_SCHEMA_COLUMN_NAME).equals("*") + // && !split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME).equals("*")) { + // return Collections.singletonList(split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME)); + // } + + return Collections.emptyList(); + } +} diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandler.java new file mode 100644 index 0000000000..06827fade9 --- /dev/null +++ 
b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandler.java @@ -0,0 +1,93 @@ +/*- + * #%L + * athena-jdbc + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package com.amazonaws.connectors.athena.jdbc.snowflake; + +import com.amazonaws.athena.connector.lambda.domain.Split; +import com.amazonaws.athena.connector.lambda.domain.TableName; +import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints; +import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig; +import com.amazonaws.connectors.athena.jdbc.connection.GenericJdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil; +import com.amazonaws.connectors.athena.jdbc.manager.JdbcRecordHandler; +import com.amazonaws.connectors.athena.jdbc.manager.JdbcSplitQueryBuilder; +import com.amazonaws.services.athena.AmazonAthena; +import com.amazonaws.services.athena.AmazonAthenaClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.secretsmanager.AWSSecretsManager; +import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder; +import com.google.common.annotations.VisibleForTesting; +import org.apache.arrow.vector.types.pojo.Schema; +import 
org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class SnowflakeRecordHandler + extends JdbcRecordHandler +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeRecordHandler.class); + + private static final int FETCH_SIZE = 1000; + + private final JdbcSplitQueryBuilder jdbcSplitQueryBuilder; + + private static final String SNOWFLAKE_QUOTE_CHARACTER = "\""; + + /** + * Instantiates handler to be used by Lambda function directly. + * + * Recommend using {@link com.amazonaws.connectors.athena.jdbc.MultiplexingJdbcCompositeHandler} instead. + */ + public SnowflakeRecordHandler() + { + this(JDBCUtil.getSingleDatabaseConfigFromEnv(JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE)); + } + + public SnowflakeRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig) + { + this(databaseConnectionConfig, AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(), + new GenericJdbcConnectionFactory(databaseConnectionConfig, SnowflakeMetadataHandler.JDBC_PROPERTIES), new SnowflakeQueryStringBuilder(SNOWFLAKE_QUOTE_CHARACTER)); + } + + @VisibleForTesting + SnowflakeRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig, final AmazonS3 amazonS3, final AWSSecretsManager secretsManager, + final AmazonAthena athena, final JdbcConnectionFactory jdbcConnectionFactory, final JdbcSplitQueryBuilder jdbcSplitQueryBuilder) + { + super(amazonS3, secretsManager, athena, databaseConnectionConfig, jdbcConnectionFactory); + this.jdbcSplitQueryBuilder = Validate.notNull(jdbcSplitQueryBuilder, "query builder must not be null"); + } + + @Override + public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split) + throws SQLException + { + 
PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + + // Disable fetching all rows. + preparedStatement.setFetchSize(FETCH_SIZE); + + return preparedStatement; + } +} From f789aea605785292a4a33bf50b55c4374e85a9ec Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Wed, 7 Jul 2021 16:01:18 -0700 Subject: [PATCH 28/31] Uppercase everything --- .../athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java index 2a60ea5a50..4ab78b179a 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java @@ -44,12 +44,12 @@ protected String getFromClauseWithSplit(String catalog, String schema, String ta { StringBuilder tableName = new StringBuilder(); if (!Strings.isNullOrEmpty(catalog)) { - tableName.append(quote(catalog)).append('.'); + tableName.append(quote(catalog.toUpperCase())).append('.'); } if (!Strings.isNullOrEmpty(schema)) { - tableName.append(quote(schema)).append('.'); + tableName.append(quote(schema.toUpperCase())).append('.'); } - tableName.append(quote(table)); + tableName.append(quote(table.toUpperCase())); return String.format(" FROM %s ", tableName); From d9d9a92dbbab5b0e781d8094ebc44c6012ee4057 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Thu, 8 Jul 2021 11:24:10 -0700 Subject: [PATCH 29/31] Remove comments --- .../snowflake/SnowflakeMetadataHandler.java | 130 +----------------- .../SnowflakeQueryStringBuilder.java | 15 -- 2 files changed, 1 insertion(+), 144 deletions(-) diff --git 
a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 6525445cb4..0420a11ab9 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -58,17 +58,7 @@ public class SnowflakeMetadataHandler extends JdbcMetadataHandler { static final Map JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA"); - // static final String GET_PARTITIONS_QUERY = "SELECT nmsp_child.nspname AS child_schema, child.relname AS child FROM pg_inherits JOIN pg_class parent " + - // "ON pg_inherits.inhparent = parent.oid JOIN pg_class child ON pg_inherits.inhrelid = child.oid JOIN pg_namespace nmsp_parent " + - // "ON nmsp_parent.oid = parent.relnamespace JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace where nmsp_parent.nspname = ? " + - // "AND parent.relname = ?"; - // static final String BLOCK_PARTITION_COLUMN_NAME = "partition_name"; - // static final String BLOCK_PARTITION_SCHEMA_COLUMN_NAME = "partition_schema_name"; - // static final String ALL_PARTITIONS = "*"; private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeMetadataHandler.class); - // private static final String PARTITION_SCHEMA_NAME = "child_schema"; - // private static final String PARTITION_NAME = "child"; - // private static final int MAX_SPLITS_PER_REQUEST = 1000_000; /** * Instantiates handler to be used by Lambda function directly. 
@@ -125,53 +115,7 @@ public Schema getPartitionSchema(final String catalogName) } @Override - public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) - { - // blockWriter.writeRows((Block block, int rowNum) -> { - // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); - // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); - // //we wrote 1 row so we return 1 - // return 1; - // }); - - // LOGGER.info("{}: Catalog {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(), - // getTableLayoutRequest.getTableName().getTableName()); - // try (Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) { - // List parameters = Arrays.asList(getTableLayoutRequest.getTableName().getSchemaName(), - // getTableLayoutRequest.getTableName().getTableName()); - // try (PreparedStatement preparedStatement = new PreparedStatementBuilder().withConnection(connection).withQuery(GET_PARTITIONS_QUERY).withParameters(parameters).build(); - // ResultSet resultSet = preparedStatement.executeQuery()) { - // // Return a single partition if no partitions defined - // if (!resultSet.next()) { - // blockWriter.writeRows((Block block, int rowNum) -> { - // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, ALL_PARTITIONS); - // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, ALL_PARTITIONS); - // //we wrote 1 row so we return 1 - // return 1; - // }); - // } - // else { - // do { - // final String partitionSchemaName = resultSet.getString(PARTITION_SCHEMA_NAME); - // final String partitionName = resultSet.getString(PARTITION_NAME); - - // // 1. Returns all partitions of table, we are not supporting constraints push down to filter partitions. - // // 2. This API is not paginated, we could use order by and limit clause with offsets here. 
- // blockWriter.writeRows((Block block, int rowNum) -> { - // block.setValue(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, rowNum, partitionSchemaName); - // block.setValue(BLOCK_PARTITION_COLUMN_NAME, rowNum, partitionName); - // //we wrote 1 row so we return 1 - // return 1; - // }); - // } - // while (resultSet.next()); - // } - // } - // } - // catch (SQLException sqlException) { - // throw new RuntimeException(sqlException.getErrorCode() + ": " + sqlException.getMessage(), sqlException); - // } - } + public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) { } @Override public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) @@ -186,78 +130,6 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq LOGGER.info("doGetSplits: exit - " + splits.size()); return new GetSplitsResponse(catalogName, splits); - // LOGGER.info("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); - // int partitionContd = decodeContinuationToken(getSplitsRequest); - // Set splits = new HashSet<>(); - // Block partitions = getSplitsRequest.getPartitions(); - - // boolean splitterUsed = false; - // if (partitions.getRowCount() == 1) { - // FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); - // partitionsSchemaFieldReader.setPosition(0); - // FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); - // partitionsFieldReader.setPosition(0); - - // if (ALL_PARTITIONS.equals(partitionsSchemaFieldReader.readText().toString()) && ALL_PARTITIONS.equals(partitionsFieldReader.readText().toString())) { - // for (String splitClause : getSplitClauses(getSplitsRequest.getTableName())) { - // //Every split must have a unique location if we wish to spill to 
avoid failures - // SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); - - // Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) - // .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) - // .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(splitClause)); - - // splits.add(splitBuilder.build()); - - // if (splits.size() >= MAX_SPLITS_PER_REQUEST) { - // throw new RuntimeException("Max splits supported with splitter " + MAX_SPLITS_PER_REQUEST); - // } - - // splitterUsed = true; - // } - // } - // } - - // if (!splitterUsed) { - // for (int curPartition = partitionContd; curPartition < partitions.getRowCount(); curPartition++) { - // FieldReader partitionsSchemaFieldReader = partitions.getFieldReader(BLOCK_PARTITION_SCHEMA_COLUMN_NAME); - // partitionsSchemaFieldReader.setPosition(curPartition); - // FieldReader partitionsFieldReader = partitions.getFieldReader(BLOCK_PARTITION_COLUMN_NAME); - // partitionsFieldReader.setPosition(curPartition); - - // //Every split must have a unique location if we wish to spill to avoid failures - // SpillLocation spillLocation = makeSpillLocation(getSplitsRequest); - - // LOGGER.info("{}: Input partition is {}", getSplitsRequest.getQueryId(), String.valueOf(partitionsFieldReader.readText())); - // Split.Builder splitBuilder = Split.newBuilder(spillLocation, makeEncryptionKey()) - // .add(BLOCK_PARTITION_SCHEMA_COLUMN_NAME, String.valueOf(partitionsSchemaFieldReader.readText())) - // .add(BLOCK_PARTITION_COLUMN_NAME, String.valueOf(partitionsFieldReader.readText())); - - // splits.add(splitBuilder.build()); - - // if (splits.size() >= MAX_SPLITS_PER_REQUEST) { - // //We exceeded the number of split we want to return in a single request, return and provide a continuation token. 
- // return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, encodeContinuationToken(curPartition + 1)); - // } - // } - // } - - // return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null); - } - - private int decodeContinuationToken(GetSplitsRequest request) - { - if (request.hasContinuationToken()) { - return Integer.valueOf(request.getContinuationToken()); - } - - //No continuation token present - return 0; - } - - private String encodeContinuationToken(int partition) - { - return String.valueOf(partition); } /** diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java index 4ab78b179a..d670d28ef3 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeQueryStringBuilder.java @@ -52,26 +52,11 @@ protected String getFromClauseWithSplit(String catalog, String schema, String ta tableName.append(quote(table.toUpperCase())); return String.format(" FROM %s ", tableName); - - // String partitionSchemaName = split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_SCHEMA_COLUMN_NAME); - // String partitionName = split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME); - - // if (PostGreSqlMetadataHandler.ALL_PARTITIONS.equals(partitionSchemaName) || PostGreSqlMetadataHandler.ALL_PARTITIONS.equals(partitionName)) { - // // No partitions - // return String.format(" FROM %s ", tableName); - // } - - // return String.format(" FROM %s.%s ", quote(partitionSchemaName), quote(partitionName)); } @Override protected List getPartitionWhereClauses(final Split split) { - // if (split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_SCHEMA_COLUMN_NAME).equals("*") - // && 
!split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME).equals("*")) { - // return Collections.singletonList(split.getProperty(PostGreSqlMetadataHandler.BLOCK_PARTITION_COLUMN_NAME)); - // } - return Collections.emptyList(); } } From f04497b6a86b0400c29c78dd8d03b5ccf19ce209 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Thu, 8 Jul 2021 11:34:23 -0700 Subject: [PATCH 30/31] Fix lib --- .../athena/jdbc/snowflake/SnowflakeMetadataHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index 0420a11ab9..e388ccc70a 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -115,7 +115,8 @@ public Schema getPartitionSchema(final String catalogName) } @Override - public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) { } + public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) + { } @Override public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) From 14ad8d03f2d8197a716deb8a6fdab111f271ca46 Mon Sep 17 00:00:00 2001 From: Alexis Darnat Date: Tue, 13 Jul 2021 17:54:37 -0700 Subject: [PATCH 31/31] Fix + Finished Specs --- .../jdbc/manager/JdbcMetadataHandler.java | 1 - .../snowflake/SnowflakeMetadataHandler.java | 46 +--- .../SnowflakeMetadataHandlerTest.java | 232 ++++++++++++++++++ .../snowflake/SnowflakeRecordHandlerTest.java | 218 ++++++++++++++++ 4 files changed, 451 insertions(+), 46 deletions(-) create mode 100644 
athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandlerTest.java create mode 100644 athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandlerTest.java diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java index 682f9577e2..074a8c0482 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/manager/JdbcMetadataHandler.java @@ -235,7 +235,6 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema resultSet.getInt("COLUMN_SIZE"), resultSet.getInt("DECIMAL_DIGITS")); String columnName = resultSet.getString("COLUMN_NAME"); - LOGGER.warn("[SNOWFLAKE DEBUG] catalogName: {}", columnName); if (columnType != null && SupportedTypes.isSupported(columnType)) { if (columnType instanceof ArrowType.List) { schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName( diff --git a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java index e388ccc70a..14e9e2231b 100644 --- a/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java +++ b/athena-jdbc/src/main/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandler.java @@ -37,8 +37,6 @@ import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import org.apache.arrow.vector.types.Types; -import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -51,8 +49,7 @@ import java.util.Set; /** - * Handles metadata for Snowflake. User must have access to `schemata`, `tables`, `columns`, `partitions` tables in - * information_schema. + * Handles metadata for Snowflake. */ public class SnowflakeMetadataHandler extends JdbcMetadataHandler @@ -93,12 +90,6 @@ protected ResultSet getColumns(final String catalogName, final TableName tableHa throws SQLException { String escape = metadata.getSearchStringEscape(); - LOGGER.warn( - "[SNOWFLAKE DEBUG] catalogName: {}, SchemaName: {}, TableName: {}", - catalogName, - escapeNamePattern(tableHandle.getSchemaName(), escape), - escapeNamePattern(tableHandle.getTableName(), escape) - ); return metadata.getColumns( catalogName, escapeNamePattern(tableHandle.getSchemaName(), escape), @@ -109,7 +100,6 @@ protected ResultSet getColumns(final String catalogName, final TableName tableHa @Override public Schema getPartitionSchema(final String catalogName) { - LOGGER.warn("[SNOWFLAKE DEBUG] getPartitionSchema "); SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); return schemaBuilder.build(); } @@ -132,38 +122,4 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq LOGGER.info("doGetSplits: exit - " + splits.size()); return new GetSplitsResponse(catalogName, splits); } - - /** - * Converts an ARRAY column's TYPE_NAME (provided by the jdbc metadata) to an ArrowType. - * @param typeName The column's TYPE_NAME (e.g. _int4, _text, _float8, etc...) - * @param precision Used for BigDecimal ArrowType - * @param scale Used for BigDecimal ArrowType - * @return ArrowType equivalent of the fieldType. 
- */ - @Override - protected ArrowType getArrayArrowTypeFromTypeName(String typeName, int precision, int scale) - { - switch(typeName) { - case "_bool": - return Types.MinorType.BIT.getType(); - case "_int2": - return Types.MinorType.SMALLINT.getType(); - case "_int4": - return Types.MinorType.INT.getType(); - case "_int8": - return Types.MinorType.BIGINT.getType(); - case "_float4": - return Types.MinorType.FLOAT4.getType(); - case "_float8": - return Types.MinorType.FLOAT8.getType(); - case "_date": - return Types.MinorType.DATEDAY.getType(); - case "_timestamp": - return Types.MinorType.DATEMILLI.getType(); - case "_numeric": - return new ArrowType.Decimal(precision, scale); - default: - return super.getArrayArrowTypeFromTypeName(typeName, precision, scale); - } - } } diff --git a/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandlerTest.java b/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandlerTest.java new file mode 100644 index 0000000000..e7e75d3630 --- /dev/null +++ b/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeMetadataHandlerTest.java @@ -0,0 +1,232 @@ +/*- + * #%L + * athena-jdbc + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package com.amazonaws.connectors.athena.jdbc.snowflake; + +import com.amazonaws.athena.connector.lambda.data.BlockAllocator; +import com.amazonaws.athena.connector.lambda.data.BlockAllocatorImpl; +import com.amazonaws.athena.connector.lambda.data.BlockUtils; +import com.amazonaws.athena.connector.lambda.data.FieldBuilder; +import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; +import com.amazonaws.athena.connector.lambda.domain.Split; +import com.amazonaws.athena.connector.lambda.domain.TableName; +import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints; +import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; +import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutResponse; +import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse; +import com.amazonaws.athena.connector.lambda.security.FederatedIdentity; +import com.amazonaws.connectors.athena.jdbc.TestBase; +import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcCredentialProvider; +import com.amazonaws.services.athena.AmazonAthena; +import com.amazonaws.services.secretsmanager.AWSSecretsManager; +import com.amazonaws.services.secretsmanager.model.GetSecretValueRequest; +import com.amazonaws.services.secretsmanager.model.GetSecretValueResult; +import com.google.common.collect.ImmutableMap; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import 
org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class SnowflakeMetadataHandlerTest + extends TestBase +{ + private static final Logger logger = LoggerFactory.getLogger(SnowflakeMetadataHandlerTest.class); + + private DatabaseConnectionConfig databaseConnectionConfig = new DatabaseConnectionConfig("TESTCATALOG", JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE, + "snowflake://jdbc:snowflake://hostname/user=A&password=B"); + private SnowflakeMetadataHandler snowflakeMetadataHandler; + private JdbcConnectionFactory jdbcConnectionFactory; + private Connection connection; + private FederatedIdentity federatedIdentity; + private AWSSecretsManager secretsManager; + private AmazonAthena athena; + + @Before + public void setup() + { + this.jdbcConnectionFactory = Mockito.mock(JdbcConnectionFactory.class); + this.connection = Mockito.mock(Connection.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(this.jdbcConnectionFactory.getConnection(Mockito.any(JdbcCredentialProvider.class))).thenReturn(this.connection); + this.secretsManager = Mockito.mock(AWSSecretsManager.class); + Mockito.when(this.secretsManager.getSecretValue(Mockito.eq(new GetSecretValueRequest().withSecretId("testSecret")))).thenReturn(new GetSecretValueResult().withSecretString("{\"username\": \"testUser\", \"password\": \"testPassword\"}")); + this.snowflakeMetadataHandler = new 
SnowflakeMetadataHandler(databaseConnectionConfig, this.secretsManager, this.athena, this.jdbcConnectionFactory); + this.federatedIdentity = Mockito.mock(FederatedIdentity.class); + } + + @Test + public void getPartitionSchema() + { + Assert.assertEquals(SchemaBuilder.newBuilder().build(), + this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME")); + } + + @Test + public void doGetTableLayout() + throws Exception + { + BlockAllocator blockAllocator = new BlockAllocatorImpl(); + Constraints constraints = Mockito.mock(Constraints.class); + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + Schema partitionSchema = this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME"); + Set partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()); + GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, constraints, partitionSchema, partitionCols); + + GetTableLayoutResponse getTableLayoutResponse = this.snowflakeMetadataHandler.doGetTableLayout(blockAllocator, getTableLayoutRequest); + + Assert.assertEquals(1, getTableLayoutResponse.getPartitions().getRowCount()); + + List expectedValues = new ArrayList<>(); + for (int i = 0; i < getTableLayoutResponse.getPartitions().getRowCount(); i++) { + expectedValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i)); + } + Assert.assertEquals(expectedValues, Arrays.asList("[partitionId : 1]")); + + SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder(); + expectedSchemaBuilder.addField(FieldBuilder.newBuilder("partitionId", org.apache.arrow.vector.types.Types.MinorType.INT.getType()).build()); + + Schema expectedSchema = expectedSchemaBuilder.build(); + Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema()); + Assert.assertEquals(tableName, getTableLayoutResponse.getTableName()); + } + + @Test + public void 
doGetTableLayoutWithNoPartitions() + throws Exception + { + BlockAllocator blockAllocator = new BlockAllocatorImpl(); + Constraints constraints = Mockito.mock(Constraints.class); + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + Schema partitionSchema = this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME"); + Set partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()); + GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, constraints, partitionSchema, partitionCols); + + GetTableLayoutResponse getTableLayoutResponse = this.snowflakeMetadataHandler.doGetTableLayout(blockAllocator, getTableLayoutRequest); + + Assert.assertEquals(1, getTableLayoutResponse.getPartitions().getRowCount()); + + List expectedValues = new ArrayList<>(); + for (int i = 0; i < getTableLayoutResponse.getPartitions().getRowCount(); i++) { + expectedValues.add(BlockUtils.rowToString(getTableLayoutResponse.getPartitions(), i)); + } + Assert.assertEquals(expectedValues, Collections.singletonList("[partitionId : 1]")); + + SchemaBuilder expectedSchemaBuilder = SchemaBuilder.newBuilder(); + expectedSchemaBuilder.addField(FieldBuilder.newBuilder("partitionId", org.apache.arrow.vector.types.Types.MinorType.INT.getType()).build()); + + Schema expectedSchema = expectedSchemaBuilder.build(); + Assert.assertEquals(expectedSchema, getTableLayoutResponse.getPartitions().getSchema()); + Assert.assertEquals(tableName, getTableLayoutResponse.getTableName()); + } + + @Test(expected = RuntimeException.class) + public void doGetTableLayoutWithSQLException() + throws Exception + { + Constraints constraints = Mockito.mock(Constraints.class); + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + Schema partitionSchema = this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME"); + Set partitionCols = 
partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()); + GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, constraints, partitionSchema, partitionCols); + + Connection connection = Mockito.mock(Connection.class, Mockito.RETURNS_DEEP_STUBS); + JdbcConnectionFactory jdbcConnectionFactory = Mockito.mock(JdbcConnectionFactory.class); + Mockito.when(jdbcConnectionFactory.getConnection(Mockito.any(JdbcCredentialProvider.class))).thenReturn(connection); + Mockito.when(connection.getMetaData().getSearchStringEscape()).thenThrow(new SQLException()); + SnowflakeMetadataHandler snowflakeMetadataHandler = new SnowflakeMetadataHandler(databaseConnectionConfig, this.secretsManager, this.athena, jdbcConnectionFactory); + + snowflakeMetadataHandler.doGetTableLayout(Mockito.mock(BlockAllocator.class), getTableLayoutRequest); + } + + @Test + public void doGetSplits() + throws Exception + { + BlockAllocator blockAllocator = new BlockAllocatorImpl(); + Constraints constraints = Mockito.mock(Constraints.class); + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + Schema partitionSchema = this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME"); + Set partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()); + GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, constraints, partitionSchema, partitionCols); + + GetTableLayoutResponse getTableLayoutResponse = this.snowflakeMetadataHandler.doGetTableLayout(blockAllocator, getTableLayoutRequest); + + BlockAllocator splitBlockAllocator = new BlockAllocatorImpl(); + GetSplitsRequest getSplitsRequest = new GetSplitsRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, getTableLayoutResponse.getPartitions(), new ArrayList<>(partitionCols), 
constraints, null); + GetSplitsResponse getSplitsResponse = this.snowflakeMetadataHandler.doGetSplits(splitBlockAllocator, getSplitsRequest); + + Set<Map<String, String>> expectedSplits = new HashSet<>(); + expectedSplits.add(ImmutableMap.of()); + + Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size()); + Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet()); + Assert.assertEquals(expectedSplits, actualSplits); + } + + @Test + public void doGetSplitsContinuation() + throws Exception + { + BlockAllocator blockAllocator = new BlockAllocatorImpl(); + Constraints constraints = Mockito.mock(Constraints.class); + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + Schema partitionSchema = this.snowflakeMetadataHandler.getPartitionSchema("TESTCATALOGNAME"); + Set<String> partitionCols = partitionSchema.getFields().stream().map(Field::getName).collect(Collectors.toSet()); + GetTableLayoutRequest getTableLayoutRequest = new GetTableLayoutRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, constraints, partitionSchema, partitionCols); + + GetTableLayoutResponse getTableLayoutResponse = this.snowflakeMetadataHandler.doGetTableLayout(blockAllocator, getTableLayoutRequest); + + BlockAllocator splitBlockAllocator = new BlockAllocatorImpl(); + GetSplitsRequest getSplitsRequest = new GetSplitsRequest(this.federatedIdentity, "testQueryId", "TESTCATALOGNAME", tableName, getTableLayoutResponse.getPartitions(), new ArrayList<>(partitionCols), constraints, "1"); + GetSplitsResponse getSplitsResponse = this.snowflakeMetadataHandler.doGetSplits(splitBlockAllocator, getSplitsRequest); + + Set<Map<String, String>> expectedSplits = new HashSet<>(); + expectedSplits.add(ImmutableMap.of()); + Assert.assertEquals(expectedSplits.size(), getSplitsResponse.getSplits().size()); + Set<Map<String, String>> actualSplits = getSplitsResponse.getSplits().stream().map(Split::getProperties).collect(Collectors.toSet()); +
Assert.assertEquals(expectedSplits, actualSplits); + } +} diff --git a/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandlerTest.java b/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandlerTest.java new file mode 100644 index 0000000000..51170d42f6 --- /dev/null +++ b/athena-jdbc/src/test/java/com/amazonaws/connectors/athena/jdbc/snowflake/SnowflakeRecordHandlerTest.java @@ -0,0 +1,218 @@ +/*- + * #%L + * athena-jdbc + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package com.amazonaws.connectors.athena.jdbc.snowflake; + +import com.amazonaws.athena.connector.lambda.data.FieldBuilder; +import com.amazonaws.athena.connector.lambda.data.SchemaBuilder; +import com.amazonaws.athena.connector.lambda.domain.Split; +import com.amazonaws.athena.connector.lambda.domain.TableName; +import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints; +import com.amazonaws.athena.connector.lambda.domain.predicate.Marker; +import com.amazonaws.athena.connector.lambda.domain.predicate.Range; +import com.amazonaws.athena.connector.lambda.domain.predicate.SortedRangeSet; +import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet; +import com.amazonaws.connectors.athena.jdbc.TestBase; +import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory; +import com.amazonaws.connectors.athena.jdbc.connection.JdbcCredentialProvider; +import com.amazonaws.connectors.athena.jdbc.manager.JdbcSplitQueryBuilder; +import com.amazonaws.services.athena.AmazonAthena; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.secretsmanager.AWSSecretsManager; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class SnowflakeRecordHandlerTest extends TestBase +{ + private static final Logger logger = 
LoggerFactory.getLogger(SnowflakeRecordHandlerTest.class); + + private SnowflakeRecordHandler snowflakeRecordHandler; + private Connection connection; + private JdbcConnectionFactory jdbcConnectionFactory; + private JdbcSplitQueryBuilder jdbcSplitQueryBuilder; + private AmazonS3 amazonS3; + private AWSSecretsManager secretsManager; + private AmazonAthena athena; + + @Before + public void setup() + { + this.amazonS3 = Mockito.mock(AmazonS3.class); + this.secretsManager = Mockito.mock(AWSSecretsManager.class); + this.athena = Mockito.mock(AmazonAthena.class); + this.connection = Mockito.mock(Connection.class); + this.jdbcConnectionFactory = Mockito.mock(JdbcConnectionFactory.class); + Mockito.when(this.jdbcConnectionFactory.getConnection(Mockito.any(JdbcCredentialProvider.class))).thenReturn(this.connection); + jdbcSplitQueryBuilder = new SnowflakeQueryStringBuilder("\""); + final DatabaseConnectionConfig databaseConnectionConfig = new DatabaseConnectionConfig("TESTCATALOG", JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE, + "snowflake://jdbc:snowflake://hostname/user=A&password=B"); + + this.snowflakeRecordHandler = new SnowflakeRecordHandler(databaseConnectionConfig, amazonS3, secretsManager, athena, jdbcConnectionFactory, jdbcSplitQueryBuilder); + } + + @Test + public void buildSplitSqlTest() + throws SQLException + { + logger.info("buildSplitSqlTest - enter"); + + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + + SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL1", Types.MinorType.INT.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL2", Types.MinorType.VARCHAR.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL3", Types.MinorType.BIGINT.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL4", Types.MinorType.FLOAT4.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL5",
Types.MinorType.SMALLINT.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL6", Types.MinorType.TINYINT.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL7", Types.MinorType.FLOAT8.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL8", Types.MinorType.BIT.getType()).build()); + schemaBuilder.addField(FieldBuilder.newBuilder("TESTCOL9", new ArrowType.Decimal(8, 2)).build()); + Schema schema = schemaBuilder.build(); + + Split split = Mockito.mock(Split.class); + Mockito.when(split.getProperties()).thenReturn(ImmutableMap.of()); + + Range range1a = Mockito.mock(Range.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(range1a.isSingleValue()).thenReturn(true); + Mockito.when(range1a.getLow().getValue()).thenReturn(1); + Range range1b = Mockito.mock(Range.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(range1b.isSingleValue()).thenReturn(true); + Mockito.when(range1b.getLow().getValue()).thenReturn(2); + ValueSet valueSet1 = Mockito.mock(SortedRangeSet.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(valueSet1.getRanges().getOrderedRanges()).thenReturn(ImmutableList.of(range1a, range1b)); + + ValueSet valueSet2 = getRangeSet(Marker.Bound.EXACTLY, "1", Marker.Bound.BELOW, "10"); + ValueSet valueSet3 = getRangeSet(Marker.Bound.ABOVE, 2L, Marker.Bound.EXACTLY, 20L); + ValueSet valueSet4 = getSingleValueSet(1.1F); + ValueSet valueSet5 = getSingleValueSet(1); + ValueSet valueSet6 = getSingleValueSet(0); + ValueSet valueSet7 = getSingleValueSet(1.2d); + ValueSet valueSet8 = getSingleValueSet(true); + ValueSet valueSet9 = getSingleValueSet(BigDecimal.valueOf(12.34)); + + Constraints constraints = Mockito.mock(Constraints.class); + Mockito.when(constraints.getSummary()).thenReturn(new ImmutableMap.Builder() + .put("TESTCOL1", valueSet1) + .put("TESTCOL2", valueSet2) + .put("TESTCOL3", valueSet3) + .put("TESTCOL4", valueSet4) + .put("TESTCOL5", valueSet5) + .put("TESTCOL6", valueSet6) + 
.put("TESTCOL7", valueSet7) + .put("TESTCOL8", valueSet8) + .put("TESTCOL9", valueSet9) + .build()); + + String expectedSql = "SELECT \"TESTCOL1\", \"TESTCOL2\", \"TESTCOL3\", \"TESTCOL4\", \"TESTCOL5\", \"TESTCOL6\", \"TESTCOL7\", \"TESTCOL8\", \"TESTCOL9\" FROM \"TESTSCHEMA\".\"TESTTABLE\" WHERE (\"TESTCOL1\" IN (?,?)) AND ((\"TESTCOL2\" >= ? AND \"TESTCOL2\" < ?)) AND ((\"TESTCOL3\" > ? AND \"TESTCOL3\" <= ?)) AND (\"TESTCOL4\" = ?) AND (\"TESTCOL5\" = ?) AND (\"TESTCOL6\" = ?) AND (\"TESTCOL7\" = ?) AND (\"TESTCOL8\" = ?) AND (\"TESTCOL9\" = ?)"; + PreparedStatement expectedPreparedStatement = Mockito.mock(PreparedStatement.class); + Mockito.when(this.connection.prepareStatement(Mockito.eq(expectedSql))).thenReturn(expectedPreparedStatement); + + PreparedStatement preparedStatement = this.snowflakeRecordHandler.buildSplitSql(this.connection, "TESTCATALOGNAME", tableName, schema, constraints, split); + + Assert.assertEquals(expectedPreparedStatement, preparedStatement); + Mockito.verify(preparedStatement, Mockito.times(1)).setInt(1, 1); + Mockito.verify(preparedStatement, Mockito.times(1)).setInt(2, 2); + Mockito.verify(preparedStatement, Mockito.times(1)).setString(3, "1"); + Mockito.verify(preparedStatement, Mockito.times(1)).setString(4, "10"); + Mockito.verify(preparedStatement, Mockito.times(1)).setLong(5, 2L); + Mockito.verify(preparedStatement, Mockito.times(1)).setLong(6, 20L); + Mockito.verify(preparedStatement, Mockito.times(1)).setFloat(7, 1.1F); + Mockito.verify(preparedStatement, Mockito.times(1)).setShort(8, (short) 1); + Mockito.verify(preparedStatement, Mockito.times(1)).setByte(9, (byte) 0); + Mockito.verify(preparedStatement, Mockito.times(1)).setDouble(10, 1.2d); + Mockito.verify(preparedStatement, Mockito.times(1)).setBoolean(11, true); + Mockito.verify(preparedStatement, Mockito.times(1)).setBigDecimal(12, BigDecimal.valueOf(12.34)); + + logger.info("buildSplitSqlTest - exit"); + } + + @Test + public void buildSplitSqlForDateTest() + throws 
SQLException + { + logger.info("buildSplitSqlForDateTest - enter"); + + TableName tableName = new TableName("TESTSCHEMA", "TESTTABLE"); + + SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); + schemaBuilder.addField(FieldBuilder.newBuilder("testDate", Types.MinorType.DATEDAY.getType()).build()); + Schema schema = schemaBuilder.build(); + + Split split = Mockito.mock(Split.class); + Mockito.when(split.getProperties()).thenReturn(ImmutableMap.of()); + + final long dateDays = TimeUnit.MILLISECONDS.toDays(Date.valueOf("2020-01-05").getTime()); + ValueSet valueSet = getSingleValueSet(dateDays); + + Constraints constraints = Mockito.mock(Constraints.class); + Mockito.when(constraints.getSummary()).thenReturn(Collections.singletonMap("testDate", valueSet)); + + String expectedSql = "SELECT \"testDate\" FROM \"TESTSCHEMA\".\"TESTTABLE\" WHERE (\"testDate\" = ?)"; + PreparedStatement expectedPreparedStatement = Mockito.mock(PreparedStatement.class); + Mockito.when(this.connection.prepareStatement(Mockito.eq(expectedSql))).thenReturn(expectedPreparedStatement); + + PreparedStatement preparedStatement = this.snowflakeRecordHandler.buildSplitSql(this.connection, "TESTCATALOGNAME", tableName, schema, constraints, split); + + Assert.assertEquals(expectedPreparedStatement, preparedStatement); + Mockito.verify(preparedStatement, Mockito.times(1)) + .setDate(1, new Date(TimeUnit.DAYS.toMillis(dateDays))); + + logger.info("buildSplitSqlForDateTest - exit"); + } + + private ValueSet getSingleValueSet(Object value) { + Range range = Mockito.mock(Range.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(range.isSingleValue()).thenReturn(true); + Mockito.when(range.getLow().getValue()).thenReturn(value); + ValueSet valueSet = Mockito.mock(SortedRangeSet.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(valueSet.getRanges().getOrderedRanges()).thenReturn(Collections.singletonList(range)); + return valueSet; + } + + private ValueSet getRangeSet(Marker.Bound lowerBound, Object 
lowerValue, Marker.Bound upperBound, Object upperValue) { + Range range = Mockito.mock(Range.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(range.isSingleValue()).thenReturn(false); + Mockito.when(range.getLow().getBound()).thenReturn(lowerBound); + Mockito.when(range.getLow().getValue()).thenReturn(lowerValue); + Mockito.when(range.getHigh().getBound()).thenReturn(upperBound); + Mockito.when(range.getHigh().getValue()).thenReturn(upperValue); + ValueSet valueSet = Mockito.mock(SortedRangeSet.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(valueSet.getRanges().getOrderedRanges()).thenReturn(Collections.singletonList(range)); + return valueSet; + } +}