From cf8cb58e362c55842a9f8d689014d846cf28699f Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Wed, 29 Jun 2022 12:15:28 -0400 Subject: [PATCH 1/6] Add acceptance tests for per-stream state updates --- .../utils/AirbyteAcceptanceTestHarness.java | 13 +- .../test/acceptance/BasicAcceptanceTests.java | 155 +++++++++++++++++- 2 files changed, 166 insertions(+), 2 deletions(-) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index a01439de5465..b2a0c960ef4a 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -42,6 +42,7 @@ import io.airbyte.api.client.model.generated.SourceCreate; import io.airbyte.api.client.model.generated.SourceDefinitionCreate; import io.airbyte.api.client.model.generated.SourceDefinitionRead; +import io.airbyte.api.client.model.generated.SourceDefinitionUpdate; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.api.client.model.generated.SourceRead; @@ -111,6 +112,10 @@ public class AirbyteAcceptanceTestHarness { private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.1"; private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.1"; + public static final String POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION = "0.4.26"; + + public static final String POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION = "0.4.28"; + private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_"; private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}"; private static final String OUTPUT_STREAM_PREFIX = "output_table_"; @@ -274,7 +279,8 @@ private void assignEnvVars() { isGke = System.getenv().containsKey("IS_GKE"); isMac = System.getProperty("os.name").startsWith("Mac"); useExternalDeployment = - System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true"); + System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && + System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true"); } private WorkflowClient getWorkflowClient() { @@ -626,6 +632,11 @@ public UUID getPostgresSourceDefinitionId() throws ApiException { .getSourceDefinitionId(); } + public void updateSourceDefinitionVersion(final UUID sourceDefinitionId, final String dockerImageTag) throws ApiException { + apiClient.getSourceDefinitionApi().updateSourceDefinition(new SourceDefinitionUpdate() + .sourceDefinitionId(sourceDefinitionId).dockerImageTag(dockerImageTag)); + } + private void clearSourceDbData() throws SQLException { final Database database = getSourceDatabase(); final Set pairs = listAllTables(database); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index f6289d41b98e..506c653392f7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -712,7 +712,160 @@ public void testResetCancelsRunningSync() throws Exception { assertEquals(JobStatus.CANCELLED, connectionSyncReadAfterReset.getStatus()); } - // This test is disabled because it takes a couple minutes to run, as it is testing timeouts. + @Test + public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); + final String connectionName = "test-connection"; + final SourceRead source = testHarness.createPostgresSource(); + final UUID sourceId = source.getSourceId(); + final UUID sourceDefinitionId = source.getSourceDefinitionId(); + final UUID destinationId = testHarness.createDestination().getDestinationId(); + final UUID operationId = testHarness.createOperation().getOperationId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + final AirbyteStream stream = catalog.getStreams().get(0).getStream(); + + assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); + // instead of assertFalse to avoid NPE from unboxed. + assertNull(stream.getSourceDefinedCursor()); + assertTrue(stream.getDefaultCursorField().isEmpty()); + assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty()); + + // Set the source to a version that does not support per-stream state + LOGGER.info("Setting source connector to pre-per-stream state version {}...", + AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode)); + final UUID connectionId = + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); + + final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); + LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + testHarness.assertSourceAndDestinationDbInSync(false); + + // Set source to a version that supports per-stream state + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + LOGGER.info("Upgraded source connector per-stream state supported version {}.", + AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + + // add new records and run again. + final Database sourceDatabase = testHarness.getSourceDatabase(); + // get contents of source before mutating records. + final List expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME); + expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build())); + // add a new record + sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); + // mutate a record that was already synced with out updating its cursor value. if we are actually + // full refreshing, this record will appear in the output and cause the test to fail. if we are, + // correctly, doing incremental, we will not find this value in the destination. + sourceDatabase.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2")); + + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); + final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); + LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME)); + + // reset back to no data. + + LOGGER.info("Starting {} reset", testInfo.getDisplayName()); + final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(), + Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED)); + + LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", + STREAM_NAME)); + + // sync one more time. verify it is the equivalent of a full refresh. + LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); + final JobInfoRead connectionSyncRead3 = + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); + LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + testHarness.assertSourceAndDestinationDbInSync(false); + } + + @Test + public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); + final String connectionName = "test-connection"; + final SourceRead source = testHarness.createPostgresSource(); + final UUID sourceId = source.getSourceId(); + final UUID sourceDefinitionId = source.getSourceDefinitionId(); + final UUID destinationId = testHarness.createDestination().getDestinationId(); + final UUID operationId = testHarness.createOperation().getOperationId(); + final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + final AirbyteStream stream = catalog.getStreams().get(0).getStream(); + + assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); + // instead of assertFalse to avoid NPE from unboxed. + assertNull(stream.getSourceDefinedCursor()); + assertTrue(stream.getDefaultCursorField().isEmpty()); + assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty()); + + // Set the source to a version that does not support per-stream state + LOGGER.info("Setting source connector to pre-per-stream state version {}...", + AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); + + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode)); + final UUID connectionId = + testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); + LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); + + final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); + LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + testHarness.assertSourceAndDestinationDbInSync(false); + + // get contents of source before mutating records. + final Database sourceDatabase = testHarness.getSourceDatabase(); + final List expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME); + + // Set source to a version that supports per-stream state + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + LOGGER.info("Upgraded source connector per-stream state supported version {}.", + AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + + // sync one more time. verify that nothing has been synced + LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); + final JobInfoRead connectionSyncRead3 = + apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); + LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + + final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead3.getJob().getId())); + final Optional result = syncJob.getAttempts().stream() + .sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt())) + .findFirst(); + + assertTrue(result.isPresent()); + assertEquals(0, result.get().getAttempt().getRecordsSynced()); + testHarness.assertSourceAndDestinationDbInSync(false); + } + + // This test is disabled because it takes a couple of minutes to run, as it is testing timeouts. // It should be re-enabled when the @SlowIntegrationTest can be applied to it. // See relevant issue: https://github.com/airbytehq/airbyte/issues/8397 @Disabled From 441f1313951aa527ace1ff53071dd0d9234943d8 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Wed, 29 Jun 2022 17:03:21 -0400 Subject: [PATCH 2/6] PR feedback --- .../test/acceptance/BasicAcceptanceTests.java | 58 ++++++------------- 1 file changed, 18 insertions(+), 40 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 506c653392f7..c83396af5fca 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -57,6 +57,9 @@ public class BasicAcceptanceTests { private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class); + private static final SyncMode INCREMENTAL_SYNC_MODE = SyncMode.INCREMENTAL; + private static final DestinationSyncMode DESTINATION_SYNC_MODE_APPEND = DestinationSyncMode.APPEND; + private static AirbyteAcceptanceTestHarness testHarness; private static AirbyteApiClient apiClient; private static UUID workspaceId; @@ -566,12 +569,10 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { final UUID destinationId = testHarness.createDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) + .syncMode(SyncMode.INCREMENTAL) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode) + .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .primaryKey(List.of(List.of(COLUMN_NAME)))); LOGGER.info("Testing connection update when temporal is in a terminal state"); @@ -611,12 +612,10 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except final UUID destinationId = testHarness.createDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) + .syncMode(INCREMENTAL_SYNC_MODE) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode) + .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .primaryKey(List.of(List.of(COLUMN_NAME)))); LOGGER.info("Testing manual sync when temporal is in a terminal state"); @@ -653,12 +652,10 @@ public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws E final UUID destinationId = testHarness.createDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) + .syncMode(SyncMode.INCREMENTAL) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode) + .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .primaryKey(List.of(List.of(COLUMN_NAME)))); LOGGER.info("Testing reset connection when temporal is in a terminal state"); @@ -722,25 +719,16 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws final UUID destinationId = testHarness.createDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final AirbyteStream stream = catalog.getStreams().get(0).getStream(); - - assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); - // instead of assertFalse to avoid NPE from unboxed. - assertNull(stream.getSourceDefinedCursor()); - assertTrue(stream.getDefaultCursorField().isEmpty()); - assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty()); // Set the source to a version that does not support per-stream state LOGGER.info("Setting source connector to pre-per-stream state version {}...", AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) + .syncMode(INCREMENTAL_SYNC_MODE) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); + .destinationSyncMode(DESTINATION_SYNC_MODE_APPEND)); final UUID connectionId = testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); @@ -778,7 +766,6 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME)); // reset back to no data. - LOGGER.info("Starting {} reset", testInfo.getDisplayName()); final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(), @@ -790,13 +777,16 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws STREAM_NAME)); // sync one more time. verify it is the equivalent of a full refresh. + final String expectedState = "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}"; LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); - LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + LOGGER.info("state after sync 3: {}", state); testHarness.assertSourceAndDestinationDbInSync(false); + assertEquals(Jsons.deserialize(expectedState), state.getState()); } @Test @@ -809,25 +799,16 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes final UUID destinationId = testHarness.createDestination().getDestinationId(); final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); - final AirbyteStream stream = catalog.getStreams().get(0).getStream(); - - assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); - // instead of assertFalse to avoid NPE from unboxed. - assertNull(stream.getSourceDefinedCursor()); - assertTrue(stream.getDefaultCursorField().isEmpty()); - assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty()); // Set the source to a version that does not support per-stream state LOGGER.info("Setting source connector to pre-per-stream state version {}...", AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) + .syncMode(INCREMENTAL_SYNC_MODE) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); + .destinationSyncMode(DESTINATION_SYNC_MODE_APPEND)); final UUID connectionId = testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); @@ -839,10 +820,6 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes testHarness.assertSourceAndDestinationDbInSync(false); - // get contents of source before mutating records. - final Database sourceDatabase = testHarness.getSourceDatabase(); - final List expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME); - // Set source to a version that supports per-stream state testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); LOGGER.info("Upgraded source connector per-stream state supported version {}.", @@ -862,6 +839,7 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes assertTrue(result.isPresent()); assertEquals(0, result.get().getAttempt().getRecordsSynced()); + assertEquals(0, result.get().getAttempt().getTotalStats().getRecordsEmitted()); testHarness.assertSourceAndDestinationDbInSync(false); } From 9a4958e6180f5ffc76f92020bd73083d545c8f94 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 30 Jun 2022 09:57:23 -0400 Subject: [PATCH 3/6] Formatting --- .../java/io/airbyte/test/acceptance/BasicAcceptanceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index c83396af5fca..ceea9955c186 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -777,7 +777,8 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws STREAM_NAME)); // sync one more time. verify it is the equivalent of a full refresh. - final String expectedState = "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}"; + final String expectedState = + "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}"; LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); From 0f2aa6ae483d03a8a5928964c811a7fec4f5e3d1 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 30 Jun 2022 09:58:41 -0400 Subject: [PATCH 4/6] More PR feedback --- .../test/acceptance/BasicAcceptanceTests.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index ceea9955c186..dd6c8b4c8b29 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -57,9 +57,6 @@ public class BasicAcceptanceTests { private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class); - private static final SyncMode INCREMENTAL_SYNC_MODE = SyncMode.INCREMENTAL; - private static final DestinationSyncMode DESTINATION_SYNC_MODE_APPEND = DestinationSyncMode.APPEND; - private static AirbyteAcceptanceTestHarness testHarness; private static AirbyteApiClient apiClient; private static UUID workspaceId; @@ -613,7 +610,7 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(INCREMENTAL_SYNC_MODE) + .syncMode(SyncMode.INCREMENTAL) .cursorField(List.of(COLUMN_ID)) .destinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .primaryKey(List.of(List.of(COLUMN_NAME)))); @@ -726,9 +723,9 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(INCREMENTAL_SYNC_MODE) + .syncMode(SyncMode.INCREMENTAL) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(DESTINATION_SYNC_MODE_APPEND)); + .destinationSyncMode(DestinationSyncMode.APPEND)); final UUID connectionId = testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); @@ -807,9 +804,9 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(INCREMENTAL_SYNC_MODE) + .syncMode(SyncMode.INCREMENTAL) .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(DESTINATION_SYNC_MODE_APPEND)); + .destinationSyncMode(DestinationSyncMode.APPEND)); final UUID connectionId = testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName()); From 9c3bbe221e39004f7fbee20f0f45b6411f67368e Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 30 Jun 2022 11:10:43 -0400 Subject: [PATCH 5/6] PR feedback --- .../test/acceptance/BasicAcceptanceTests.java | 52 ++++++++++++------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index dd6c8b4c8b29..d30076993121 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -57,6 +57,10 @@ public class BasicAcceptanceTests { private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class); + private static final Boolean WITH_SCD_TABLE = true; + + private static final Boolean WITHOUT_SCD_TABLE = false; + private static AirbyteAcceptanceTestHarness testHarness; private static AirbyteApiClient apiClient; private static UUID workspaceId; @@ -324,7 +328,7 @@ public void testScheduledSync() throws Exception { sleep(Duration.ofSeconds(30).toMillis()); } - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); } @Test @@ -369,7 +373,7 @@ public void testMultipleSchemasSameTablesSync() throws Exception { final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); } @Test @@ -395,7 +399,7 @@ public void testIncrementalDedupeSync() throws Exception { .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); - testHarness.assertSourceAndDestinationDbInSync(true); + testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE); // add new records and run again. final Database source = testHarness.getSourceDatabase(); @@ -448,7 +452,7 @@ public void testIncrementalSync() throws Exception { waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); // add new records and run again. final Database source = testHarness.getSourceDatabase(); @@ -489,7 +493,7 @@ public void testIncrementalSync() throws Exception { waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); } @@ -717,6 +721,11 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + // Fetch the current/most recent source definition version + final SourceDefinitionRead sourceDefinitionRead = + apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId)); + final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag(); + // Set the source to a version that does not support per-stream state LOGGER.info("Setting source connector to pre-per-stream state version {}...", AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); @@ -735,12 +744,11 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); // Set source to a version that supports per-stream state - testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); - LOGGER.info("Upgraded source connector per-stream state supported version {}.", - AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion); + LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion); // add new records and run again. final Database sourceDatabase = testHarness.getSourceDatabase(); @@ -783,7 +791,7 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); LOGGER.info("state after sync 3: {}", state); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); assertEquals(Jsons.deserialize(expectedState), state.getState()); } @@ -798,6 +806,11 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes final UUID operationId = testHarness.createOperation().getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + // Fetch the current/most recent source definition version + final SourceDefinitionRead sourceDefinitionRead = + apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId)); + final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag(); + // Set the source to a version that does not support per-stream state LOGGER.info("Setting source connector to pre-per-stream state version {}...", AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION); @@ -816,21 +829,20 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); // Set source to a version that supports per-stream state - testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); - LOGGER.info("Upgraded source connector per-stream state supported version {}.", - AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion); + LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion); // sync one more time. verify that nothing has been synced - LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); - final JobInfoRead connectionSyncRead3 = + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); + final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); - LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); + LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead3.getJob().getId())); + final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead2.getJob().getId())); final Optional result = syncJob.getAttempts().stream() .sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt())) .findFirst(); @@ -838,7 +850,7 @@ public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo tes assertTrue(result.isPresent()); assertEquals(0, result.get().getAttempt().getRecordsSynced()); assertEquals(0, result.get().getAttempt().getTotalStats().getRecordsEmitted()); - testHarness.assertSourceAndDestinationDbInSync(false); + testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); } // This test is disabled because it takes a couple of minutes to run, as it is testing timeouts. From 65964e0bc47853c8f7b2382d63ac0efef390b10d Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Thu, 30 Jun 2022 11:30:45 -0400 Subject: [PATCH 6/6] Remove unused constant --- .../io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index cd4da58e07c4..799dc5abb686 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -116,8 +116,6 @@ public class AirbyteAcceptanceTestHarness { public static final String POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION = "0.4.26"; - public static final String POSTGRES_SOURCE_PER_STREAM_STATE_CONNECTOR_VERSION = "0.4.28"; - private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_"; private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}"; private static final String OUTPUT_STREAM_PREFIX = "output_table_";