Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acceptance tests for per-stream state updates #14263

Merged
merged 7 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.airbyte.api.client.model.generated.SourceCreate;
import io.airbyte.api.client.model.generated.SourceDefinitionCreate;
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
import io.airbyte.api.client.model.generated.SourceDefinitionUpdate;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.api.client.model.generated.SourceRead;
Expand Down Expand Up @@ -113,6 +114,8 @@ public class AirbyteAcceptanceTestHarness {
private static final String SOURCE_E2E_TEST_CONNECTOR_VERSION = "0.1.1";
private static final String DESTINATION_E2E_TEST_CONNECTOR_VERSION = "0.1.1";

public static final String POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION = "0.4.26";

private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_";
private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}";
private static final String OUTPUT_STREAM_PREFIX = "output_table_";
Expand Down Expand Up @@ -284,7 +287,8 @@ private void assignEnvVars() {
isGke = System.getenv().containsKey("IS_GKE");
isMac = System.getProperty("os.name").startsWith("Mac");
useExternalDeployment =
System.getenv("USE_EXTERNAL_DEPLOYMENT") != null && System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
System.getenv("USE_EXTERNAL_DEPLOYMENT") != null &&
System.getenv("USE_EXTERNAL_DEPLOYMENT").equalsIgnoreCase("true");
}

private WorkflowClient getWorkflowClient() {
Expand Down Expand Up @@ -636,6 +640,11 @@ public UUID getPostgresSourceDefinitionId() throws ApiException {
.getSourceDefinitionId();
}

/**
 * Updates the docker image tag of the given source definition via the Airbyte API, e.g. to pin a
 * connector to an older version or restore it to the latest one.
 *
 * @param sourceDefinitionId the source definition to update
 * @param dockerImageTag the connector docker image tag (version) to apply
 * @throws ApiException if the API call to update the source definition fails
 */
public void updateSourceDefinitionVersion(final UUID sourceDefinitionId, final String dockerImageTag) throws ApiException {
  final SourceDefinitionUpdate update = new SourceDefinitionUpdate()
      .sourceDefinitionId(sourceDefinitionId)
      .dockerImageTag(dockerImageTag);
  apiClient.getSourceDefinitionApi().updateSourceDefinition(update);
}

private void clearSourceDbData() throws SQLException {
final Database database = getSourceDatabase();
final Set<SchemaTableNamePair> pairs = listAllTables(database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class BasicAcceptanceTests {

private static final Logger LOGGER = LoggerFactory.getLogger(BasicAcceptanceTests.class);

private static final Boolean WITH_SCD_TABLE = true;

private static final Boolean WITHOUT_SCD_TABLE = false;

private static AirbyteAcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static UUID workspaceId;
Expand Down Expand Up @@ -324,7 +328,7 @@ public void testScheduledSync() throws Exception {
sleep(Duration.ofSeconds(30).toMillis());
}

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

@Test
Expand Down Expand Up @@ -369,7 +373,7 @@ public void testMultipleSchemasSameTablesSync() throws Exception {

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

@Test
Expand All @@ -395,7 +399,7 @@ public void testIncrementalDedupeSync() throws Exception {
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());

testHarness.assertSourceAndDestinationDbInSync(true);
testHarness.assertSourceAndDestinationDbInSync(WITH_SCD_TABLE);

// add new records and run again.
final Database source = testHarness.getSourceDatabase();
Expand Down Expand Up @@ -448,7 +452,7 @@ public void testIncrementalSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// add new records and run again.
final Database source = testHarness.getSourceDatabase();
Expand Down Expand Up @@ -489,7 +493,7 @@ public void testIncrementalSync() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(false);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

}

Expand Down Expand Up @@ -566,12 +570,10 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception {
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing connection update when temporal is in a terminal state");
Expand Down Expand Up @@ -611,12 +613,10 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing manual sync when temporal is in a terminal state");
Expand Down Expand Up @@ -653,12 +653,10 @@ public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws E
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.destinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.primaryKey(List.of(List.of(COLUMN_NAME))));

LOGGER.info("Testing reset connection when temporal is in a terminal state");
Expand Down Expand Up @@ -712,7 +710,150 @@ public void testResetCancelsRunningSync() throws Exception {
assertEquals(JobStatus.CANCELLED, connectionSyncReadAfterReset.getStatus());
}

// This test is disabled because it takes a couple minutes to run, as it is testing timeouts.
@Test
public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is someone else working on an acceptance test to test actual per-stream resets, i.e. sync a few streams, reset one of them, and verify that only that one stream is reset in the destination and that the state looks correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gosusnp is. We paired today so his question about how the acceptance test are working are answered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly, I am currently working on an acceptance test for current reset behavior. I am happy to help out on that test afterwards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some room, I will work on those.

LOGGER.info("Starting {}", testInfo.getDisplayName());
final String connectionName = "test-connection";
final SourceRead source = testHarness.createPostgresSource();
final UUID sourceId = source.getSourceId();
final UUID sourceDefinitionId = source.getSourceDefinitionId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);

// Fetch the current/most recent source definition version
final SourceDefinitionRead sourceDefinitionRead =
apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag();

// Set the source to a version that does not support per-stream state
LOGGER.info("Setting source connector to pre-per-stream state version {}...",
AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);

catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(DestinationSyncMode.APPEND));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// Set source to a version that supports per-stream state
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion);
LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion);

// add new records and run again.
final Database sourceDatabase = testHarness.getSourceDatabase();
// get contents of source before mutating records.
final List<JsonNode> expectedRecords = testHarness.retrieveSourceRecords(sourceDatabase, STREAM_NAME);
expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()));
// add a new record
sourceDatabase.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
// mutate a record that was already synced with out updating its cursor value. if we are actually
// full refreshing, this record will appear in the output and cause the test to fail. if we are,
// correctly, doing incremental, we will not find this value in the destination.
sourceDatabase.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));

LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME));
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved

// reset back to no data.
LOGGER.info("Starting {} reset", testInfo.getDisplayName());
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public",
STREAM_NAME));

// sync one more time. verify it is the equivalent of a full refresh.
final String expectedState =
"{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}";
LOGGER.info("Starting {} sync 3", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comment was more about having a check on the state type, like using the new repository done by Jimmy and check that it is legacy here. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it would be useful if in every place in this test where we are currently calling and just logging apiClient.getConnectionApi().getState(), we instead explicitly check that the ConnectionState object that was returned has the expected stateType and the expected state/globalState/streamState field is set to the expected value.

E.g. I think after sync 1, we expect it to be a LEGACY state. After sync 2, we expect a STREAM state. After the reset, we expect a NOT_SET state. And after sync 3, we expect a STREAM state again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, That sounds closer to the acceptance test spirit to use the API to do any operation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. I think after sync 1, we expect it to be a LEGACY state. After sync 2, we expect a STREAM state. After the reset, we expect a NOT_SET state. And after sync 3, we expect a STREAM state again.

This will depend on our ability to control the feature flag (Passing it to the container that have the flag). This is what I am investigating at the moment but for now it should always be LEGACY.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau I think we can do that as a separate PR once everything is ready for use. For now, this is verifying that the existing state stuff works even with the updated connector.

LOGGER.info("state after sync 3: {}", state);

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
assertEquals(Jsons.deserialize(expectedState), state.getState());
}

@Test
public void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());
final String connectionName = "test-connection";
final SourceRead source = testHarness.createPostgresSource();
final UUID sourceId = source.getSourceId();
final UUID sourceDefinitionId = source.getSourceDefinitionId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);

// Fetch the current/most recent source definition version
final SourceDefinitionRead sourceDefinitionRead =
apiClient.getSourceDefinitionApi().getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefinitionId));
final String currentSourceDefintionVersion = sourceDefinitionRead.getDockerImageTag();

// Set the source to a version that does not support per-stream state
LOGGER.info("Setting source connector to pre-per-stream state version {}...",
AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, AirbyteAcceptanceTestHarness.POSTGRES_SOURCE_LEGACY_CONNECTOR_VERSION);

catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(SyncMode.INCREMENTAL)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(DestinationSyncMode.APPEND));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
LOGGER.info("Beginning {} sync 1", testInfo.getDisplayName());

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

// Set source to a version that supports per-stream state
testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefintionVersion);
LOGGER.info("Upgraded source connector per-stream state supported version {}.", currentSourceDefintionVersion);

// sync one more time. verify that nothing has been synced
LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead2.getJob().getId()));
final Optional<AttemptInfoRead> result = syncJob.getAttempts().stream()
.sorted((a, b) -> Long.compare(b.getAttempt().getEndedAt(), a.getAttempt().getEndedAt()))
.findFirst();

assertTrue(result.isPresent());
assertEquals(0, result.get().getAttempt().getRecordsSynced());
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(0, result.get().getAttempt().getTotalStats().getRecordsEmitted());
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
}

// This test is disabled because it takes a couple of minutes to run, as it is testing timeouts.
// It should be re-enabled when the @SlowIntegrationTest can be applied to it.
// See relevant issue: https://github.com/airbytehq/airbyte/issues/8397
@Disabled
Expand Down