diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java index d45feb73c57f..e192125c2d4e 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java @@ -29,6 +29,7 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.ssh.SshTunnel; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; import org.jooq.JSONFormat; import org.jooq.JSONFormat.RecordFormat; @@ -53,6 +55,8 @@ public abstract class SshPostgresDestinationAcceptanceTest extends DestinationAc private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private String schemaName; + public abstract Path getConfigFilePath(); @Override @@ -62,6 +66,13 @@ protected String getImageName() { @Override protected JsonNode getConfig() { + final JsonNode config = getConfigFromSecretsFile(); + // do everything in a randomly generated schema so that we can wipe it out at the end. + ((ObjectNode) config).put("schema", schemaName); + return config; + } + + private JsonNode getConfigFromSecretsFile() { return Jsons.deserialize(IOs.readFile(Path.of("secrets/ssh-pwd-config.json"))); } @@ -125,17 +136,21 @@ protected List resolveIdentifier(final String identifier) { return result; } + private static Database getDatabaseFromConfig(final JsonNode config) { + return Databases.createPostgresDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), config.get("port").asText(), + config.get("database").asText())); + } + private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { final JsonNode config = getConfig(); return SshTunnel.sshWrap( config, PostgresDestination.HOST_KEY, PostgresDestination.PORT_KEY, - (CheckedFunction, Exception>) mangledConfig -> Databases.createPostgresDatabase( - mangledConfig.get("username").asText(), - mangledConfig.get("password").asText(), - String.format("jdbc:postgresql://%s:%s/%s", mangledConfig.get("host").asText(), mangledConfig.get("port").asText(), - mangledConfig.get("database").asText())) + (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) .query( ctx -> ctx .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) @@ -146,9 +161,28 @@ private List retrieveRecordsFromTable(final String tableName, final St } @Override - protected void setup(final TestDestinationEnv testEnv) {} + protected void setup(final TestDestinationEnv testEnv) throws Exception { + // do everything in a randomly generated schema so that we can wipe it out at the end. + schemaName = RandomStringUtils.randomAlphanumeric(5); + SshTunnel.sshWrap( + getConfig(), + PostgresDestination.HOST_KEY, + PostgresDestination.PORT_KEY, + mangledConfig -> { + getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName))); + }); + } @Override - protected void tearDown(final TestDestinationEnv testEnv) {} + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + // blow away the test schema at the end. + SshTunnel.sshWrap( + getConfig(), + PostgresDestination.HOST_KEY, + PostgresDestination.PORT_KEY, + mangledConfig -> { + getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("DROP SCHEMA %s CASCADE;", schemaName))); + }); + } }