diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java index 189013c00c8..55ada0187d4 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java @@ -288,7 +288,7 @@ public void transitionStarting() { } public boolean canBeStartedConsumer() { - return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED); + return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED, SUSPENDED); } public void transitionStarted(String dataPlaneId) { diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java index 7871d8a3d96..27aad9079e8 100644 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java +++ b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java @@ -101,6 +101,13 @@ void verifyConsumerTransitions() { // should not set the data plane id assertThat(process.getDataPlaneId()).isNull(); + process.transitionSuspending("suspension"); + process.transitionSuspended(); + + process.transitionStarted("dataPlaneId"); + // should not set the data plane id + assertThat(process.getDataPlaneId()).isNull(); + process.transitionCompleting(); process.transitionCompleted(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index e3cdf7c161e..39877bdb90e 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -71,9 +71,9 @@ protected void createResourcesOnProvider(String assetId, Map dat PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, noConstraintPolicyId); } - protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { + protected void awaitTransferToBeInState(TransferEndToEndParticipant participant, String transferProcessId, TransferProcessStates state) { await().atMost(timeout).until( - () -> CONSUMER.getTransferProcessState(transferProcessId), + () -> participant.getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name()) ); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 4178e1ae2f3..01dc0839a3f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -127,7 +127,7 @@ void httpPull_dataTransfer_withCallbacks() { .withCallbacks(callbacks) .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); @@ -150,7 +150,7 @@ void httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -168,7 +168,7 @@ void httpPull_dataTransfer_withEdrCache() { } @Test - void suspendAndResume_httpPull_dataTransfer_withEdrCache() { + void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); @@ -177,7 +177,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -186,7 +186,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { CONSUMER.suspendTransfer(transferProcessId, "supension"); - awaitTransferToBeInState(transferProcessId, SUSPENDED); + awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED); // checks that the EDR is gone once the transfer has been suspended await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); @@ -196,7 +196,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { CONSUMER.resumeTransfer(transferProcessId); // check that transfer is available again - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); @@ -204,6 +204,47 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() { providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); } + @Test + void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { + providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, httpSourceDataAddress()); + + var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + .withTransferType("HttpData-PULL") + .execute(); + + awaitTransferToBeInState(CONSUMER, consumerTransferProcessId, STARTED); + + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() + .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + + PROVIDER.suspendTransfer(providerTransferProcessId, "supension"); + + awaitTransferToBeInState(PROVIDER, providerTransferProcessId, SUSPENDED); + + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); + + PROVIDER.resumeTransfer(providerTransferProcessId); + + // check that transfer is available again + awaitTransferToBeInState(PROVIDER, providerTransferProcessId, STARTED); + var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + var secondMessage = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + + providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); + } + @Test void pullFromHttp_httpProvision() { providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data")); @@ -222,7 +263,7 @@ void pullFromHttp_httpProvision() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> { var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -246,7 +287,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, TERMINATED); + awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); } @Test @@ -261,7 +302,7 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(transferProcessId, TERMINATED); + awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); } public JsonObject createCallback(String url, boolean transactional, Set events) { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index 4ee160a7223..2d9c0faf08f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -82,7 +82,7 @@ void httpPushDataTransfer() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(transferProcessId, COMPLETED); + awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); consumerDataDestination.verify(HttpRequest.request("/destination").withBody(BinaryBody.binary("data".getBytes()))); @@ -109,7 +109,7 @@ void httpToHttp_oauth2Provisioning() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(transferProcessId, COMPLETED); + awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED); oauth2server.verify(HttpRequest.request("/token").withBody("grant_type=client_credentials&client_secret=supersecret&client_id=clientId")); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET").withHeader("Authorization", "Bearer token")); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index 47eaf4184a0..43a4e2639e0 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -128,7 +128,7 @@ void kafkaToHttpTransfer() { destinationServer.verify(request, atLeast(1)); }); - awaitTransferToBeInState(transferProcessId, TERMINATED); + awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); destinationServer.clear(request) .when(request).respond(response()); @@ -157,7 +157,7 @@ void kafkaToKafkaTransfer() { .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); - awaitTransferToBeInState(transferProcessId, TERMINATED); + awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); assertNoMoreMessagesAreSentTo(consumer); } } @@ -175,11 +175,11 @@ void shouldSuspendAndResumeTransfer() { assertMessagesAreSentTo(consumer); CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); - awaitTransferToBeInState(transferProcessId, SUSPENDED); + awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED); assertNoMoreMessagesAreSentTo(consumer); CONSUMER.resumeTransfer(transferProcessId); - awaitTransferToBeInState(transferProcessId, STARTED); + awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); assertMessagesAreSentTo(consumer); } }