From 242d5b340bb8ee76876ff01694bcc78a36b55ed4 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Thu, 5 Aug 2021 18:09:46 -0500 Subject: [PATCH 01/14] Created specification for ssh tunnel configuration --- .../integrations/base/SSHTunnelConfig.java | 55 +++++ .../src/main/resources/spec.json | 210 +++++++++++++++- .../src/main/resources/spec.json | 227 +++++++++++++++++- 3 files changed, 477 insertions(+), 15 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java new file mode 100644 index 000000000000..02173da562ce --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java @@ -0,0 +1,55 @@ +package io.airbyte.integrations.base; + +/** + * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion host plus + * the remote host and remote port to forward to a specified local port. + */ +public class SSHTunnelConfig { + + private final String method; + private final String host; + private final String tunnel_ssh_port; + private final String destinationPort; + private final String user; + private final String sshkey; + private final String password; + + public SSHTunnelConfig(String method, String host, String tunnel_ssh_port, String destinationPort, String user, String sshkey, String password) { + this.method = method; + this.host = host; + this.destinationPort = destinationPort; + this.tunnel_ssh_port = tunnel_ssh_port; + this.user = user; + this.sshkey = sshkey; + this.password = password; + } + + public String getMethod() { + return method; + } + + public String getHost() { + return host; + } + + public String getTunnel_ssh_port() { + return tunnel_ssh_port; + } + + public String getDestinationPort() { + return destinationPort; + } + + public String getUser() { + return user; + } + + // TODO: Determine if we can lock down the access on credentials a bit tighter + public String getSSHKey() { + return sshkey; + } + + public String getPassword() { + return password; + } +} diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json index bfb18a87f2fb..415a1446967a 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json @@ -3,12 +3,22 @@ "supportsIncremental": true, "supportsNormalization": true, "supportsDBT": true, - "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"], + "supported_destination_sync_modes": [ + "overwrite", + "append", + "append_dedup" + ], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "Postgres Destination Spec", "type": "object", - "required": ["host", "port", "username", "database", "schema"], + "required": [ + "host", + "port", + "username", + "database", + "schema" + ], "additionalProperties": true, "properties": { "host": { @@ -24,7 +34,9 @@ "minimum": 0, "maximum": 65536, "default": 5432, - "examples": ["5432"], + "examples": [ + "5432" + ], "order": 1 }, "database": { @@ -37,7 +49,9 @@ "title": "Default Schema", "description": "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"public\".", "type": "string", - "examples": ["public"], + "examples": [ + "public" + ], "default": "public", "order": 3 }, @@ -60,6 +74,194 @@ "type": "boolean", "default": false, "order": 6 + }, + "tunnel_method": { + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "order": 7, + "oneOf": [ + { + "title": "No Tunnel", + "required": [ + "tunnel_method" + ], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "enum": [ + "NO_TUNNEL" + ], + "default": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_usersshkey" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "enum": [ + "SSH_KEY_AUTH" + ], + "default": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": [ + "22" + ], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "tunnel_usersshkey": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials for logging into the jump server host.", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "String", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5432" + ], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5000" + ], + "order": 7 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_userpass" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "enum": [ + "SSH_PASSWORD_AUTH" + ], + "default": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": [ + "22" + ], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_userpass": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "String", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5432" + ], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5000" + ], + "order": 7 + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index 878c4bb3db48..61c08ef9a869 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -4,7 +4,12 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Postgres Source Spec", "type": "object", - "required": ["host", "port", "database", "username"], + "required": [ + "host", + "port", + "database", + "username" + ], "additionalProperties": false, "properties": { "host": { @@ -20,50 +25,244 @@ "minimum": 0, "maximum": 65536, "default": 5432, - "examples": ["5432"], + "examples": [ + "5432" + ], "order": 1 }, + "tunnel_method": { + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "order": 5, + "oneOf": [ + { + "title": "No Tunnel", + "required": [ + "tunnel_method" + ], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "enum": [ + "NO_TUNNEL" + ], + "default": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_usersshkey" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "enum": [ + "SSH_KEY_AUTH" + ], + "default": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": [ + "22" + ], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "tunnel_usersshkey": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials for logging into the jump server host.", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "String", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5432" + ], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5000" + ], + "order": 7 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_ssh_port", + "tunnel_username", + "tunnel_localport", + "tunnel_userpass" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "enum": [ + "SSH_PASSWORD_AUTH" + ], + "default": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_ssh_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": [ + "22" + ], + "order": 2 + }, + "tunnel_username": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_userpass": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "tunnel_db_remote_host": { + "title": "Remote Database Host", + "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", + "type": "String", + "order": 5 + }, + "tunnel_db_remote_port": { + "title": "Remote Database Port", + "description": "Port on the database to port-forward, typically that database's usual default port.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5432" + ], + "order": 6 + }, + "tunnel_localport": { + "title": "SSH Tunnel Database Port", + "description": "Port on the jump server host for the database's port to forward to. Do not share a port between database instances.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "examples": [ + "5000" + ], + "order": 7 + } + } + } + ] + }, "database": { "title": "DB Name", "description": "Name of the database.", "type": "string", - "order": 2 + "order": 7 }, "username": { "title": "User", "description": "Username to use to access the database.", "type": "string", - "order": 3 + "order": 8 }, "password": { "title": "Password", "description": "Password associated with the username.", "type": "string", "airbyte_secret": true, - "order": 4 + "order": 9 }, "ssl": { "title": "Connect using SSL", "description": "Encrypt client/server communications for increased security.", "type": "boolean", "default": false, - "order": 5 + "order": 10 }, "replication_method": { "type": "object", "title": "Replication Method", "description": "Replication method to use for extracting data from the database.", - "order": 6, + "order": 11, "oneOf": [ { "title": "Standard", "additionalProperties": false, "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", - "required": ["method"], + "required": [ + "method" + ], "properties": { "method": { "type": "string", - "enum": ["Standard"], + "enum": [ + "Standard" + ], "default": "Standard", "order": 0 } @@ -73,11 +272,17 @@ "title": "Logical Replication (CDC)", "additionalProperties": false, "description": "Logical replication uses the Postgres write-ahead log (WAL) to detect inserts, updates, and deletes. This needs to be configured on the source database itself. Only available on Postgres 10 and above. Read the Postgres Source docs for more information.", - "required": ["method", "replication_slot", "publication"], + "required": [ + "method", + "replication_slot", + "publication" + ], "properties": { "method": { "type": "string", - "enum": ["CDC"], + "enum": [ + "CDC" + ], "default": "CDC", "order": 0 }, From ab33eb6d577fde0de19d02c3b8d8827120a8624a Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Fri, 6 Aug 2021 19:09:22 -0500 Subject: [PATCH 02/14] Read ssh tunnel config for destination postgres --- .../integrations/base/SSHTunnelConfig.java | 12 ++++++++++++ .../jdbc/AbstractJdbcDestination.java | 19 ++++++++++++++++++- .../jdbc/JdbcBufferedConsumerFactory.java | 10 ++++++---- .../src/main/resources/spec.json | 4 ++-- .../src/main/resources/spec.json | 4 ++-- 5 files changed, 40 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java index 02173da562ce..54a37532e78d 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java @@ -52,4 +52,16 @@ public String getSSHKey() { public String getPassword() { return password; } + + @Override + public String toString() { + return "SSHTunnelConfig{" + + "method='" + method + '\'' + + ", host='" + host + '\'' + + ", tunnel_ssh_port='" + tunnel_ssh_port + '\'' + + ", destinationPort='" + destinationPort + '\'' + + ", user='" + user + '\'' + + ", sshkey='" + sshkey + '\'' + + '}'; + } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 61922be562d9..6522f1ddb029 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -31,6 +31,7 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.SSHTunnelConfig; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; @@ -73,6 +74,7 @@ public AbstractJdbcDestination(final String driverClass, public AirbyteConnectionStatus check(JsonNode config) { try (final JdbcDatabase database = getDatabase(config)) { + SSHTunnelConfig tunnelConfig = getSSHTunnelConfig(config); String outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -110,11 +112,26 @@ protected JdbcDatabase getDatabase(JsonNode config) { driverClass); } + protected SSHTunnelConfig getSSHTunnelConfig(JsonNode config) { + LOGGER.error("Getting SSH Tunnel config"); + SSHTunnelConfig sshconfig = new SSHTunnelConfig( + config.get("tunnel_method").asText(), + config.get("tunnel_host").asText(), + config.get("tunnel_ssh_port").asText(), + config.get("tunnel_localport").asText(), + config.get("tunnel_user").asText(), + config.has("tunnel_usersshkey") ? config.get("tunnel_usersshkey").asText() : null, + config.has("tunnel_userpass") ? config.get("tunnel_userpass").asText() : null + ); + LOGGER.error("Got SSH Tunnel config " + sshconfig); // TODO + return sshconfig; + } + public abstract JsonNode toJdbcConfig(JsonNode config); @Override public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer outputRecordCollector) { - return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(config), sqlOperations, namingResolver, config, catalog); + return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(config), getSSHTunnelConfig(config), sqlOperations, namingResolver, config, catalog); } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index e1f3fcef08b4..ebbe640ebedf 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -30,6 +30,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.SSHTunnelConfig; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; @@ -67,6 +68,7 @@ public class JdbcBufferedConsumerFactory { public static AirbyteMessageConsumer create(Consumer outputRecordCollector, JdbcDatabase database, + SSHTunnelConfig sshTunnelConfig, SqlOperations sqlOperations, NamingConventionTransformer namingResolver, JsonNode config, @@ -75,9 +77,9 @@ public static AirbyteMessageConsumer create(Consumer outputRecor return new BufferedStreamConsumer( outputRecordCollector, - onStartFunction(database, sqlOperations, writeConfigs), + onStartFunction(database, sshTunnelConfig, sqlOperations, writeConfigs), recordWriterFunction(database, sqlOperations, writeConfigs, catalog), - onCloseFunction(database, sqlOperations, writeConfigs), + onCloseFunction(database, sshTunnelConfig, sqlOperations, writeConfigs), catalog, sqlOperations::isValidData, MAX_BATCH_SIZE); @@ -134,7 +136,7 @@ private static String getOutputSchema(AirbyteStream stream, String defaultDestSc return defaultDestSchema; } - private static OnStartFunction onStartFunction(JdbcDatabase database, SqlOperations sqlOperations, List writeConfigs) { + private static OnStartFunction onStartFunction(JdbcDatabase database, SSHTunnelConfig sshTunnelConfig, SqlOperations sqlOperations, List writeConfigs) { return () -> { LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { @@ -168,7 +170,7 @@ private static RecordWriter recordWriterFunction(JdbcDatabase database, }; } - private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperations sqlOperations, List writeConfigs) { + private static OnCloseFunction onCloseFunction(JdbcDatabase database, SSHTunnelConfig sshTunnelConfig, SqlOperations sqlOperations, List writeConfigs) { return (hasFailed) -> { // copy data if (!hasFailed) { diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json index 415a1446967a..61f92dc8eef1 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-postgres/src/main/resources/spec.json @@ -153,7 +153,7 @@ "tunnel_db_remote_host": { "title": "Remote Database Host", "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", - "type": "String", + "type": "string", "order": 5 }, "tunnel_db_remote_port": { @@ -234,7 +234,7 @@ "tunnel_db_remote_host": { "title": "Remote Database Host", "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", - "type": "String", + "type": "string", "order": 5 }, "tunnel_db_remote_port": { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index 61c08ef9a869..c8b2014a3bf1 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -108,7 +108,7 @@ "tunnel_db_remote_host": { "title": "Remote Database Host", "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", - "type": "String", + "type": "string", "order": 5 }, "tunnel_db_remote_port": { @@ -189,7 +189,7 @@ "tunnel_db_remote_host": { "title": "Remote Database Host", "description": "Hostname or static IP address of the database to port-forward, as recognized from the jump server.", - "type": "String", + "type": "string", "order": 5 }, "tunnel_db_remote_port": { From bfedf02297d1e08ce6cc8034fdb178f55c25a394 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Sat, 7 Aug 2021 15:33:14 -0500 Subject: [PATCH 03/14] Added apache mina sshd for ssh client tunnel support --- .../bases/base-java/build.gradle | 1 + .../airbyte/integrations/base/SSHTunnel.java | 155 ++++++++++++++++++ .../integrations/base/SSHTunnelConfig.java | 67 -------- .../jdbc/AbstractJdbcDestination.java | 14 +- .../jdbc/JdbcBufferedConsumerFactory.java | 12 +- 5 files changed, 170 insertions(+), 79 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java delete mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index 34f8fca02d6d..d1a783bfccb8 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { implementation 'commons-cli:commons-cli:1.4' + implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' implementation project(':airbyte-protocol:models') implementation project(":airbyte-json-validation") diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java new file mode 100644 index 000000000000..d9aa245e02b9 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -0,0 +1,155 @@ +package io.airbyte.integrations.base; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; +import java.util.Base64; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.common.util.net.SshdSocketAddress; +import org.apache.sshd.server.forward.AcceptAllForwardingFilter; + +/** + * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion host plus the remote host and remote port to + * forward to a specified local port. + */ +public class SSHTunnel { + + public static final int TIMEOUT_MILLIS = 1000; + private final String method; + private final String host; + private final String tunnelSshPort; + private final String user; + private final String sshkey; + private final String password; + private final String remoteDatabaseHost; + private final String remoteDatabasePort; + private final String tunnelDatabasePort; + + public SSHTunnel(String method, String host, String tunnelSshPort, + String user, String sshkey, String password, String remoteDatabaseHost, String remoteDatabasePort, String tunnelDatabasePort) { + this.method = method; + this.host = host; + this.tunnelSshPort = tunnelSshPort; + this.user = user; + this.sshkey = sshkey; + this.password = password; + this.remoteDatabaseHost = remoteDatabaseHost; + this.remoteDatabasePort = remoteDatabasePort; + this.tunnelDatabasePort = tunnelDatabasePort; + } + + + public String getMethod() { + return method; + } + + public String getHost() { + return host; + } + + public String getTunnelSshPort() { + return tunnelSshPort; + } + + public String getUser() { + return user; + } + + // TODO: Determine if we can lock down the access on credentials a bit tighter + String getSSHKey() { + return sshkey; + } + + public String getPassword() { + return password; + } + + public String getRemoteDatabaseHost() { + return remoteDatabaseHost; + } + + public String getRemoteDatabasePort() { + return remoteDatabasePort; + } + + public String getTunnelDatabasePort() { + return tunnelDatabasePort; + } + + /** + * From the pem format private key string, parse the private key, discover the public key, and return the pair for auth use. + * + * @return + * @throws InvalidKeySpecException + * @throws NoSuchAlgorithmException + * @throws URISyntaxException + */ + protected KeyPair getPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { + KeyFactory kf = KeyFactory.getInstance("RSA"); + // TODO: bouncycastle has a pem reader that can do this step for us. + String privateKeyContent = getSSHKey().replaceAll("\\n", "").replace("-----BEGIN PRIVATE KEY-----", "").replace("-----END PRIVATE KEY-----", ""); + PrivateKey privKey = kf.generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyContent))); + RSAPublicKey pubKey = (RSAPublicKey) kf.generatePublic(kf.getKeySpec(privKey, X509EncodedKeySpec.class)); + return new KeyPair(pubKey, privKey); + } + + /** + * Generates a new ssh client and returns it, with forwarding set to accept all types; use this before opening a tunnel. + * @return + */ + public SshClient createClient() { + SshClient client = SshClient.setUpDefaultClient(); + client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); + return client; + } + + /** + * Starts an ssh session; wrap this in a try-finally and use closeTunnel() to close it. + * @return + * @throws IOException + * @throws InvalidKeySpecException + * @throws NoSuchAlgorithmException + * @throws URISyntaxException + */ + public ClientSession openTunnel(SshClient client) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException { + client.start(); + ClientSession session = client + .connect(getUser(), getHost(), Integer.getInteger(getTunnelSshPort())) + .verify(TIMEOUT_MILLIS) + .getSession(); + session.addPasswordIdentity(getPassword()); + session.addPublicKeyIdentity(getPrivateKeyPair()); + session.auth().verify(TIMEOUT_MILLIS); + session.startRemotePortForwarding( + new SshdSocketAddress(getRemoteDatabaseHost(), Integer.getInteger(getRemoteDatabasePort())), + new SshdSocketAddress("localhost", Integer.getInteger(getTunnelDatabasePort())) + ); + return session; + } + + public void closeTunnel(SshClient client, ClientSession session) throws IOException { + session.close(); + client.stop(); + } + + @Override + public String toString() { + return "SSHTunnel{" + + "method='" + method + '\'' + + ", host='" + host + '\'' + + ", tunnelSshPort='" + tunnelSshPort + '\'' + + ", user='" + user + '\'' + + ", remoteDatabaseHost='" + remoteDatabaseHost + '\'' + + ", remoteDatabasePort='" + remoteDatabasePort + '\'' + + ", tunnelDatabasePort='" + tunnelDatabasePort + '\'' + + '}'; + } +} diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java deleted file mode 100644 index 54a37532e78d..000000000000 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnelConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -package io.airbyte.integrations.base; - -/** - * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion host plus - * the remote host and remote port to forward to a specified local port. - */ -public class SSHTunnelConfig { - - private final String method; - private final String host; - private final String tunnel_ssh_port; - private final String destinationPort; - private final String user; - private final String sshkey; - private final String password; - - public SSHTunnelConfig(String method, String host, String tunnel_ssh_port, String destinationPort, String user, String sshkey, String password) { - this.method = method; - this.host = host; - this.destinationPort = destinationPort; - this.tunnel_ssh_port = tunnel_ssh_port; - this.user = user; - this.sshkey = sshkey; - this.password = password; - } - - public String getMethod() { - return method; - } - - public String getHost() { - return host; - } - - public String getTunnel_ssh_port() { - return tunnel_ssh_port; - } - - public String getDestinationPort() { - return destinationPort; - } - - public String getUser() { - return user; - } - - // TODO: Determine if we can lock down the access on credentials a bit tighter - public String getSSHKey() { - return sshkey; - } - - public String getPassword() { - return password; - } - - @Override - public String toString() { - return "SSHTunnelConfig{" + - "method='" + method + '\'' + - ", host='" + host + '\'' + - ", tunnel_ssh_port='" + tunnel_ssh_port + '\'' + - ", destinationPort='" + destinationPort + '\'' + - ", user='" + user + '\'' + - ", sshkey='" + sshkey + '\'' + - '}'; - } -} diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 6522f1ddb029..4068b5adbaca 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -31,7 +31,7 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; -import io.airbyte.integrations.base.SSHTunnelConfig; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; @@ -74,7 +74,7 @@ public AbstractJdbcDestination(final String driverClass, public AirbyteConnectionStatus check(JsonNode config) { try (final JdbcDatabase database = getDatabase(config)) { - SSHTunnelConfig tunnelConfig = getSSHTunnelConfig(config); + SSHTunnel tunnelConfig = getSSHTunnelConfig(config); String outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -112,16 +112,18 @@ protected JdbcDatabase getDatabase(JsonNode config) { driverClass); } - protected SSHTunnelConfig getSSHTunnelConfig(JsonNode config) { + protected SSHTunnel getSSHTunnelConfig(JsonNode config) { LOGGER.error("Getting SSH Tunnel config"); - SSHTunnelConfig sshconfig = new SSHTunnelConfig( + SSHTunnel sshconfig = new SSHTunnel( config.get("tunnel_method").asText(), config.get("tunnel_host").asText(), config.get("tunnel_ssh_port").asText(), - config.get("tunnel_localport").asText(), config.get("tunnel_user").asText(), config.has("tunnel_usersshkey") ? config.get("tunnel_usersshkey").asText() : null, - config.has("tunnel_userpass") ? config.get("tunnel_userpass").asText() : null + config.has("tunnel_userpass") ? config.get("tunnel_userpass").asText() : null, + config.get("tunnel_db_remote_host").asText(), + config.get("tunnel_db_remote_port").asText(), + config.get("tunnel_localport").asText() ); LOGGER.error("Got SSH Tunnel config " + sshconfig); // TODO return sshconfig; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index ebbe640ebedf..bff9abd535db 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -30,7 +30,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; -import io.airbyte.integrations.base.SSHTunnelConfig; +import io.airbyte.integrations.base.SSHTunnel; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer; import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction; @@ -68,7 +68,7 @@ public class JdbcBufferedConsumerFactory { public static AirbyteMessageConsumer create(Consumer outputRecordCollector, JdbcDatabase database, - SSHTunnelConfig sshTunnelConfig, + SSHTunnel sshTunnel, SqlOperations sqlOperations, NamingConventionTransformer namingResolver, JsonNode config, @@ -77,9 +77,9 @@ public static AirbyteMessageConsumer create(Consumer outputRecor return new BufferedStreamConsumer( outputRecordCollector, - onStartFunction(database, sshTunnelConfig, sqlOperations, writeConfigs), + onStartFunction(database, sshTunnel, sqlOperations, writeConfigs), recordWriterFunction(database, sqlOperations, writeConfigs, catalog), - onCloseFunction(database, sshTunnelConfig, sqlOperations, writeConfigs), + onCloseFunction(database, sshTunnel, sqlOperations, writeConfigs), catalog, sqlOperations::isValidData, MAX_BATCH_SIZE); @@ -136,7 +136,7 @@ private static String getOutputSchema(AirbyteStream stream, String defaultDestSc return defaultDestSchema; } - private static OnStartFunction onStartFunction(JdbcDatabase database, SSHTunnelConfig sshTunnelConfig, SqlOperations sqlOperations, List writeConfigs) { + private static OnStartFunction onStartFunction(JdbcDatabase database, SSHTunnel sshTunnel, SqlOperations sqlOperations, List writeConfigs) { return () -> { LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size()); for (final WriteConfig writeConfig : writeConfigs) { @@ -170,7 +170,7 @@ private static RecordWriter recordWriterFunction(JdbcDatabase database, }; } - private static OnCloseFunction onCloseFunction(JdbcDatabase database, SSHTunnelConfig sshTunnelConfig, SqlOperations sqlOperations, List writeConfigs) { + private static OnCloseFunction onCloseFunction(JdbcDatabase database, SSHTunnel sshTunnel, SqlOperations sqlOperations, List writeConfigs) { return (hasFailed) -> { // copy data if (!hasFailed) { From 6b53d169df21eb3823be63a20410989da60b237f Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 13:19:53 -0500 Subject: [PATCH 04/14] Integrate ssh key format parsing. Fix up json config reading. --- airbyte-commons/src/main/resources/log4j2.xml | 10 +- .../bases/base-java/build.gradle | 2 + .../airbyte/integrations/base/SSHTunnel.java | 112 ++++++++++++++++-- .../connectors/destination-jdbc/build.gradle | 3 + .../jdbc/AbstractJdbcDestination.java | 54 +++++++-- 5 files changed, 151 insertions(+), 30 deletions(-) diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index 84456bcbaf37..45bd61b2ca89 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -1,8 +1,8 @@ - %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %highlight{%p} %C{1.}(%M):%L - %X - %m%n - %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %m%n + %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %highlight{%p} %C{1.}(%M):%L - %X - %replace{%m}{[\r\n]}{ | }%n + %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %replace{%m}{[\r\n]}{ | } %n $${env:LOG_LEVEL:-INFO} @@ -56,7 +56,7 @@ s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:job_log_path}"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n + %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n @@ -77,7 +77,7 @@ filePattern="${ctx:workspace_app_root}/logs.%i.log.gz" ignoreExceptions="false"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n + %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n @@ -102,7 +102,7 @@ s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:workspace_app_root}"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n + %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index d1a783bfccb8..c2c4a1563e93 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -6,6 +6,8 @@ plugins { dependencies { implementation 'commons-cli:commons-cli:1.4' implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' + implementation group: 'org.bouncycastle', name: 'bcprov-jdk14', version: '1.69' + implementation group: 'org.bouncycastle', name: 'bcpkix-jdk14', version: '1.69' implementation project(':airbyte-protocol:models') implementation project(":airbyte-json-validation") diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index d9aa245e02b9..47b616d5e4cd 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -1,20 +1,37 @@ package io.airbyte.integrations.base; +import java.io.File; +import java.io.FileReader; import java.io.IOException; +import java.io.StringReader; import java.net.URISyntaxException; import java.security.KeyFactory; import java.security.KeyPair; import java.security.NoSuchAlgorithmException; import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.interfaces.RSAPrivateKey; import java.security.interfaces.RSAPublicKey; import java.security.spec.InvalidKeySpecException; import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.RSAPrivateKeySpec; +import java.security.spec.RSAPublicKeySpec; import java.security.spec.X509EncodedKeySpec; import java.util.Base64; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.server.forward.AcceptAllForwardingFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.bouncycastle.util.io.pem.PemObject; +import org.bouncycastle.util.io.pem.PemReader; + /** * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion host plus the remote host and remote port to @@ -22,7 +39,9 @@ */ public class SSHTunnel { - public static final int TIMEOUT_MILLIS = 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(SSHTunnel.class); + + public static final int TIMEOUT_MILLIS = 15000; // 15 seconds private final String method; private final String host; private final String tunnelSshPort; @@ -35,7 +54,11 @@ public class SSHTunnel { public SSHTunnel(String method, String host, String tunnelSshPort, String user, String sshkey, String password, String remoteDatabaseHost, String remoteDatabasePort, String tunnelDatabasePort) { - this.method = method; + if (method == null) { + this.method = "NO_TUNNEL"; + } else { + this.method = method; + } this.host = host; this.tunnelSshPort = tunnelSshPort; this.user = user; @@ -46,6 +69,10 @@ public SSHTunnel(String method, String host, String tunnelSshPort, this.tunnelDatabasePort = tunnelDatabasePort; } + public boolean shouldTunnel() { + return method != null && !"NO_TUNNEL".equals(method); + } + public String getMethod() { return method; @@ -92,17 +119,45 @@ public String getTunnelDatabasePort() { * @throws NoSuchAlgorithmException * @throws URISyntaxException */ - protected KeyPair getPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { + protected KeyPair xgetPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { KeyFactory kf = KeyFactory.getInstance("RSA"); // TODO: bouncycastle has a pem reader that can do this step for us. - String privateKeyContent = getSSHKey().replaceAll("\\n", "").replace("-----BEGIN PRIVATE KEY-----", "").replace("-----END PRIVATE KEY-----", ""); + String privateKeyContent = getSSHKey() + .replaceAll("\\n", "") + .replace("-----BEGIN PRIVATE KEY-----", "") + .replace("-----END PRIVATE KEY-----", ""); PrivateKey privKey = kf.generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyContent))); RSAPublicKey pubKey = (RSAPublicKey) kf.generatePublic(kf.getKeySpec(privKey, X509EncodedKeySpec.class)); return new KeyPair(pubKey, privKey); } + public RSAPublicKey readX509PublicKey() throws InvalidKeySpecException, IOException, NoSuchAlgorithmException { + File file = new File("/Users/jennybrown/dev/airbyte/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/secrets/dbtunnel-bastion-airbyte_rsa.pub.pem"); + if (! file.canRead()) { + throw new RuntimeException("Cannot read file!"); + }; + try (FileReader keyReader = new FileReader(file)) { + PEMParser pemParser = new PEMParser(keyReader); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + SubjectPublicKeyInfo publicKeyInfo = SubjectPublicKeyInfo.getInstance(pemParser.readObject()); + return (RSAPublicKey) converter.getPublicKey(publicKeyInfo); + } + } + + public KeyPair getPrivateKeyPair() throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { + FileReader fr = new FileReader(new File( + "/Users/jennybrown/dev/airbyte/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/secrets/dbtunnel-bastion-airbyte_pkcs8.pem")); + PEMParser pemParser = new PEMParser(fr); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + PrivateKeyInfo privateKeyInfo = PrivateKeyInfo.getInstance(pemParser.readObject()); + RSAPrivateKey privKey = (RSAPrivateKey) converter.getPrivateKey(privateKeyInfo); + RSAPublicKey pubKey = readX509PublicKey(); + return new KeyPair(pubKey, privKey); + } + /** * Generates a new ssh client and returns it, with forwarding set to accept all types; use this before opening a tunnel. + * * @return */ public SshClient createClient() { @@ -111,8 +166,15 @@ public SshClient createClient() { return client; } + private void validate() { + if (getHost() == null) { + throw new RuntimeException("SSH Tunnel host is null - verify configuration before starting tunnel!"); + } + } + /** * Starts an ssh session; wrap this in a try-finally and use closeTunnel() to close it. + * * @return * @throws IOException * @throws InvalidKeySpecException @@ -120,24 +182,38 @@ public SshClient createClient() { * @throws URISyntaxException */ public ClientSession openTunnel(SshClient client) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException { + validate(); client.start(); - ClientSession session = client - .connect(getUser(), getHost(), Integer.getInteger(getTunnelSshPort())) + ClientSession session = client.connect( + getUser().trim(), + getHost().trim(), + Integer.parseInt( + getTunnelSshPort().trim() + )) .verify(TIMEOUT_MILLIS) .getSession(); - session.addPasswordIdentity(getPassword()); - session.addPublicKeyIdentity(getPrivateKeyPair()); + if (getMethod().equals("SSH_KEY_AUTH")) { + session.addPublicKeyIdentity(getPrivateKeyPair()); + } + if (getMethod().equals("SSH_PASSWORD_AUTH")) { + session.addPasswordIdentity(getPassword()); + } session.auth().verify(TIMEOUT_MILLIS); session.startRemotePortForwarding( - new SshdSocketAddress(getRemoteDatabaseHost(), Integer.getInteger(getRemoteDatabasePort())), - new SshdSocketAddress("localhost", Integer.getInteger(getTunnelDatabasePort())) + new SshdSocketAddress(getRemoteDatabaseHost(), Integer.parseInt(getRemoteDatabasePort().trim())), + new SshdSocketAddress("localhost", Integer.parseInt(getTunnelDatabasePort().trim())) ); + LOGGER.info("Established tunnelling session. Port forwarding started."); return session; } public void closeTunnel(SshClient client, ClientSession session) throws IOException { - session.close(); - client.stop(); + if (session != null) { + session.close(); + } + if (client != null) { + client.stop(); + } } @Override @@ -152,4 +228,16 @@ public String toString() { ", tunnelDatabasePort='" + tunnelDatabasePort + '\'' + '}'; } + + public static void main(String[] args) throws Throwable { + SSHTunnel tunnel = new SSHTunnel( + "SSH_KEY_AUTH", "3.18.93.32", "22", "airbyte", "", "", + "tunnel-dev.cevykyaz98rn.us-east-2.rds.amazonaws.com", "5432", + "5000"); + SshClient client; + client = tunnel.createClient(); + ClientSession session = tunnel.openTunnel(client); + tunnel.closeTunnel(client, session); + } + } diff --git a/airbyte-integrations/connectors/destination-jdbc/build.gradle b/airbyte-integrations/connectors/destination-jdbc/build.gradle index cec590fe388c..781df026633d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/build.gradle +++ b/airbyte-integrations/connectors/destination-jdbc/build.gradle @@ -11,6 +11,9 @@ application { dependencies { implementation 'com.google.cloud:google-cloud-storage:1.113.16' implementation 'com.google.auth:google-auth-library-oauth2-http:0.25.5' + implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' + implementation group: 'org.bouncycastle', name: 'bcprov-jdk14', version: '1.69' + implementation group: 'org.bouncycastle', name: 'bcpkix-jdk14', version: '1.69' implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 4068b5adbaca..9cb90d0be03c 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -41,6 +41,8 @@ import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.session.ClientSession; public abstract class AbstractJdbcDestination extends BaseConnector implements Destination { @@ -72,9 +74,17 @@ public AbstractJdbcDestination(final String driverClass, @Override public AirbyteConnectionStatus check(JsonNode config) { - + SSHTunnel tunnelConfig = null; + SshClient sshclient = null; + ClientSession tunnelSession = null; try (final JdbcDatabase database = getDatabase(config)) { - SSHTunnel tunnelConfig = getSSHTunnelConfig(config); + tunnelConfig = getSSHTunnelConfig(config); + if (tunnelConfig.shouldTunnel()) { + sshclient = tunnelConfig.createClient(); + LOGGER.error("JENNY TESTING - Client created."); + tunnelSession = tunnelConfig.openTunnel(sshclient); + LOGGER.error("JENNY TESTING - Tunnel opened."); + } String outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); @@ -83,6 +93,16 @@ public AirbyteConnectionStatus check(JsonNode config) { return new AirbyteConnectionStatus() .withStatus(Status.FAILED) .withMessage("Could not connect with provided configuration. \n" + e.getMessage()); + } finally { + if (tunnelConfig.shouldTunnel()) { + try { + if (sshclient != null) { + tunnelConfig.closeTunnel(sshclient, tunnelSession); + } + } catch (Throwable t) { + t.printStackTrace(); + } + } } } @@ -112,23 +132,31 @@ protected JdbcDatabase getDatabase(JsonNode config) { driverClass); } + + protected SSHTunnel getSSHTunnelConfig(JsonNode config) { - LOGGER.error("Getting SSH Tunnel config"); + JsonNode ourConfig = config.get("tunnel_method"); SSHTunnel sshconfig = new SSHTunnel( - config.get("tunnel_method").asText(), - config.get("tunnel_host").asText(), - config.get("tunnel_ssh_port").asText(), - config.get("tunnel_user").asText(), - config.has("tunnel_usersshkey") ? config.get("tunnel_usersshkey").asText() : null, - config.has("tunnel_userpass") ? config.get("tunnel_userpass").asText() : null, - config.get("tunnel_db_remote_host").asText(), - config.get("tunnel_db_remote_port").asText(), - config.get("tunnel_localport").asText() + getConfigValueOrNull(ourConfig, "tunnel_method"), + getConfigValueOrNull(ourConfig, "tunnel_host"), + getConfigValueOrNull(ourConfig, "tunnel_ssh_port"), + getConfigValueOrNull(ourConfig, "tunnel_username"), + getConfigValueOrNull(ourConfig, "tunnel_usersshkey"), + getConfigValueOrNull(ourConfig, "tunnel_userpass"), + getConfigValueOrNull(ourConfig, "tunnel_db_remote_host"), + getConfigValueOrNull(ourConfig, "tunnel_db_remote_port"), + getConfigValueOrNull(ourConfig, "tunnel_localport") + ); + java.security.Security.addProvider( + new org.bouncycastle.jce.provider.BouncyCastleProvider() ); - LOGGER.error("Got SSH Tunnel config " + sshconfig); // TODO return sshconfig; } + private String getConfigValueOrNull(JsonNode config, String key) { + return config != null && config.has(key) ? config.get(key).asText() : null; + } + public abstract JsonNode toJdbcConfig(JsonNode config); @Override From 9fc7d4e7a005f2e9faf2b0313e15d11b45a6c4b5 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 13:23:10 -0500 Subject: [PATCH 05/14] Undid log format experiment --- airbyte-commons/src/main/resources/log4j2.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index 45bd61b2ca89..84456bcbaf37 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -1,8 +1,8 @@ - %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %highlight{%p} %C{1.}(%M):%L - %X - %replace{%m}{[\r\n]}{ | }%n - %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %replace{%m}{[\r\n]}{ | } %n + %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %highlight{%p} %C{1.}(%M):%L - %X - %m%n + %d{yyyy-MM-dd HH:mm:ss}{GMT+0} %p (%X{job_root}) %C{1}(%M):%L - %m%n $${env:LOG_LEVEL:-INFO} @@ -56,7 +56,7 @@ s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:job_log_path}"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n + %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n @@ -77,7 +77,7 @@ filePattern="${ctx:workspace_app_root}/logs.%i.log.gz" ignoreExceptions="false"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n + %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n @@ -102,7 +102,7 @@ s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcp-storage-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:workspace_app_root}"> - %d{yyyy-MM-dd HH:mm:ss} %-5p %replace{%m}{[\r\n]}{ | } %n + %d{yyyy-MM-dd HH:mm:ss} %-5p %m%n From 5c20347a22141d614a4a82171dcc731b12da5f17 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 15:41:19 -0500 Subject: [PATCH 06/14] Cleanup --- .../airbyte/integrations/base/SSHTunnel.java | 42 +++---------------- .../jdbc/AbstractJdbcDestination.java | 3 -- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index 47b616d5e4cd..6c7c34b5c2a4 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -119,41 +119,17 @@ public String getTunnelDatabasePort() { * @throws NoSuchAlgorithmException * @throws URISyntaxException */ - protected KeyPair xgetPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { + protected KeyPair getPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { KeyFactory kf = KeyFactory.getInstance("RSA"); - // TODO: bouncycastle has a pem reader that can do this step for us. String privateKeyContent = getSSHKey() .replaceAll("\\n", "") .replace("-----BEGIN PRIVATE KEY-----", "") .replace("-----END PRIVATE KEY-----", ""); - PrivateKey privKey = kf.generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyContent))); + RSAPrivateKey privKey = (RSAPrivateKey) kf.generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyContent))); RSAPublicKey pubKey = (RSAPublicKey) kf.generatePublic(kf.getKeySpec(privKey, X509EncodedKeySpec.class)); return new KeyPair(pubKey, privKey); } - public RSAPublicKey readX509PublicKey() throws InvalidKeySpecException, IOException, NoSuchAlgorithmException { - File file = new File("/Users/jennybrown/dev/airbyte/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/secrets/dbtunnel-bastion-airbyte_rsa.pub.pem"); - if (! file.canRead()) { - throw new RuntimeException("Cannot read file!"); - }; - try (FileReader keyReader = new FileReader(file)) { - PEMParser pemParser = new PEMParser(keyReader); - JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); - SubjectPublicKeyInfo publicKeyInfo = SubjectPublicKeyInfo.getInstance(pemParser.readObject()); - return (RSAPublicKey) converter.getPublicKey(publicKeyInfo); - } - } - - public KeyPair getPrivateKeyPair() throws IOException, NoSuchAlgorithmException, InvalidKeySpecException { - FileReader fr = new FileReader(new File( - "/Users/jennybrown/dev/airbyte/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/secrets/dbtunnel-bastion-airbyte_pkcs8.pem")); - PEMParser pemParser = new PEMParser(fr); - JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); - PrivateKeyInfo privateKeyInfo = PrivateKeyInfo.getInstance(pemParser.readObject()); - RSAPrivateKey privKey = (RSAPrivateKey) converter.getPrivateKey(privateKeyInfo); - RSAPublicKey pubKey = readX509PublicKey(); - return new KeyPair(pubKey, privKey); - } /** * Generates a new ssh client and returns it, with forwarding set to accept all types; use this before opening a tunnel. @@ -161,6 +137,9 @@ public KeyPair getPrivateKeyPair() throws IOException, NoSuchAlgorithmException, * @return */ public SshClient createClient() { + java.security.Security.addProvider( + new org.bouncycastle.jce.provider.BouncyCastleProvider() + ); SshClient client = SshClient.setUpDefaultClient(); client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); return client; @@ -229,15 +208,4 @@ public String toString() { '}'; } - public static void main(String[] args) throws Throwable { - SSHTunnel tunnel = new SSHTunnel( - "SSH_KEY_AUTH", "3.18.93.32", "22", "airbyte", "", "", - "tunnel-dev.cevykyaz98rn.us-east-2.rds.amazonaws.com", "5432", - "5000"); - SshClient client; - client = tunnel.createClient(); - ClientSession session = tunnel.openTunnel(client); - tunnel.closeTunnel(client, session); - } - } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 9cb90d0be03c..ca2b005e2ac3 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -147,9 +147,6 @@ protected SSHTunnel getSSHTunnelConfig(JsonNode config) { getConfigValueOrNull(ourConfig, "tunnel_db_remote_port"), getConfigValueOrNull(ourConfig, "tunnel_localport") ); - java.security.Security.addProvider( - new org.bouncycastle.jce.provider.BouncyCastleProvider() - ); return sshconfig; } From f3940bc30856eeaf0fcbf2f926781ab0bd335e7b Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 17:30:15 -0500 Subject: [PATCH 07/14] Got ssh key format conversion working --- .../airbyte/integrations/base/SSHTunnel.java | 28 +++++++++++-------- .../jdbc/AbstractJdbcDestination.java | 2 -- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index 6c7c34b5c2a4..2fa669f42f25 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -22,6 +22,7 @@ import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.server.forward.AcceptAllForwardingFilter; +import org.bouncycastle.openssl.PEMKeyPair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,12 +91,11 @@ public String getUser() { return user; } - // TODO: Determine if we can lock down the access on credentials a bit tighter - String getSSHKey() { + private String getSSHKey() { return sshkey; } - public String getPassword() { + private String getPassword() { return password; } @@ -112,25 +112,29 @@ public String getTunnelDatabasePort() { } /** - * From the pem format private key string, parse the private key, discover the public key, and return the pair for auth use. + * From the RSA format private key string, parse the private key, + * discover the public key, and return the pair for auth use. * * @return - * @throws InvalidKeySpecException * @throws NoSuchAlgorithmException * @throws URISyntaxException - */ - protected KeyPair getPrivateKeyPair() throws InvalidKeySpecException, NoSuchAlgorithmException { + */ + protected KeyPair getPrivateKeyPair() throws NoSuchAlgorithmException, IOException { KeyFactory kf = KeyFactory.getInstance("RSA"); String privateKeyContent = getSSHKey() .replaceAll("\\n", "") - .replace("-----BEGIN PRIVATE KEY-----", "") - .replace("-----END PRIVATE KEY-----", ""); - RSAPrivateKey privKey = (RSAPrivateKey) kf.generatePrivate(new PKCS8EncodedKeySpec(Base64.getDecoder().decode(privateKeyContent))); - RSAPublicKey pubKey = (RSAPublicKey) kf.generatePublic(kf.getKeySpec(privKey, X509EncodedKeySpec.class)); + .replace("-----BEGIN RSA PRIVATE KEY-----", "") + .replace("-----END RSA PRIVATE KEY-----", ""); + PEMParser pemParser = new PEMParser(new StringReader(getSSHKey())); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); + PrivateKeyInfo privKeyInfo = keypair.getPrivateKeyInfo(); + SubjectPublicKeyInfo pubKeyInfo = keypair.getPublicKeyInfo(); + RSAPrivateKey privKey = (RSAPrivateKey) converter.getPrivateKey(privKeyInfo); + RSAPublicKey pubKey = (RSAPublicKey) converter.getPublicKey(SubjectPublicKeyInfo.getInstance(pubKeyInfo)); return new KeyPair(pubKey, privKey); } - /** * Generates a new ssh client and returns it, with forwarding set to accept all types; use this before opening a tunnel. * diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index ca2b005e2ac3..10fd8ba9ba25 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -81,9 +81,7 @@ public AirbyteConnectionStatus check(JsonNode config) { tunnelConfig = getSSHTunnelConfig(config); if (tunnelConfig.shouldTunnel()) { sshclient = tunnelConfig.createClient(); - LOGGER.error("JENNY TESTING - Client created."); tunnelSession = tunnelConfig.openTunnel(sshclient); - LOGGER.error("JENNY TESTING - Tunnel opened."); } String outputSchema = namingResolver.getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); From 390a32f2b2a335e006a976837a16e08078c5df8c Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 17:34:03 -0500 Subject: [PATCH 08/14] Got ssh key format conversion working --- .../java/io/airbyte/integrations/base/SSHTunnel.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index 2fa669f42f25..da3149bd0989 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -116,15 +116,9 @@ public String getTunnelDatabasePort() { * discover the public key, and return the pair for auth use. * * @return - * @throws NoSuchAlgorithmException - * @throws URISyntaxException + * @throws IOException */ - protected KeyPair getPrivateKeyPair() throws NoSuchAlgorithmException, IOException { - KeyFactory kf = KeyFactory.getInstance("RSA"); - String privateKeyContent = getSSHKey() - .replaceAll("\\n", "") - .replace("-----BEGIN RSA PRIVATE KEY-----", "") - .replace("-----END RSA PRIVATE KEY-----", ""); + protected KeyPair getPrivateKeyPair() throws IOException { PEMParser pemParser = new PEMParser(new StringReader(getSSHKey())); JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); From 6815bd426edf442baa711c8004d96e5d3d528d77 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 17:38:37 -0500 Subject: [PATCH 09/14] Readability improvement --- .../java/io/airbyte/integrations/base/SSHTunnel.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index da3149bd0989..ee30042e35cd 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -120,13 +120,12 @@ public String getTunnelDatabasePort() { */ protected KeyPair getPrivateKeyPair() throws IOException { PEMParser pemParser = new PEMParser(new StringReader(getSSHKey())); - JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); - PrivateKeyInfo privKeyInfo = keypair.getPrivateKeyInfo(); - SubjectPublicKeyInfo pubKeyInfo = keypair.getPublicKeyInfo(); - RSAPrivateKey privKey = (RSAPrivateKey) converter.getPrivateKey(privKeyInfo); - RSAPublicKey pubKey = (RSAPublicKey) converter.getPublicKey(SubjectPublicKeyInfo.getInstance(pubKeyInfo)); - return new KeyPair(pubKey, privKey); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); + return new KeyPair( + (RSAPublicKey) converter.getPublicKey(SubjectPublicKeyInfo.getInstance(keypair.getPublicKeyInfo())), + (RSAPrivateKey) converter.getPrivateKey(keypair.getPrivateKeyInfo()) + ); } /** From 9ce73fee54810fff8a66eae9369f9276e306b1a6 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Tue, 10 Aug 2021 17:41:47 -0500 Subject: [PATCH 10/14] Comments --- .../src/main/java/io/airbyte/integrations/base/SSHTunnel.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index ee30042e35cd..ab23e2810ff2 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -112,8 +112,8 @@ public String getTunnelDatabasePort() { } /** - * From the RSA format private key string, parse the private key, - * discover the public key, and return the pair for auth use. + * From the RSA format private key string, use bouncycastle to deserialize the key pair, + * reconstruct the keys from the key info, and return the key pair for use in authentication. * * @return * @throws IOException From e518a4efb5588961656f3c5a402552eebdb26567 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 11 Aug 2021 10:29:19 -0500 Subject: [PATCH 11/14] Cleanup on bouncycastle jars --- airbyte-integrations/connectors/destination-jdbc/build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/build.gradle b/airbyte-integrations/connectors/destination-jdbc/build.gradle index 781df026633d..5ce268da4cc1 100644 --- a/airbyte-integrations/connectors/destination-jdbc/build.gradle +++ b/airbyte-integrations/connectors/destination-jdbc/build.gradle @@ -12,8 +12,6 @@ dependencies { implementation 'com.google.cloud:google-cloud-storage:1.113.16' implementation 'com.google.auth:google-auth-library-oauth2-http:0.25.5' implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' - implementation group: 'org.bouncycastle', name: 'bcprov-jdk14', version: '1.69' - implementation group: 'org.bouncycastle', name: 'bcpkix-jdk14', version: '1.69' implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') From c805d06eb5296c5257dfff3502ad7c09581f647a Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 11 Aug 2021 18:02:03 -0500 Subject: [PATCH 12/14] Fix permissions for test user to allow connect --- .../ssh_tunnel/module/sql/postgresql-02-user-create.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql index cbc7759aca95..54245f7d8952 100644 --- a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql +++ b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql @@ -2,4 +2,6 @@ CREATE USER testcaseuser WITH password 'ThisIsNotTheRealPassword.PleaseSetThisByHand'; +GRANT CONNECT ON DATABASE test TO testcaseuser; +GRANT USAGE ON SCHEMA public TO testcaseuser; GRANT integrationtest_rw TO testcaseuser; From 8c1cc301c705eb1588f6803435c4c58f2a38fded Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Wed, 11 Aug 2021 18:04:10 -0500 Subject: [PATCH 13/14] Made ssh tunnel work correctly. No more jar conflict. Correct local port forwarding. --- airbyte-integrations/bases/base-java/build.gradle | 7 +++++-- .../io/airbyte/integrations/base/SSHTunnel.java | 13 +++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/bases/base-java/build.gradle b/airbyte-integrations/bases/base-java/build.gradle index c2c4a1563e93..a429d6ddb61a 100644 --- a/airbyte-integrations/bases/base-java/build.gradle +++ b/airbyte-integrations/bases/base-java/build.gradle @@ -6,8 +6,11 @@ plugins { dependencies { implementation 'commons-cli:commons-cli:1.4' implementation group: 'org.apache.sshd', name: 'sshd-mina', version: '2.7.0' - implementation group: 'org.bouncycastle', name: 'bcprov-jdk14', version: '1.69' - implementation group: 'org.bouncycastle', name: 'bcpkix-jdk14', version: '1.69' + // bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java + // because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation + implementation group: 'org.bouncycastle', name: 'bcprov-jdk15on', version: '1.66' + implementation group: 'org.bouncycastle', name: 'bcpkix-jdk15on', version: '1.66' + implementation group: 'org.bouncycastle', name: 'bctls-jdk15on', version: '1.66' implementation project(':airbyte-protocol:models') implementation project(":airbyte-json-validation") diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index ab23e2810ff2..a13b4ff84b30 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -19,6 +19,7 @@ import java.security.spec.X509EncodedKeySpec; import java.util.Base64; import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.server.forward.AcceptAllForwardingFilter; @@ -139,6 +140,7 @@ public SshClient createClient() { ); SshClient client = SshClient.setUpDefaultClient(); client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); + client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE); return client; } @@ -157,7 +159,7 @@ private void validate() { * @throws NoSuchAlgorithmException * @throws URISyntaxException */ - public ClientSession openTunnel(SshClient client) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException { + public ClientSession openTunnel(SshClient client) throws IOException { validate(); client.start(); ClientSession session = client.connect( @@ -175,11 +177,11 @@ public ClientSession openTunnel(SshClient client) throws IOException, InvalidKey session.addPasswordIdentity(getPassword()); } session.auth().verify(TIMEOUT_MILLIS); - session.startRemotePortForwarding( - new SshdSocketAddress(getRemoteDatabaseHost(), Integer.parseInt(getRemoteDatabasePort().trim())), - new SshdSocketAddress("localhost", Integer.parseInt(getTunnelDatabasePort().trim())) + SshdSocketAddress address = session.startLocalPortForwarding( + new SshdSocketAddress(SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), Integer.parseInt(getTunnelDatabasePort().trim())), + new SshdSocketAddress(getRemoteDatabaseHost().trim(), Integer.parseInt(getRemoteDatabasePort().trim())) ); - LOGGER.info("Established tunnelling session. Port forwarding started."); + LOGGER.info("Established tunneling session. Port forwarding started on " + address.toInetSocketAddress()); return session; } @@ -204,5 +206,4 @@ public String toString() { ", tunnelDatabasePort='" + tunnelDatabasePort + '\'' + '}'; } - } From e057bc301ba37ea86902fe54516ee54f70950c44 Mon Sep 17 00:00:00 2001 From: Jenny Brown Date: Mon, 16 Aug 2021 15:04:16 -0500 Subject: [PATCH 14/14] Fixed for ssh tunneling --- .../airbyte/integrations/base/SSHTunnel.java | 83 +++++++++++-------- .../module/sql/postgresql-02-user-create.sql | 8 +- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java index a13b4ff84b30..f511eb2fb1ef 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SSHTunnel.java @@ -1,43 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.integrations.base; -import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.io.StringReader; import java.net.URISyntaxException; -import java.security.KeyFactory; import java.security.KeyPair; import java.security.NoSuchAlgorithmException; -import java.security.PrivateKey; -import java.security.PublicKey; import java.security.interfaces.RSAPrivateKey; import java.security.interfaces.RSAPublicKey; import java.security.spec.InvalidKeySpecException; -import java.security.spec.PKCS8EncodedKeySpec; -import java.security.spec.RSAPrivateKeySpec; -import java.security.spec.RSAPublicKeySpec; -import java.security.spec.X509EncodedKeySpec; -import java.util.Base64; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier; import org.apache.sshd.client.session.ClientSession; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.server.forward.AcceptAllForwardingFilter; -import org.bouncycastle.openssl.PEMKeyPair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.openssl.PEMKeyPair; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; -import org.bouncycastle.util.io.pem.PemObject; -import org.bouncycastle.util.io.pem.PemReader; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion host plus the remote host and remote port to - * forward to a specified local port. + * Encapsulates the connection configuration for an ssh tunnel port forward through a proxy/bastion + * host plus the remote host and remote port to forward to a specified local port. */ public class SSHTunnel { @@ -54,8 +63,15 @@ public class SSHTunnel { private final String remoteDatabasePort; private final String tunnelDatabasePort; - public SSHTunnel(String method, String host, String tunnelSshPort, - String user, String sshkey, String password, String remoteDatabaseHost, String remoteDatabasePort, String tunnelDatabasePort) { + public SSHTunnel(String method, + String host, + String tunnelSshPort, + String user, + String sshkey, + String password, + String remoteDatabaseHost, + String remoteDatabasePort, + String tunnelDatabasePort) { if (method == null) { this.method = "NO_TUNNEL"; } else { @@ -75,7 +91,6 @@ public boolean shouldTunnel() { return method != null && !"NO_TUNNEL".equals(method); } - public String getMethod() { return method; } @@ -113,31 +128,30 @@ public String getTunnelDatabasePort() { } /** - * From the RSA format private key string, use bouncycastle to deserialize the key pair, - * reconstruct the keys from the key info, and return the key pair for use in authentication. + * From the RSA format private key string, use bouncycastle to deserialize the key pair, reconstruct + * the keys from the key info, and return the key pair for use in authentication. * * @return * @throws IOException - */ + */ protected KeyPair getPrivateKeyPair() throws IOException { PEMParser pemParser = new PEMParser(new StringReader(getSSHKey())); PEMKeyPair keypair = (PEMKeyPair) pemParser.readObject(); JcaPEMKeyConverter converter = new JcaPEMKeyConverter(); return new KeyPair( (RSAPublicKey) converter.getPublicKey(SubjectPublicKeyInfo.getInstance(keypair.getPublicKeyInfo())), - (RSAPrivateKey) converter.getPrivateKey(keypair.getPrivateKeyInfo()) - ); + (RSAPrivateKey) converter.getPrivateKey(keypair.getPrivateKeyInfo())); } /** - * Generates a new ssh client and returns it, with forwarding set to accept all types; use this before opening a tunnel. + * Generates a new ssh client and returns it, with forwarding set to accept all types; use this + * before opening a tunnel. * * @return */ public SshClient createClient() { java.security.Security.addProvider( - new org.bouncycastle.jce.provider.BouncyCastleProvider() - ); + new org.bouncycastle.jce.provider.BouncyCastleProvider()); SshClient client = SshClient.setUpDefaultClient(); client.setForwardingFilter(AcceptAllForwardingFilter.INSTANCE); client.setServerKeyVerifier(AcceptAllServerKeyVerifier.INSTANCE); @@ -166,8 +180,7 @@ public ClientSession openTunnel(SshClient client) throws IOException { getUser().trim(), getHost().trim(), Integer.parseInt( - getTunnelSshPort().trim() - )) + getTunnelSshPort().trim())) .verify(TIMEOUT_MILLIS) .getSession(); if (getMethod().equals("SSH_KEY_AUTH")) { @@ -179,8 +192,7 @@ public ClientSession openTunnel(SshClient client) throws IOException { session.auth().verify(TIMEOUT_MILLIS); SshdSocketAddress address = session.startLocalPortForwarding( new SshdSocketAddress(SshdSocketAddress.LOCALHOST_ADDRESS.getHostName(), Integer.parseInt(getTunnelDatabasePort().trim())), - new SshdSocketAddress(getRemoteDatabaseHost().trim(), Integer.parseInt(getRemoteDatabasePort().trim())) - ); + new SshdSocketAddress(getRemoteDatabaseHost().trim(), Integer.parseInt(getRemoteDatabasePort().trim()))); LOGGER.info("Established tunneling session. Port forwarding started on " + address.toInetSocketAddress()); return session; } @@ -206,4 +218,5 @@ public String toString() { ", tunnelDatabasePort='" + tunnelDatabasePort + '\'' + '}'; } + } diff --git a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql index 54245f7d8952..6c3b11074440 100644 --- a/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql +++ b/airbyte-integrations/infrastructure/ssh_tunnel/module/sql/postgresql-02-user-create.sql @@ -2,6 +2,10 @@ CREATE USER testcaseuser WITH password 'ThisIsNotTheRealPassword.PleaseSetThisByHand'; -GRANT CONNECT ON DATABASE test TO testcaseuser; -GRANT USAGE ON SCHEMA public TO testcaseuser; +GRANT CONNECT ON +DATABASE test TO testcaseuser; + +GRANT USAGE ON +SCHEMA public TO testcaseuser; + GRANT integrationtest_rw TO testcaseuser;