Skip to content

Commit

Permalink
Merge branch 'master' into liren/publish-mssql
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed May 27, 2022
2 parents 67ba9a3 + e3cb566 commit 7651cbb
Show file tree
Hide file tree
Showing 178 changed files with 1,265 additions and 1,441 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.39.2-alpha
current_version = 0.39.4-alpha
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


### SHARED ###
VERSION=0.39.2-alpha
VERSION=0.39.4-alpha

# When using the airbyte-db via default docker image
CONFIG_ROOT=/data
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ jobs:
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
github-token: ${{ needs.find_valid_pat.outputs.pat }}
# 80 gb disk
ec2-image-id: ami-0d648081937c75a73
publish-image:
timeout-minutes: 240
needs: start-publish-image-runner
Expand Down
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3844,6 +3844,8 @@ components:
$ref: "#/components/schemas/AttemptFailureType"
externalMessage:
type: string
internalMessage:
type: string
stacktrace:
type: string
retryable:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-bootloader/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ARG JDK_VERSION=17.0.1
ARG JDK_IMAGE=openjdk:${JDK_VERSION}-slim
FROM ${JDK_IMAGE}

ARG VERSION=0.39.2-alpha
ARG VERSION=0.39.4-alpha

ENV APPLICATION airbyte-bootloader
ENV VERSION ${VERSION}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseCheckFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.DatabaseConstants;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
Expand Down Expand Up @@ -69,6 +69,8 @@ public class BootloaderApp {
private JobPersistence jobPersistence;
private final Flyway configsFlyway;
private final Flyway jobsFlyway;
private final DSLContext configsDslContext;
private final DSLContext jobsDslContext;

/**
* This method is exposed for Airbyte Cloud consumption. This lets us override the seed loading
Expand All @@ -93,7 +95,9 @@ public BootloaderApp(final Configs configs,
this.postLoadExecution = postLoadExecution;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;
this.configsDslContext = configsDslContext;
this.configsFlyway = configsFlyway;
this.jobsDslContext = jobsDslContext;
this.jobsFlyway = jobsFlyway;

initPersistences(configsDslContext, jobsDslContext);
Expand All @@ -109,7 +113,9 @@ public BootloaderApp(final Configs configs,
this.configs = configs;
this.featureFlags = featureFlags;
this.secretMigrator = secretMigrator;
this.configsDslContext = configsDslContext;
this.configsFlyway = configsFlyway;
this.jobsDslContext = jobsDslContext;
this.jobsFlyway = jobsFlyway;

initPersistences(configsDslContext, jobsDslContext);
Expand All @@ -132,6 +138,14 @@ public BootloaderApp(final Configs configs,
}

public void load() throws Exception {
LOGGER.info("Initializing databases...");
DatabaseCheckFactory.createConfigsDatabaseInitializer(configsDslContext,
configs.getConfigsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.CONFIGS_SCHEMA_PATH)).initialize();

DatabaseCheckFactory.createJobsDatabaseInitializer(jobsDslContext,
configs.getJobsDatabaseInitializationTimeoutMs(), MoreResources.readResource(DatabaseConstants.JOBS_SCHEMA_PATH)).initialize();
LOGGER.info("Databases initialized.");

LOGGER.info("Setting up config database and default workspace...");
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final AirbyteVersion currAirbyteVersion = configs.getAirbyteVersion();
Expand Down Expand Up @@ -164,7 +178,7 @@ public void load() throws Exception {
}

private static Database getConfigDatabase(final DSLContext dslContext) throws IOException {
return new ConfigsDatabaseInstance(dslContext).getAndInitialize();
return new Database(dslContext);
}

private static ConfigPersistence getConfigPersistence(final Database configDatabase) throws IOException {
Expand All @@ -177,7 +191,7 @@ private static ConfigPersistence getConfigPersistence(final Database configDatab
}

private static Database getJobDatabase(final DSLContext dslContext) throws IOException {
return new JobsDatabaseInstance(dslContext).getAndInitialize();
return new Database(dslContext);
}

private static JobPersistence getJobPersistence(final Database jobDatabase) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseTestProvider;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -97,6 +96,8 @@ void testBootloaderAppBlankDb() throws Exception {
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
Expand All @@ -116,15 +117,16 @@ void testBootloaderAppBlankDb() throws Exception {
val configsFlyway = createConfigsFlyway(configsDataSource);
val jobsFlyway = createJobsFlyway(jobsDataSource);

val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

val bootloader =
new BootloaderApp(mockedConfigs, mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway);
bootloader.load();

val jobDatabase = new JobsDatabaseInstance(jobsDslContext).getInitialized();
val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.35.62.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configDatabase = new ConfigsDatabaseInstance(configsDslContext).getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
Expand All @@ -151,6 +153,8 @@ void testBootloaderAppRunSecretMigration() throws Exception {
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getSecretPersistenceType()).thenReturn(TESTING_CONFIG_DB_TABLE);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
Expand All @@ -166,10 +170,11 @@ void testBootloaderAppRunSecretMigration() throws Exception {
val configsFlyway = createConfigsFlyway(configsDataSource);
val jobsFlyway = createJobsFlyway(jobsDataSource);

final Database configDatabase = new Database(configsDslContext);
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
val configDatabase = new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
val jobDatabase = new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

val jobsPersistence = new DefaultJobPersistence(configDatabase);
val configPersistence = new DatabaseConfigPersistence(configDatabase, jsonSecretsProcessor);
val jobsPersistence = new DefaultJobPersistence(jobDatabase);

val spiedSecretMigrator =
spy(new SecretMigrator(configPersistence, jobsPersistence, SecretPersistence.getLongLived(configsDslContext, mockedConfigs)));
Expand Down Expand Up @@ -293,6 +298,8 @@ void testPostLoadExecutionExecutes() throws Exception {
when(mockedConfigs.getDatabasePassword()).thenReturn(container.getPassword());
when(mockedConfigs.getAirbyteVersion()).thenReturn(new AirbyteVersion(version));
when(mockedConfigs.runDatabaseMigrationOnStartup()).thenReturn(true);
when(mockedConfigs.getConfigsDatabaseInitializationTimeoutMs()).thenReturn(60000L);
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);
Expand All @@ -305,6 +312,9 @@ void testPostLoadExecutionExecutes() throws Exception {
val configsFlyway = createConfigsFlyway(configsDataSource);
val jobsFlyway = createJobsFlyway(jobsDataSource);

new ConfigsDatabaseTestProvider(configsDslContext, configsFlyway).create(false);
new JobsDatabaseTestProvider(jobsDslContext, jobsFlyway).create(false);

new BootloaderApp(mockedConfigs, () -> testTriggered.set(true), mockedFeatureFlags, mockedSecretMigrator, configsDslContext, jobsDslContext,
configsFlyway, jobsFlyway)
.load();
Expand Down
11 changes: 11 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ public static byte[] toBytes(final JsonNode jsonNode) {
return serialize(jsonNode).getBytes(Charsets.UTF_8);
}

/**
* Use string length as an estimation for byte size, because all ASCII characters are one byte long
* in UTF-8, and ASCII characters cover most of the use cases. To be more precise, we can convert
* the string to byte[] and use the length of the byte[]. However, this conversion is expensive in
* memory consumption. Given that the byte size of the serialized JSON is already an estimation of
* the actual size of the JSON object, using a cheap operation seems an acceptable compromise.
*/
public static int getEstimatedByteSize(final JsonNode jsonNode) {
return serialize(jsonNode).length();
}

public static Set<String> keys(final JsonNode jsonNode) {
if (jsonNode.isObject()) {
return Jsons.object(jsonNode, new TypeReference<Map<String, Object>>() {}).keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ void testGetStringOrNull() {
assertNull(Jsons.getStringOrNull(json, "xyz"));
}

@Test
void testGetEstimatedByteSize() {
final JsonNode json = Jsons.deserialize("{\"string_key\":\"abc\",\"array_key\":[\"item1\", \"item2\"]}");
assertEquals(Jsons.toBytes(json).length, Jsons.getEstimatedByteSize(json));
}

private static class ToClass {

@JsonProperty("str")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@
- name: Faker
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.com/integrations/source-faker
sourceType: api
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2133,7 +2133,7 @@
oauthFlowInitParameters: []
oauthFlowOutputParameters:
- - "access_token"
- dockerImage: "airbyte/source-faker:0.1.0"
- dockerImage: "airbyte/source-faker:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1516,8 +1516,13 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
ConfigSchema.STANDARD_SOURCE_DEFINITION,
sourceDefinitionId.toString(),
StandardSourceDefinition.class);

final JsonNode connectionSpecs = standardSourceDefinition.getSpec().getConnectionSpecification();
return jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpecs);
final JsonNode sanitizedConfig =
jsonSecretsProcessor.prepareSecretsForOutput(configWithMetadata.getConfig().getConfiguration(), connectionSpecs);

configWithMetadata.getConfig().setConfiguration(sanitizedConfig);
return Jsons.jsonNode(configWithMetadata.getConfig());
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -1534,8 +1539,12 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
ConfigSchema.STANDARD_DESTINATION_DEFINITION,
destinationDefinition.toString(),
StandardDestinationDefinition.class);
final JsonNode connectionSpec = standardDestinationDefinition.getSpec().getConnectionSpecification();
return jsonSecretsProcessor.prepareSecretsForOutput(Jsons.jsonNode(configWithMetadata.getConfig()), connectionSpec);
final JsonNode connectionSpecs = standardDestinationDefinition.getSpec().getConnectionSpecification();
final JsonNode sanitizedConfig =
jsonSecretsProcessor.prepareSecretsForOutput(configWithMetadata.getConfig().getConfiguration(), connectionSpecs);

configWithMetadata.getConfig().setConfiguration(sanitizedConfig);
return Jsons.jsonNode(configWithMetadata.getConfig());
} catch (final ConfigNotFoundException | JsonValidationException | IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import java.util.Optional;
import java.util.Set;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

@Builder
@SuppressWarnings({"PMD.CognitiveComplexity", "PMD.CyclomaticComplexity"})
@Slf4j
public class JsonSecretsProcessor {

@Builder.Default
Expand Down Expand Up @@ -55,6 +57,7 @@ public JsonNode prepareSecretsForOutput(final JsonNode obj, final JsonNode schem
// todo (cgardens) this is not safe. should throw.
// if schema is an object and has a properties field
if (!isValidJsonSchema(schema)) {
log.error("The schema is not valid, the secret can't be hidden");
return obj;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.airbyte.config.Configs;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.io.IOException;
import java.util.Optional;
import org.jooq.DSLContext;
Expand All @@ -26,7 +25,7 @@ public interface SecretPersistence extends ReadOnlySecretPersistence {
static Optional<SecretPersistence> getLongLived(final DSLContext dslContext, final Configs configs) throws IOException {
switch (configs.getSecretPersistenceType()) {
case TESTING_CONFIG_DB_TABLE -> {
final Database configDatabase = new ConfigsDatabaseInstance(dslContext).getAndInitialize();
final Database configDatabase = new Database(dslContext);
return Optional.of(new LocalTestingSecretPersistence(configDatabase));
}
case GOOGLE_SECRET_MANAGER -> {
Expand All @@ -51,7 +50,7 @@ static SecretsHydrator getSecretsHydrator(final DSLContext dslContext, final Con
static Optional<SecretPersistence> getEphemeral(final DSLContext dslContext, final Configs configs) throws IOException {
switch (configs.getSecretPersistenceType()) {
case TESTING_CONFIG_DB_TABLE -> {
final Database configDatabase = new ConfigsDatabaseInstance(dslContext).getAndInitialize();
final Database configDatabase = new Database(dslContext);
return Optional.of(new LocalTestingSecretPersistence(configDatabase));
}
case GOOGLE_SECRET_MANAGER -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.init.DatabaseInitializationException;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
Expand Down Expand Up @@ -84,12 +85,12 @@ public static void dbSetup() {
}

@BeforeEach
void setup() throws IOException, JsonValidationException, SQLException {
void setup() throws IOException, JsonValidationException, SQLException, DatabaseInitializationException {
dataSource = DatabaseConnectionHelper.createDataSource(container);
dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceLoadDataTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
database = new ConfigsDatabaseInstance(dslContext).getAndInitialize();
database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
jsonSecretsProcessor = mock(JsonSecretsProcessor.class);
configPersistence = spy(new DatabaseConfigPersistence(database, jsonSecretsProcessor));
configRepository = spy(new ConfigRepository(configPersistence, database));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.test.utils.DatabaseConnectionHelper;
Expand All @@ -47,10 +47,9 @@ class DatabaseConfigPersistenceE2EReadWriteTest extends BaseDatabaseConfigPersis
void setup() throws Exception {
dataSource = DatabaseConnectionHelper.createDataSource(container);
dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES);
database = new ConfigsDatabaseInstance(dslContext).getAndInitialize();
flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceLoadDataTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);

database = new ConfigsDatabaseTestProvider(dslContext, flyway).create(false);
configPersistence = spy(new DatabaseConfigPersistence(database, jsonSecretsProcessor));
final ConfigsDatabaseMigrator configsDatabaseMigrator =
new ConfigsDatabaseMigrator(database, flyway);
Expand Down
Loading

0 comments on commit 7651cbb

Please sign in to comment.