Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snowflake JDBC Connector #454

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5f2ac29
Add a Protected Getter on the JDBCFactory in JDBCRecordHandler, so ch…
rs-alexis May 22, 2021
5f85f9f
Add an intermediate constructor for more flexibility
rs-alexis May 22, 2021
9fd5019
Make MySqlQueryStringBuilder constructor public, so that piece of cod…
rs-alexis May 22, 2021
645a2cd
Add an intermediate constructor for more flexibility
rs-alexis May 22, 2021
347ac93
add snowflake drivers
rs-richardlau Jun 16, 2021
bf65adb
add missing case
rs-richardlau Jun 16, 2021
ed92b0b
Use MySQL Metadatahandler + Recordhandler for Snowflake
rs-alexis Jun 22, 2021
bfb54cf
add logger to ResultSet
rs-richardlau Jun 24, 2021
8577242
remove extra comma
rs-richardlau Jun 24, 2021
c521cb9
forgot semicolon
rs-richardlau Jun 25, 2021
0de25a0
use double quotes, not single
rs-richardlau Jun 25, 2021
28e31d7
constant LOGGER instead of class
rs-richardlau Jun 25, 2021
0631334
try adding a snowflake metadata handler
rs-richardlau Jun 25, 2021
f6fd9f5
import snowflake handler
rs-richardlau Jun 25, 2021
116f929
try overriding ResultSet in child Snowflake
rs-richardlau Jun 25, 2021
8ddd852
add imports
rs-richardlau Jun 25, 2021
92fc4ae
try a different table name
rs-richardlau Jun 25, 2021
6f2b3a9
try to upcase strings
rs-richardlau Jun 25, 2021
68af094
Merge branch 'rl/ise-1728/add_snowflake_drivers' of https://github.co…
rs-alexis Jul 6, 2021
7cc03d2
Fix
rs-alexis Jul 6, 2021
2046bb6
No partition, no splits for now
rs-alexis Jul 6, 2021
f5141df
Fix
rs-alexis Jul 6, 2021
3756f11
Fix
rs-alexis Jul 6, 2021
2f5c3d1
Update lib
rs-alexis Jul 6, 2021
9c1ac87
Add logger
rs-alexis Jul 7, 2021
71a47be
Add logger
rs-alexis Jul 7, 2021
c8596b9
Change partition
rs-alexis Jul 7, 2021
3ad61e1
Fix Snowflake handlers
rs-alexis Jul 7, 2021
f789aea
Uppercase everything
rs-alexis Jul 7, 2021
d9d9a92
Remove comments
rs-alexis Jul 8, 2021
f04497b
Fix lib
rs-alexis Jul 8, 2021
14ad8d0
Fix + Finished Specs
rs-alexis Jul 14, 2021
adca384
Merge remote-tracking branch 'origin/master' into ad/ise-1728/add_sno…
rs-alexis Jul 14, 2021
e2ae729
Merge remote-tracking branch 'upstream/master' into ad/ise-1728/add_s…
rs-alexis Jul 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions athena-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.4</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>athena-federation-integ-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ public class GenericJdbcConnectionFactory
private static final String REDSHIFT_DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private static final int REDSHIFT_DEFAULT_PORT = 5439;

private static final String SNOWFLAKE_DRIVER_CLASS = "net.snowflake.client.jdbc.SnowflakeDriver";
private static final int SNOWFLAKE_DEFAULT_PORT = 443;

private static final String SECRET_NAME_PATTERN_STRING = "(\\$\\{[a-zA-Z0-9:/_+=.@-]+})";
public static final Pattern SECRET_NAME_PATTERN = Pattern.compile(SECRET_NAME_PATTERN_STRING);

private static final ImmutableMap<DatabaseEngine, DatabaseConnectionInfo> CONNECTION_INFO = ImmutableMap.of(
DatabaseEngine.MYSQL, new DatabaseConnectionInfo(MYSQL_DRIVER_CLASS, MYSQL_DEFAULT_PORT),
DatabaseEngine.POSTGRES, new DatabaseConnectionInfo(POSTGRESQL_DRIVER_CLASS, POSTGRESQL_DEFAULT_PORT),
DatabaseEngine.REDSHIFT, new DatabaseConnectionInfo(REDSHIFT_DRIVER_CLASS, REDSHIFT_DEFAULT_PORT));
DatabaseEngine.REDSHIFT, new DatabaseConnectionInfo(REDSHIFT_DRIVER_CLASS, REDSHIFT_DEFAULT_PORT),
DatabaseEngine.SNOWFLAKE, new DatabaseConnectionInfo(SNOWFLAKE_DRIVER_CLASS, SNOWFLAKE_DEFAULT_PORT));

private final DatabaseConnectionConfig databaseConnectionConfig;
private final Properties jdbcProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -41,7 +41,8 @@ enum DatabaseEngine
{
MYSQL("mysql"),
POSTGRES("postgres"),
REDSHIFT("redshift");
REDSHIFT("redshift"),
SNOWFLAKE("snowflake");

private final String dbName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.amazonaws.connectors.athena.jdbc.mysql.MySqlRecordHandler;
import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlMetadataHandler;
import com.amazonaws.connectors.athena.jdbc.postgresql.PostGreSqlRecordHandler;
import com.amazonaws.connectors.athena.jdbc.snowflake.SnowflakeMetadataHandler;
import com.amazonaws.connectors.athena.jdbc.snowflake.SnowflakeRecordHandler;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.Validate;

Expand Down Expand Up @@ -100,6 +102,8 @@ private static JdbcMetadataHandler createJdbcMetadataHandler(final DatabaseConne
switch (databaseConnectionConfig.getType()) {
case MYSQL:
return new MySqlMetadataHandler(databaseConnectionConfig);
case SNOWFLAKE:
return new SnowflakeMetadataHandler(databaseConnectionConfig);
case REDSHIFT:
case POSTGRES:
return new PostGreSqlMetadataHandler(databaseConnectionConfig);
Expand Down Expand Up @@ -149,9 +153,11 @@ private static JdbcRecordHandler createJdbcRecordHandler(final DatabaseConnectio
switch (databaseConnectionConfig.getType()) {
case MYSQL:
return new MySqlRecordHandler(databaseConnectionConfig);
case REDSHIFT:
case POSTGRES:
case REDSHIFT:
return new PostGreSqlRecordHandler(databaseConnectionConfig);
case SNOWFLAKE:
return new SnowflakeRecordHandler(databaseConnectionConfig);
default:
throw new RuntimeException("Mux: Unhandled database engine " + databaseConnectionConfig.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
}
}

private ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata)
protected ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata)
throws SQLException
{
String escape = metadata.getSearchStringEscape();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*-
* #%L
* athena-jdbc
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.connectors.athena.jdbc.snowflake;

import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
import com.amazonaws.athena.connector.lambda.data.BlockWriter;
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.connectors.athena.jdbc.connection.GenericJdbcConnectionFactory;
import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil;
import com.amazonaws.connectors.athena.jdbc.manager.JdbcMetadataHandler;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Handles metadata for Snowflake.
*/
public class SnowflakeMetadataHandler
extends JdbcMetadataHandler
{
static final Map<String, String> JDBC_PROPERTIES = ImmutableMap.of("databaseTerm", "SCHEMA");
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeMetadataHandler.class);

/**
* Instantiates handler to be used by Lambda function directly.
*
* Recommend using {@link com.amazonaws.connectors.athena.jdbc.MultiplexingJdbcCompositeHandler} instead.
*/
public SnowflakeMetadataHandler()
{
this(JDBCUtil.getSingleDatabaseConfigFromEnv(JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE));
}

public SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig)
{
super(databaseConnectionConfig, new GenericJdbcConnectionFactory(databaseConnectionConfig, JDBC_PROPERTIES));
}

@VisibleForTesting
protected SnowflakeMetadataHandler(final DatabaseConnectionConfig databaseConnectionConfig, final AWSSecretsManager secretsManager,
final AmazonAthena athena, final JdbcConnectionFactory jdbcConnectionFactory)
{
super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory);
}

@Override
protected String escapeNamePattern(final String name, final String escape)
{
return super.escapeNamePattern(name, escape).toUpperCase();
}

@Override
protected ResultSet getColumns(final String catalogName, final TableName tableHandle, final DatabaseMetaData metadata)
throws SQLException
{
String escape = metadata.getSearchStringEscape();
return metadata.getColumns(
catalogName,
escapeNamePattern(tableHandle.getSchemaName(), escape),
escapeNamePattern(tableHandle.getTableName(), escape),
null);
}

@Override
public Schema getPartitionSchema(final String catalogName)
{
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
return schemaBuilder.build();
}

@Override
public void getPartitions(final BlockWriter blockWriter, final GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker)
{ }

@Override
public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest)
{
LOGGER.info("doGetSplits: enter - " + getSplitsRequest);

String catalogName = getSplitsRequest.getCatalogName();
Set<Split> splits = new HashSet<>();

Split split = Split.newBuilder(makeSpillLocation(getSplitsRequest), makeEncryptionKey()).build();
splits.add(split);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does snowflake support any partitioning or parallelism that we can exploit to create multiple splits?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the big question, Snowflake supports micro-partitioning but not sure yet how that would fit in the structure of the code yet (https://docs.snowflake.com/en/user-guide/tables-clustering-keys.html)

I'm open to feedback on that


LOGGER.info("doGetSplits: exit - " + splits.size());
return new GetSplitsResponse(catalogName, splits);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*-
* #%L
* athena-jdbc
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.connectors.athena.jdbc.snowflake;

import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.connectors.athena.jdbc.manager.JdbcSplitQueryBuilder;
import com.google.common.base.Strings;

import java.util.Collections;
import java.util.List;

/**
 * Extends {@link JdbcSplitQueryBuilder} and implements Snowflake specific SQL clauses for split.
 *
 * No partition pruning is applied here: {@code getPartitionWhereClauses} always returns an empty
 * list, matching the single-split strategy used by the Snowflake metadata handler.
 */
public class SnowflakeQueryStringBuilder
        extends JdbcSplitQueryBuilder
{
    SnowflakeQueryStringBuilder(final String quoteCharacters)
    {
        super(quoteCharacters);
    }

    /**
     * Builds the FROM clause for a split. Catalog, schema, and table identifiers are upper-cased
     * before quoting because Snowflake stores unquoted identifiers in upper case.
     * NOTE(review): toUpperCase() uses the default locale — consider Locale.ROOT for
     * locale-independent casing.
     */
    @Override
    protected String getFromClauseWithSplit(String catalog, String schema, String table, Split split)
    {
        StringBuilder tableName = new StringBuilder();
        if (!Strings.isNullOrEmpty(catalog)) {
            tableName.append(quote(catalog.toUpperCase())).append('.');
        }
        if (!Strings.isNullOrEmpty(schema)) {
            tableName.append(quote(schema.toUpperCase())).append('.');
        }
        tableName.append(quote(table.toUpperCase()));

        return String.format(" FROM %s ", tableName);
    }

    /**
     * Returns no partition predicates: splits are not partition-scoped for Snowflake.
     */
    @Override
    protected List<String> getPartitionWhereClauses(final Split split)
    {
        return Collections.emptyList();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*-
* #%L
* athena-jdbc
* %%
* Copyright (C) 2019 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.connectors.athena.jdbc.snowflake;

import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
import com.amazonaws.connectors.athena.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.connectors.athena.jdbc.connection.GenericJdbcConnectionFactory;
import com.amazonaws.connectors.athena.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.connectors.athena.jdbc.manager.JDBCUtil;
import com.amazonaws.connectors.athena.jdbc.manager.JdbcRecordHandler;
import com.amazonaws.connectors.athena.jdbc.manager.JdbcSplitQueryBuilder;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.athena.AmazonAthenaClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder;
import com.google.common.annotations.VisibleForTesting;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Reads records from Snowflake over JDBC for a given split.
 */
public class SnowflakeRecordHandler
        extends JdbcRecordHandler
{
    private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeRecordHandler.class);

    // Double quote preserves identifier case; Snowflake folds unquoted identifiers to upper case.
    private static final String SNOWFLAKE_QUOTE_CHARACTER = "\"";

    // Bounded fetch size so the driver streams rows instead of materializing the whole result set.
    private static final int FETCH_SIZE = 1000;

    private final JdbcSplitQueryBuilder jdbcSplitQueryBuilder;

    /**
     * Instantiates handler to be used by Lambda function directly.
     *
     * Recommend using {@link com.amazonaws.connectors.athena.jdbc.MultiplexingJdbcCompositeHandler} instead.
     */
    public SnowflakeRecordHandler()
    {
        this(JDBCUtil.getSingleDatabaseConfigFromEnv(JdbcConnectionFactory.DatabaseEngine.SNOWFLAKE));
    }

    public SnowflakeRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig)
    {
        this(databaseConnectionConfig,
                AmazonS3ClientBuilder.defaultClient(),
                AWSSecretsManagerClientBuilder.defaultClient(),
                AmazonAthenaClientBuilder.defaultClient(),
                new GenericJdbcConnectionFactory(databaseConnectionConfig, SnowflakeMetadataHandler.JDBC_PROPERTIES),
                new SnowflakeQueryStringBuilder(SNOWFLAKE_QUOTE_CHARACTER));
    }

    @VisibleForTesting
    SnowflakeRecordHandler(final DatabaseConnectionConfig databaseConnectionConfig, final AmazonS3 amazonS3, final AWSSecretsManager secretsManager,
            final AmazonAthena athena, final JdbcConnectionFactory jdbcConnectionFactory, final JdbcSplitQueryBuilder jdbcSplitQueryBuilder)
    {
        super(amazonS3, secretsManager, athena, databaseConnectionConfig, jdbcConnectionFactory);
        this.jdbcSplitQueryBuilder = Validate.notNull(jdbcSplitQueryBuilder, "query builder must not be null");
    }

    /**
     * Builds the per-split SELECT statement via the query builder and caps the fetch size.
     */
    @Override
    public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalogName, TableName tableName, Schema schema, Constraints constraints, Split split)
            throws SQLException
    {
        // NOTE(review): the catalog argument is passed as null — tables appear to be addressed via
        // schema.table only; confirm against JdbcSplitQueryBuilder.buildSql.
        final PreparedStatement statement = jdbcSplitQueryBuilder.buildSql(
                jdbcConnection, null, tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split);

        // Disable fetching all rows at once.
        statement.setFetchSize(FETCH_SIZE);

        return statement;
    }
}
Loading