diff --git a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java index 27aad9079e..7fb191ca0c 100644 --- a/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java +++ b/spi/control-plane/transfer-spi/src/test/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.io.StringWriter; @@ -86,8 +87,9 @@ void verifyCopy() { assertThat(process).usingRecursiveComparison().isEqualTo(copy); } - @Test - void verifyConsumerTransitions() { + @ParameterizedTest + @ValueSource(strings = { "STARTED", "SUSPENDED"}) + void verifyConsumerTransitions(String state) { var process = TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).type(CONSUMER).build(); process.transitionProvisioning(ResourceManifest.Builder.newInstance().build()); @@ -101,18 +103,21 @@ void verifyConsumerTransitions() { // should not set the data plane id assertThat(process.getDataPlaneId()).isNull(); - process.transitionSuspending("suspension"); - process.transitionSuspended(); - - process.transitionStarted("dataPlaneId"); - // should not set the data plane id - assertThat(process.getDataPlaneId()).isNull(); - - process.transitionCompleting(); - process.transitionCompleted(); - - process.transitionDeprovisioning(); - process.transitionDeprovisioned(); + switch (state) { + case "STARTED": + process.transitionCompleting(); + process.transitionCompleted(); + + process.transitionDeprovisioning(); + process.transitionDeprovisioned(); + break; + case "SUSPENDED": + process.transitionSuspending("suspension"); + process.transitionSuspended(); + break; + default: + throw new IllegalArgumentException("Unsupported state: " + state); + } } @Test diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java index 1631449066..096281bd54 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java @@ -17,17 +17,20 @@ import io.restassured.common.mapper.TypeRef; import org.assertj.core.api.ThrowingConsumer; import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.spi.types.domain.DataAddress; import org.jetbrains.annotations.NotNull; import java.net.URI; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import static io.restassured.RestAssured.given; import static io.restassured.http.ContentType.JSON; import static java.io.File.separator; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.eclipse.edc.boot.BootServicesExtension.PARTICIPANT_ID; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.defaultDatasourceConfiguration; import static org.eclipse.edc.util.io.Ports.getFreePort; @@ -172,6 +175,10 @@ public void pullData(DataAddress edr, Map queryParams, ThrowingC assertThat(data).satisfies(bodyAssertion); } + protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { + await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name())); + } + @NotNull private String resourceAbsolutePath(String filename) { return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index 39877bdb90..3036541641 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,17 +15,14 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; -import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.spi.security.Vault; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.UUID; -import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.junit.testfixtures.TestUtils.getResourceFileContentAsString; @@ -70,12 +67,4 @@ protected void createResourcesOnProvider(String assetId, Map dat PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, noConstraintPolicyId); } - - protected void awaitTransferToBeInState(TransferEndToEndParticipant participant, String transferProcessId, TransferProcessStates state) { - await().atMost(timeout).until( - () -> participant.getTransferProcessState(transferProcessId), - it -> Objects.equals(it, state.name()) - ); - } - } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 01dc0839a3..413bf89206 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -127,7 +127,7 @@ void httpPull_dataTransfer_withCallbacks() { .withCallbacks(callbacks) .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); @@ -150,7 +150,7 @@ void httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -177,7 +177,7 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -186,7 +186,7 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { CONSUMER.suspendTransfer(transferProcessId, "supension"); - awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); // checks that the EDR is gone once the transfer has been suspended await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); @@ -196,7 +196,7 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { CONSUMER.resumeTransfer(transferProcessId); // check that transfer is available again - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); @@ -214,7 +214,7 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, consumerTransferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); @@ -227,7 +227,7 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { PROVIDER.suspendTransfer(providerTransferProcessId, "supension"); - awaitTransferToBeInState(PROVIDER, providerTransferProcessId, SUSPENDED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); // checks that the EDR is gone once the transfer has been suspended await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); @@ -237,7 +237,7 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { PROVIDER.resumeTransfer(providerTransferProcessId); // check that transfer is available again - awaitTransferToBeInState(PROVIDER, providerTransferProcessId, STARTED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); @@ -263,7 +263,7 @@ void pullFromHttp_httpProvision() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); await().atMost(timeout).untilAsserted(() -> { var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); @@ -287,7 +287,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); } @Test @@ -302,7 +302,7 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() { .withTransferType("HttpData-PULL") .execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); } public JsonObject createCallback(String url, boolean transactional, Set events) { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index 2d9c0faf08..ae400fe707 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -82,7 +82,7 @@ void httpPushDataTransfer() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED); + CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET")); consumerDataDestination.verify(HttpRequest.request("/destination").withBody(BinaryBody.binary("data".getBytes()))); @@ -109,7 +109,7 @@ void httpToHttp_oauth2Provisioning() { .withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination")) .withTransferType("HttpData-PUSH").execute(); - awaitTransferToBeInState(CONSUMER, transferProcessId, COMPLETED); + CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED); oauth2server.verify(HttpRequest.request("/token").withBody("grant_type=client_credentials&client_secret=supersecret&client_id=clientId")); providerDataSource.verify(HttpRequest.request("/source").withMethod("GET").withHeader("Authorization", "Bearer token")); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index 43a4e2639e..58a252df2c 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -128,7 +128,7 @@ void kafkaToHttpTransfer() { destinationServer.verify(request, atLeast(1)); }); - awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); destinationServer.clear(request) .when(request).respond(response()); @@ -157,7 +157,7 @@ void kafkaToKafkaTransfer() { .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); - awaitTransferToBeInState(CONSUMER, transferProcessId, TERMINATED); + CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); assertNoMoreMessagesAreSentTo(consumer); } } @@ -175,11 +175,11 @@ void shouldSuspendAndResumeTransfer() { assertMessagesAreSentTo(consumer); CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); - awaitTransferToBeInState(CONSUMER, transferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); assertNoMoreMessagesAreSentTo(consumer); CONSUMER.resumeTransfer(transferProcessId); - awaitTransferToBeInState(CONSUMER, transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); assertMessagesAreSentTo(consumer); } }