From 9f64c9d94f18ee25bc052fd045dff515dc1fb133 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Thu, 7 Dec 2023 14:53:07 +0100 Subject: [PATCH 1/3] fix: backport of transfer process completion on EDR expired --- .../TransferProcessLocalCallback.java | 9 +++++ .../tractusx/edc/lifecycle/Participant.java | 11 ++++++ .../edc/tests/edr/AbstractRenewalEdrTest.java | 36 +++++++++++++++++-- 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java index 50c1791a5..e8862dbf0 100644 --- a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java +++ b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java @@ -105,6 +105,15 @@ private void cleanOldEdr(String assetId, String agreementId) { monitor.debug(format("Expiring EDR for transfer process %s", entry.getTransferProcessId())); entry.transitionToExpired(); edrCache.update(entry); + + var transferProcess = transferProcessStore.findById(entry.getTransferProcessId()); + + if (transferProcess != null && transferProcess.canBeCompleted()) { + transferProcess.transitionCompleting(); + transferProcessStore.save(transferProcess); + } else { + monitor.info(format("Cannot terminate transfer process with id: %s", entry.getTransferProcessId())); + } })); } diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java index 36b722640..7c2eafa1b 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/lifecycle/Participant.java @@ -283,6 +283,17 @@ public String getTransferProcessState(String id) { .extract().body().jsonPath().getString("'edc:state'"); } + public JsonArray getAllTransferProcess() { + return baseRequest() + .when() + .post("/v2/transferprocesses/request") + .then() + .statusCode(200) + .extract() + .body() + .as(JsonArray.class); + } + public EndpointDataReference getDataReference(String dataRequestId) { var dataReference = new AtomicReference(); diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java index d0c9d1b9c..a4ca3ec3d 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java @@ -20,6 +20,7 @@ import okhttp3.mockwebserver.MockWebServer; import org.assertj.core.api.Condition; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.policy.model.Operator; import org.eclipse.tractusx.edc.lifecycle.Participant; import org.junit.jupiter.api.AfterEach; @@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.anyOf; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.awaitility.pollinterval.FibonacciPollInterval.fibonacci; import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED; import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.NEGOTIATED; @@ -106,20 +108,48 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException { assertThat(expectedEvents).usingRecursiveFieldByFieldElementComparator().containsAll(events); - JsonArrayBuilder edrCaches = Json.createArrayBuilder(); + JsonArrayBuilder edrCachesBuilder = Json.createArrayBuilder(); await().atMost(ASYNC_TIMEOUT) .untilAsserted(() -> { var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId); assertThat(localEdrCaches).hasSizeGreaterThan(1); - localEdrCaches.forEach(edrCaches::add); + localEdrCaches.forEach(edrCachesBuilder::add); }); + var edrCaches = edrCachesBuilder.build(); - assertThat(edrCaches.build()) + assertThat(edrCaches) .extracting(json -> json.asJsonObject().getJsonString("tx:edrState").getString()) .areAtMost(1, anyOf(stateCondition(NEGOTIATED.name(), "Negotiated"), stateCondition(REFRESHING.name(), "Refreshing"))) .areAtLeast(1, stateCondition(EXPIRED.name(), "Expired")); + + var transferProcessId = edrCaches.stream() + .filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name())) + .map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString()) + .findFirst() + .orElseThrow(); + + await().pollInterval(fibonacci()) + .atMost(ASYNC_TIMEOUT) + .untilAsserted(() -> { + var tpState = SOKRATES.getTransferProcessState(transferProcessId); + assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.COMPLETED.toString()); + }); + + await().pollInterval(fibonacci()) + .atMost(ASYNC_TIMEOUT) + .untilAsserted(() -> { + var tpState = PLATO.getAllTransferProcess() + .stream() + .filter(json -> json.asJsonObject().getJsonString("edc:correlationId").getString().equals(transferProcessId)) + .map(json -> json.asJsonObject().getJsonString("edc:state").getString()) + .findFirst(); + + assertThat(tpState) + .isPresent() + .hasValue(TransferProcessStates.COMPLETED.toString()); + }); } From fb8e0ebada9f90a4814fdc69ed3a55913f42fab2 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Thu, 14 Dec 2023 17:12:17 +0100 Subject: [PATCH 2/3] chore: update dependencies file --- DEPENDENCIES | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEPENDENCIES b/DEPENDENCIES index 7ae8fd739..4a9045f38 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -130,7 +130,7 @@ maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentatio maven/mavencentral/io.opentelemetry/opentelemetry-api/1.29.0, Apache-2.0, approved, #10088 maven/mavencentral/io.opentelemetry/opentelemetry-context/1.29.0, Apache-2.0, approved, #10090 maven/mavencentral/io.projectreactor.netty/reactor-netty-core/1.0.33, Apache-2.0, approved, #9687 -maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.33, Apache-2.0, approved, clearlydefined +maven/mavencentral/io.projectreactor.netty/reactor-netty-http/1.0.33, Apache-2.0, approved, #11661 maven/mavencentral/io.projectreactor/reactor-core/3.4.30, Apache-2.0, approved, #7517 maven/mavencentral/io.rest-assured/json-path/5.3.1, Apache-2.0, approved, #9261 maven/mavencentral/io.rest-assured/rest-assured-common/5.3.1, Apache-2.0, approved, #9264 From f69827e70fce70c978770c6c6f7aff82ef1c990b Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 19 Dec 2023 09:27:27 +0100 Subject: [PATCH 3/3] pr remarks --- .../TransferProcessLocalCallback.java | 4 +-- .../tests/edr/AbstractNegotiateEdrTest.java | 4 +-- .../edc/tests/edr/AbstractRenewalEdrTest.java | 12 +++---- .../proxy/AbstractDataPlaneProxyTest.java | 29 ++++++++++----- ...AbstractHttpConsumerPullWithProxyTest.java | 2 +- .../edc/lifecycle/TestServiceExtension.java | 36 ------------------- ...rg.eclipse.edc.spi.system.ServiceExtension | 2 -- 7 files changed, 30 insertions(+), 59 deletions(-) delete mode 100644 edc-tests/runtime/extensions/src/main/java/org/eclipse/tractusx/edc/lifecycle/TestServiceExtension.java diff --git a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java index e8862dbf0..3c32d2617 100644 --- a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java +++ b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java @@ -108,8 +108,8 @@ private void cleanOldEdr(String assetId, String agreementId) { var transferProcess = transferProcessStore.findById(entry.getTransferProcessId()); - if (transferProcess != null && transferProcess.canBeCompleted()) { - transferProcess.transitionCompleting(); + if (transferProcess != null && transferProcess.canBeTerminated()) { + transferProcess.transitionTerminating(); transferProcessStore.save(transferProcess); } else { monitor.info(format("Cannot terminate transfer process with id: %s", entry.getTransferProcessId())); diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractNegotiateEdrTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractNegotiateEdrTest.java index 9e6d87a40..93832459d 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractNegotiateEdrTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractNegotiateEdrTest.java @@ -22,7 +22,6 @@ import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationInitiated; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationRequested; import org.eclipse.edc.connector.contract.spi.event.contractnegotiation.ContractNegotiationVerified; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessInitiated; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessProvisioned; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested; @@ -78,8 +77,7 @@ void negotiateEdr_shouldInvokeCallbacks() throws IOException { createEvent(TransferProcessInitiated.class), createEvent(TransferProcessProvisioned.class), createEvent(TransferProcessRequested.class), - createEvent(TransferProcessStarted.class), - createEvent(TransferProcessCompleted.class)); + createEvent(TransferProcessStarted.class)); var assetId = "api-asset-1"; var url = server.url("/mock/api"); diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java index a4ca3ec3d..3c31ac5b5 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/edr/AbstractRenewalEdrTest.java @@ -19,7 +19,7 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.assertj.core.api.Condition; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.policy.model.Operator; import org.eclipse.tractusx.edc.lifecycle.Participant; @@ -72,8 +72,8 @@ void setup() { void negotiateEdr_shouldRenewTheEdr() throws IOException { var expectedEvents = List.of( - createEvent(TransferProcessCompleted.class), - createEvent(TransferProcessCompleted.class)); + createEvent(TransferProcessStarted.class), + createEvent(TransferProcessStarted.class)); var assetId = UUID.randomUUID().toString(); var url = server.url("/mock/api"); @@ -95,7 +95,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException { PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); var callbacks = Json.createArrayBuilder() - .add(createCallback(url.toString(), true, Set.of("transfer.process.completed"))) + .add(createCallback(url.toString(), true, Set.of("transfer.process.started"))) .build(); expectedEvents.forEach(event -> server.enqueue(new MockResponse())); @@ -134,7 +134,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException { .atMost(ASYNC_TIMEOUT) .untilAsserted(() -> { var tpState = SOKRATES.getTransferProcessState(transferProcessId); - assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.COMPLETED.toString()); + assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.TERMINATED.toString()); }); await().pollInterval(fibonacci()) @@ -148,7 +148,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException { assertThat(tpState) .isPresent() - .hasValue(TransferProcessStates.COMPLETED.toString()); + .hasValue(TransferProcessStates.TERMINATED.toString()); }); } diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java index 37f3df3fe..07d060881 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/proxy/AbstractDataPlaneProxyTest.java @@ -21,7 +21,7 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; -import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.policy.model.Operator; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.tractusx.edc.lifecycle.Participant; @@ -32,11 +32,15 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.time.Duration; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.awaitility.pollinterval.FibonacciPollInterval.fibonacci; import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; import static org.eclipse.tractusx.edc.helpers.EdrNegotiationHelperFunctions.createCallback; import static org.eclipse.tractusx.edc.helpers.PolicyHelperFunctions.businessPartnerGroupPolicy; @@ -58,6 +62,8 @@ public abstract class AbstractDataPlaneProxyTest { private static final String CUSTOM_QUERY_PARAMS = "foo=bar"; + private static final Duration ASYNC_TIMEOUT = ofSeconds(45); + private static final String CUSTOM_FULL_PATH = CUSTOM_BASE_PATH + CUSTOM_SUB_PATH + "?" + CUSTOM_QUERY_PARAMS; private final ObjectMapper mapper = new ObjectMapper(); private MockWebServer server; @@ -86,7 +92,7 @@ void httpPullDataTransfer_withEdrAndProxy() { PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); var callbacks = Json.createArrayBuilder() - .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started"))) .build(); // response to callback @@ -159,7 +165,7 @@ void httpPullDataTransfer_shouldFailForAsset_withTwoEdrAndProxy() throws IOExcep PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); var callbacks = Json.createArrayBuilder() - .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started"))) .build(); // response to callback @@ -174,9 +180,14 @@ void httpPullDataTransfer_shouldFailForAsset_withTwoEdrAndProxy() throws IOExcep var body = "{\"response\": \"ok\"}"; - server.enqueue(new MockResponse().setBody(body)); - SOKRATES.pullProxyDataResponseByAssetId(PLATO, assetId).then() - .assertThat().statusCode(428); + await().pollInterval(fibonacci()) + .atMost(ASYNC_TIMEOUT) + .untilAsserted(() -> { + server.enqueue(new MockResponse().setBody(body)); + SOKRATES.pullProxyDataResponseByAssetId(PLATO, assetId).then() + .assertThat().statusCode(428); + }); + server.enqueue(new MockResponse().setBody(body)); var data = SOKRATES.pullProxyDataByTransferProcessId(PLATO, transferEvent1.getPayload().getTransferProcessId()); @@ -210,7 +221,7 @@ void httpPullDataTransfer_withEdrAndProviderDataPlaneProxy() throws IOException PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); var callbacks = Json.createArrayBuilder() - .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started"))) .build(); // response to callback @@ -272,7 +283,7 @@ public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws In PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-2"); var callbacks = Json.createArrayBuilder() - .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.completed"))) + .add(createCallback(eventsUrl.toString(), true, Set.of("transfer.process.started"))) .build(); SOKRATES.negotiateEdr(PLATO, assetId, callbacks); @@ -295,7 +306,7 @@ void teardown() throws IOException { server.shutdown(); } - private EventEnvelope waitForTransferCompletion() { + private EventEnvelope waitForTransferCompletion() { try { var request = server.takeRequest(60, TimeUnit.SECONDS); if (request != null) { diff --git a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/transfer/AbstractHttpConsumerPullWithProxyTest.java b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/transfer/AbstractHttpConsumerPullWithProxyTest.java index 4f5fcda03..56cf54b06 100644 --- a/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/transfer/AbstractHttpConsumerPullWithProxyTest.java +++ b/edc-tests/e2e-tests/src/test/java/org/eclipse/tractusx/edc/tests/transfer/AbstractHttpConsumerPullWithProxyTest.java @@ -110,7 +110,7 @@ void transferData_privateBackend() throws IOException, InterruptedException { .atMost(ASYNC_TIMEOUT) .untilAsserted(() -> { var tpState = SOKRATES.getTransferProcessState(transferProcessId.get()); - assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.COMPLETED.toString()); + assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.STARTED.toString()); }); // wait until EDC is available on the consumer side diff --git a/edc-tests/runtime/extensions/src/main/java/org/eclipse/tractusx/edc/lifecycle/TestServiceExtension.java b/edc-tests/runtime/extensions/src/main/java/org/eclipse/tractusx/edc/lifecycle/TestServiceExtension.java deleted file mode 100644 index b498f185b..000000000 --- a/edc-tests/runtime/extensions/src/main/java/org/eclipse/tractusx/edc/lifecycle/TestServiceExtension.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.tractusx.edc.lifecycle; - -import org.eclipse.edc.connector.transfer.spi.status.StatusCheckerRegistry; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; -import org.eclipse.edc.runtime.metamodel.annotation.Extension; -import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.spi.system.ServiceExtension; -import org.eclipse.edc.spi.system.ServiceExtensionContext; - -@Extension(value = "Extension used to inject dummy services into E2E runtimes") -public class TestServiceExtension implements ServiceExtension { - - @Inject - private StatusCheckerRegistry registry; - - @Override - public void initialize(ServiceExtensionContext context) { - // takes care that ongoing HTTP transfers are actually completed, otherwise they would - // always stay in the "STARTED" state - registry.register("HttpProxy", (tp, r) -> tp.getType() == TransferProcess.Type.CONSUMER); - } -} diff --git a/edc-tests/runtime/extensions/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-tests/runtime/extensions/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index c3abfd832..75f82b423 100644 --- a/edc-tests/runtime/extensions/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/edc-tests/runtime/extensions/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -14,5 +14,3 @@ org.eclipse.tractusx.edc.lifecycle.ConsumerServicesExtension org.eclipse.tractusx.edc.lifecycle.VaultSeedExtension -org.eclipse.tractusx.edc.lifecycle.TestServiceExtension -