Skip to content

Commit

Permalink
fix: DOS-1413 error while resume PULL transfer from provider
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrYurk committed Nov 21, 2024
1 parent c5da66f commit b37bc95
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void transitionStarting() {
}

public boolean canBeStartedConsumer() {
return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED);
return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED, SUSPENDED);
}

public void transitionStarted(String dataPlaneId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ void verifyConsumerTransitions() {
// should not set the data plane id
assertThat(process.getDataPlaneId()).isNull();

process.transitionSuspending("suspension");
process.transitionSuspended();

process.transitionStarted("dataPlaneId");
// should not set the data plane id
assertThat(process.getDataPlaneId()).isNull();

process.transitionCompleting();
process.transitionCompleted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ protected void createResourcesOnProvider(String assetId, Map<String, Object> dat
PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, noConstraintPolicyId);
}

protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) {
protected void awaitTransferToBeInState(TransferEndToEndParticipant participant, String transferProcessId, TransferProcessStates state) {
await().atMost(timeout).until(
() -> CONSUMER.getTransferProcessState(transferProcessId),
() -> participant.getTransferProcessState(transferProcessId),
it -> Objects.equals(it, state.name())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void httpPull_dataTransfer_withCallbacks() {
.withCallbacks(callbacks)
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull());

Expand All @@ -150,7 +150,7 @@ void httpPull_dataTransfer_withEdrCache() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

Expand All @@ -168,7 +168,7 @@ void httpPull_dataTransfer_withEdrCache() {
}

@Test
void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
Expand All @@ -177,7 +177,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

Expand All @@ -186,7 +186,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {

CONSUMER.suspendTransfer(transferProcessId, "supension");

awaitTransferToBeInState(transferProcessId, SUSPENDED);
awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));
Expand All @@ -196,14 +196,55 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
CONSUMER.resumeTransfer(transferProcessId);

// check that transfer is available again
awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
}

@Test
void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(CONSUMER, consumerTransferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();

PROVIDER.suspendTransfer(providerTransferProcessId, "supension");

awaitTransferToBeInState(PROVIDER, providerTransferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

PROVIDER.resumeTransfer(providerTransferProcessId);

// check that transfer is available again
awaitTransferToBeInState(PROVIDER, providerTransferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
}

@Test
void pullFromHttp_httpProvision() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
Expand All @@ -222,7 +263,7 @@ void pullFromHttp_httpProvision() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
Expand All @@ -246,7 +287,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, TERMINATED);
awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED);
}

@Test
Expand All @@ -261,7 +302,7 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, TERMINATED);
awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED);
}

public JsonObject createCallback(String url, boolean transactional, Set<String> events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void httpPushDataTransfer() {
.withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination"))
.withTransferType("HttpData-PUSH").execute();

awaitTransferToBeInState(transferProcessId, COMPLETED);
awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED);

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
consumerDataDestination.verify(HttpRequest.request("/destination").withBody(BinaryBody.binary("data".getBytes())));
Expand All @@ -109,7 +109,7 @@ void httpToHttp_oauth2Provisioning() {
.withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination"))
.withTransferType("HttpData-PUSH").execute();

awaitTransferToBeInState(transferProcessId, COMPLETED);
awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED);

oauth2server.verify(HttpRequest.request("/token").withBody("grant_type=client_credentials&client_secret=supersecret&client_id=clientId"));
providerDataSource.verify(HttpRequest.request("/source").withMethod("GET").withHeader("Authorization", "Bearer token"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void kafkaToHttpTransfer() {
destinationServer.verify(request, atLeast(1));
});

awaitTransferToBeInState(transferProcessId, TERMINATED);
awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED);

destinationServer.clear(request)
.when(request).respond(response());
Expand Down Expand Up @@ -157,7 +157,7 @@ void kafkaToKafkaTransfer() {
.withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute();
assertMessagesAreSentTo(consumer);

awaitTransferToBeInState(transferProcessId, TERMINATED);
awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED);
assertNoMoreMessagesAreSentTo(consumer);
}
}
Expand All @@ -175,11 +175,11 @@ void shouldSuspendAndResumeTransfer() {
assertMessagesAreSentTo(consumer);

CONSUMER.suspendTransfer(transferProcessId, "any kind of reason");
awaitTransferToBeInState(transferProcessId, SUSPENDED);
awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED);
assertNoMoreMessagesAreSentTo(consumer);

CONSUMER.resumeTransfer(transferProcessId);
awaitTransferToBeInState(transferProcessId, STARTED);
awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED);
assertMessagesAreSentTo(consumer);
}
}
Expand Down

0 comments on commit b37bc95

Please sign in to comment.