diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java
index 55e9deda3409..799dc5abb686 100644
--- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java
+++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java
@@ -42,6 +42,7 @@ import io.airbyte.api.client.model.generated.SourceCreate;
 import io.airbyte.api.client.model.generated.SourceDefinitionCreate;
 import io.airbyte.api.client.model.generated.SourceDefinitionRead;
+import io.airbyte.api.client.model.generated.SourceDefinitionUpdate;
 import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
 import io.airbyte.api.client.model.generated.SourceIdRequestBody;
 import io.airbyte.api.client.model.generated.SourceRead;
@@ -113,6 +114,8 @@ public class AirbyteAcceptanceTestHarness {
 
   private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.1";
   private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.1";
+  public static final String POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION = "0.4.26";
+
   private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_";
   private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}";
   private static final String OUTPUT_STREAM_PREFIX = "output_table_";
@@ -284,7 +287,8 @@ private void assignEnvVars() {
     isGke = System.getenv().containsKey("IS_GKE");
     isMac = System.getProperty("os.name").startsWith("Mac");
     useExternalDeployment =
-        System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
+        System.getenv("USE_EXTERNAL_DEPLOYMENT") != null &&
+            System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
   }
 
   private WorkflowClient getWorkflowClient() {
@@ -636,6 +640,11 @@ public UUID getPostgresSourceDefinitionId() throws ApiException {
         .getSourceDefinitionId();
   }
 
+  public void updateSourceDefinitionVersion(final UUID sourceDefinitionId, final String dockerImageTag) throws ApiException {
+    apiClient.getSourceDefinitionApi().updateSourceDefinition(new SourceDefinitionUpdate()
+        .sourceDefinitionId(sourceDefinitionId).dockerImageTag(dockerImageTag));
+  }
+
   private void clearSourceDbData() throws SQLException {
     final Database database = getSourceDatabase();
     final Set<SchemaTableNamePair> pairs = listAllTables(database);
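The helper added above is what the new per-stream-state tests in BasicAcceptanceTests.java (below) use to pin the Postgres source definition to a legacy connector and later restore it. As a rough sketch of that pattern only, using a hypothetical helper name and assuming the testHarness and apiClient fields that BasicAcceptanceTests already holds:

  // Sketch (not part of this diff): remember the current docker tag, then pin the source
  // definition to the legacy (pre-per-stream-state) Postgres connector.
  private static String pinSourceToLegacyConnector(final UUID sourceDefinitionId) throws ApiException {
    final String currentTag = apiClient.getSourceDefinitionApi()
        .getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId))
        .getDockerImageTag();
    testHarness.updateSourceDefinitionVersion(sourceDefinitionId,
        AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
    // The caller restores the original version after the legacy-version sync:
    // testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentTag);
    return currentTag;
  }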
diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
index f6289d41b98e..d30076993121 100644
--- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
+++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
@@ -57,6 +57,10 @@ public class BasicAcceptanceTests {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class);
 
+  private static final Boolean WITH_SCD_TABLE = true;
+
+  private static final Boolean WITHOUT_SCD_TABLE = false;
+
   private static AirbyteAcceptanceTestHarness testHarness;
   private static AirbyteApiClient apiClient;
   private static UUID workspaceId;
@@ -324,7 +328,7 @@ public void testScheduledSync() throws Exception {
       sleep(Duration.ofSeconds(30).toMillis());
     }
 
-    testHarness.assertSourceAndDestinationDbInSync(false);
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
   }
 
   @Test
@@ -369,7 +373,7 @@ public void testMultipleSchemasSameTablesSync() throws Exception {
     final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
     waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
 
-    testHarness.assertSourceAndDestinationDbInSync(false);
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
   }
 
   @Test
@@ -395,7 +399,7 @@ public void testIncrementalDedupeSync() throws Exception {
         .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
     waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
 
-    testHarness.assertSourceAndDestinationDbInSync(true);
+    testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE);
 
     // add new records and run again.
     final Database source = testHarness.getSourceDatabase();
@@ -448,7 +452,7 @@ public void testIncrementalSync() throws Exception {
     waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
     LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
 
-    testHarness.assertSourceAndDestinationDbInSync(false);
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
 
     // add new records and run again.
     final Database source = testHarness.getSourceDatabase();
@@ -489,7 +493,7 @@ public void testIncrementalSync() throws Exception {
     waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
     LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
 
-    testHarness.assertSourceAndDestinationDbInSync(false);
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
 
   }
 
@@ -566,12 +570,10 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception {
     final UUID destinationId = testHarness.createDestination().getDestinationId();
     final UUID operationId = testHarness.createOperation().getOperationId();
     final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
-    final SyncMode syncMode = SyncMode.INCREMENTAL;
-    final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
     catalog.getStreams().forEach(s -> s.getConfig()
-        .syncMode(syncMode)
+        .syncMode(SyncMode.INCREMENTAL)
         .cursorField(List.of(COLUMN_ID))
-        .destinationSyncMode(destinationSyncMode)
+        .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
         .primaryKey(List.of(List.of(COLUMN_NAME))));
 
     LOGGER.info("Testing connection update when temporal is in a terminal state");
@@ -611,12 +613,10 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except
     final UUID destinationId = testHarness.createDestination().getDestinationId();
     final UUID operationId = testHarness.createOperation().getOperationId();
     final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
-    final SyncMode syncMode = SyncMode.INCREMENTAL;
-    final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
     catalog.getStreams().forEach(s -> s.getConfig()
-        .syncMode(syncMode)
+        .syncMode(SyncMode.INCREMENTAL)
         .cursorField(List.of(COLUMN_ID))
-        .destinationSyncMode(destinationSyncMode)
+        .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
         .primaryKey(List.of(List.of(COLUMN_NAME))));
 
     LOGGER.info("Testing manual sync when temporal is in a terminal state");
@@ -653,12 +653,10 @@ public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws E
     final UUID destinationId = testHarness.createDestination().getDestinationId();
     final UUID operationId = testHarness.createOperation().getOperationId();
     final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
-    final SyncMode syncMode = SyncMode.INCREMENTAL;
-    final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
     catalog.getStreams().forEach(s -> s.getConfig()
-        .syncMode(syncMode)
+        .syncMode(SyncMode.INCREMENTAL)
         .cursorField(List.of(COLUMN_ID))
-        .destinationSyncMode(destinationSyncMode)
+        .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
         .primaryKey(List.of(List.of(COLUMN_NAME))));
 
     LOGGER.info("Testing reset connection when temporal is in a terminal state");
@@ -712,7 +710,150 @@ public void testResetCancelsRunningSync() throws Exception {
     assertEquals(JobStatus.CANCELLED, connectionSyncReadAfterReset.getStatus());
   }
 
-  // This test is disabled because it takes a couple minutes to run, as it is testing timeouts.
+  @Test
+  public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Exception {
+    LOGGER.info("Starting {}", testInfo.getDisplayName());
+    final String connectionName = "test-connection";
+    final SourceRead source = testHarness.createPostgresSource();
+    final UUID sourceId = source.getSourceId();
+    final UUID sourceDefinitionId = source.getSourceDefinitionId();
+    final UUID destinationId = testHarness.createDestination().getDestinationId();
+    final UUID operationId = testHarness.createOperation().getOperationId();
+    final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
+
+    // Fetch the current/most recent source definition version
+    final SourceDefinitionRead sourceDefinitionRead =
+        apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
+    final String currentSourceDefinitionVersion = sourceDefinitionRead.getDockerImageTag();
+
+    // Set the source to a version that does not support per-stream state
+    LOGGER.info("Setting source connector to pre-per-stream state version {}...",
+        AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
+    testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
+
+    catalog.getStreams().forEach(s -> s.getConfig()
+        .syncMode(SyncMode.INCREMENTAL)
+        .cursorField(List.of(COLUMN_ID))
+        .destinationSyncMode(DestinationSyncMode.APPEND));
+    final UUID connectionId =
+        testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
+    LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());
+
+    final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+        .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+    LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
+
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
+
+    // Set source to a version that supports per-stream state
+    testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefinitionVersion);
+    LOGGER.info("Upgraded source connector to per-stream state supported version {}.", currentSourceDefinitionVersion);
+
+    // add new records and run again.
+    final Database sourceDatabase = testHarness.getSourceDatabase();
+    // get contents of source before mutating records.
+    final List<JsonNode> expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME);
+    expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()));
+    // add a new record
+    sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
+    // mutate a record that was already synced without updating its cursor value. if we are actually
+    // full refreshing, this record will appear in the output and cause the test to fail. if we are
+    // correctly doing incremental, we will not find this value in the destination.
+    sourceDatabase.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));
+
+    LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
+    final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
+        .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
+    LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
+
+    testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));
+
+    // reset back to no data.
+    LOGGER.info("Starting {} reset", testInfo.getDisplayName());
+    final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
+        Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
+
+    LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
+
+    testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public",
+        STREAM_NAME));
+
+    // sync one more time. verify it is the equivalent of a full refresh.
+    final String expectedState =
+        "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}";
+    LOGGER.info("Starting {} sync 3", testInfo.getDisplayName());
+    final JobInfoRead connectionSyncRead3 =
+        apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
+    final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
+    LOGGER.info("state after sync 3: {}", state);
+
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
+    assertEquals(Jsons.deserialize(expectedState), state.getState());
+  }
+
+  @Test
+  public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) throws Exception {
+    LOGGER.info("Starting {}", testInfo.getDisplayName());
+    final String connectionName = "test-connection";
+    final SourceRead source = testHarness.createPostgresSource();
+    final UUID sourceId = source.getSourceId();
+    final UUID sourceDefinitionId = source.getSourceDefinitionId();
+    final UUID destinationId = testHarness.createDestination().getDestinationId();
+    final UUID operationId = testHarness.createOperation().getOperationId();
+    final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
+
+    // Fetch the current/most recent source definition version
+    final SourceDefinitionRead sourceDefinitionRead =
+        apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
+    final String currentSourceDefinitionVersion = sourceDefinitionRead.getDockerImageTag();
+
+    // Set the source to a version that does not support per-stream state
+    LOGGER.info("Setting source connector to pre-per-stream state version {}...",
+        AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
+    testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
+
+    catalog.getStreams().forEach(s -> s.getConfig()
+        .syncMode(SyncMode.INCREMENTAL)
+        .cursorField(List.of(COLUMN_ID))
+        .destinationSyncMode(DestinationSyncMode.APPEND));
+    final UUID connectionId =
+        testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
+    LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());
+
+    final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+        .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+    LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
+
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
+
+    // Set source to a version that supports per-stream state
+    testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefinitionVersion);
+    LOGGER.info("Upgraded source connector to per-stream state supported version {}.", currentSourceDefinitionVersion);
+
+    // sync one more time. verify that nothing has been synced
+    LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
+    final JobInfoRead connectionSyncRead2 =
+        apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
+    LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
+
+    final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead2.getJob().getId()));
+    final Optional<AttemptInfoRead> result = syncJob.getAttempts().stream()
+        .sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt()))
+        .findFirst();
+
+    assertTrue(result.isPresent());
+    assertEquals(0, result.get().getAttempt().getRecordsSynced());
+    assertEquals(0, result.get().getAttempt().getTotalStats().getRecordsEmitted());
+    testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
+  }
+
+  // This test is disabled because it takes a couple of minutes to run, as it is testing timeouts.
   // It should be re-enabled when the @SlowIntegrationTest can be applied to it.
   // See relevant issue: https://github.com/airbytehq/airbyte/issues/8397
   @Disabled
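As a reading aid for the zero-record assertions at the end of testSyncAfterUpgradeToPerStreamStateWithNoNewData, the check could be sketched as a standalone helper (hypothetical name; the jobs-API calls are the same ones used in the test above):

  // Sketch (not part of this diff): assert that the most recent attempt of the given sync job
  // moved no records, mirroring the assertions in testSyncAfterUpgradeToPerStreamStateWithNoNewData.
  private static void assertNoRecordsSynced(final JobInfoRead jobInfo) throws ApiException {
    final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(jobInfo.getJob().getId()));
    final Optional<AttemptInfoRead> latestAttempt = syncJob.getAttempts().stream()
        .sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt()))
        .findFirst();
    assertTrue(latestAttempt.isPresent());
    assertEquals(0, latestAttempt.get().getAttempt().getRecordsSynced());
    assertEquals(0, latestAttempt.get().getAttempt().getTotalStats().getRecordsEmitted());
  }

Sorting attempts by endedAt in descending order selects the most recent attempt, which should have moved no records when the upgraded connector resumes from the converted per-stream state with no new source data.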