Skip to content

Commit

Permalink
update to use new config schema
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Sep 3, 2021
1 parent 47d3f33 commit d43b500
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,37 @@
* SOFTWARE.
*/

package io.airbyte.integrations.destination.jdbc;
package io.airbyte.integrations.base.ssh;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SshTunnel;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.List;
import java.util.function.Consumer;

/**
* Decorates a Destination with an SSH Tunnel using the standard configuration that Airbyte uses for
* configuring SSH.
*/
public class SshWrappedJdbcDestination implements Destination {
public class SshWrappedDestination implements Destination {

private final Destination delegate;
private final List<String> hostKey;
private final List<String> portKey;

public SshWrappedJdbcDestination(final Destination delegate) {
public SshWrappedDestination(final Destination delegate,
final List<String> hostKey,
final List<String> portKey) {
this.delegate = delegate;
this.hostKey = hostKey;
this.portKey = portKey;
}

@Override
Expand All @@ -60,16 +66,16 @@ public ConnectorSpecification spec() throws Exception {

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
return SshTunnel.sshWrap(config, () -> delegate.check(config));
return SshTunnel.sshWrap(config, hostKey, portKey, delegate::check);
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final SshTunnel sshTunnel = SshTunnel.getInstance(config);
return AirbyteMessageConsumer.appendOnClose(delegate.getConsumer(config, catalog, outputRecordCollector), sshTunnel::close);
final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey);
return AirbyteMessageConsumer.appendOnClose(delegate.getConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector), tunnel::close);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SshWrappedJdbcDestination;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -42,6 +42,8 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);

public static final String DRIVER_CLASS = "org.postgresql.Driver";
public static final List<String> HOST_KEY = List.of("host");
public static final List<String> PORT_KEY = List.of("port");

public PostgresDestination() {
super(DRIVER_CLASS, new PostgresSQLNameTransformer(), new PostgresSqlOperations());
Expand Down Expand Up @@ -79,7 +81,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

public static void main(final String[] args) throws Exception {
final Destination destination = new SshWrappedJdbcDestination(new PostgresDestination());
final Destination destination = new SshWrappedDestination(new PostgresDestination(), HOST_KEY, PORT_KEY);
LOGGER.info("starting destination: {}", PostgresDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", PostgresDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.SshTunnel;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.nio.file.Path;
Expand Down Expand Up @@ -126,14 +127,22 @@ protected List<String> resolveIdentifier(final String identifier) {

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
final JsonNode config = getConfig();
return SshTunnel.sshWrap(config, () -> Databases.createPostgresDatabase(config.get("username").asText(), config.get("password").asText(),
String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), config.get("port").asText(), config.get("database").asText())).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
return SshTunnel.sshWrap(
config,
PostgresDestination.HOST_KEY,
PostgresDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> Databases.createPostgresDatabase(
mangledConfig.get("username").asText(),
mangledConfig.get("password").asText(),
String.format("jdbc:postgresql://%s:%s/%s", mangledConfig.get("host").asText(), mangledConfig.get("port").asText(),
mangledConfig.get("database").asText()))
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}

@Override
Expand Down

0 comments on commit d43b500

Please sign in to comment.