From 240277fe5b740f9a66b92f573140b7bdb96a0a91 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Tue, 17 Dec 2024 22:47:44 +0100 Subject: [PATCH 1/9] fix:Termination of pull transfer process from consumer side --- .../TransferProcessProtocolServiceImpl.java | 10 + ...ransferProcessProtocolServiceImplTest.java | 225 ++++++++++-------- .../test/system/utils/Participant.java | 7 + .../test/e2e/TransferEndToEndTestBase.java | 5 +- .../test/e2e/TransferPullEndToEndTest.java | 63 ++--- 5 files changed, 185 insertions(+), 125 deletions(-) diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java index 84d20b421b0..da6d6b2eae8 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java @@ -232,12 +232,22 @@ private ServiceResult suspendedAction(TransferSuspensionMessage @NotNull private ServiceResult terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) { + if (transferProcess.getType() == PROVIDER) { + var termination = dataFlowManager.terminate(transferProcess); + if (termination.failed()) { + return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail())); + } + } if (transferProcess.canBeTerminated()) { observable.invokeForEach(l -> l.preTerminated(transferProcess)); transferProcess.transitionTerminated(); transferProcess.protocolMessageReceived(message.getId()); update(transferProcess); observable.invokeForEach(l -> l.terminated(transferProcess)); + if (transferProcess.getType() == PROVIDER) { + transferProcess.transitionDeprovisioning(); + this.update(transferProcess); + } return ServiceResult.success(transferProcess); } else { return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be terminated")); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java index e78e636a7e5..2043d59ec3b 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java @@ -310,99 +310,6 @@ void notifyCompleted_shouldReturnBadRequest_whenCounterPartyUnauthorized() { } - @Test - void notifyTerminated_shouldTransitionToTerminated() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var message = TransferTerminationMessage.Builder.newInstance() - .protocol("protocol") - .consumerPid("consumerPid") - .providerPid("providerPid") - .counterPartyAddress("http://any") - .processId("correlationId") - .code("TestCode") - .reason("TestReason") - .build(); - var agreement = contractAgreement(); - var transferProcess = transferProcess(STARTED, "transferProcessId"); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - var result = service.notifyTerminated(message, tokenRepresentation); - - assertThat(result).isSucceeded(); - verify(listener).preTerminated(any()); - verify(store).save(argThat(t -> t.getState() == TERMINATED.code())); - verify(listener).terminated(any()); - verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); - } - - @Test - void notifyTerminated_shouldReturnConflict_whenTransferProcessCannotBeTerminated() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString()); - var agreement = contractAgreement(); - var message = TransferTerminationMessage.Builder.newInstance() - .protocol("protocol") - .consumerPid("consumerPid") - .providerPid("providerPid") - .counterPartyAddress("http://any") - .processId("correlationId") - .code("TestCode") - .reason("TestReason") - .build(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - - var result = service.notifyTerminated(message, tokenRepresentation); - - assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); - // state didn't change - verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code())); - verifyNoInteractions(listener); - } - - @Test - void notifyTerminated_shouldReturnBadRequest_whenCounterPartyUnauthorized() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var agreement = contractAgreement(); - var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString()); - var message = TransferTerminationMessage.Builder.newInstance() - .protocol("protocol") - .consumerPid("consumerPid") - .providerPid("providerPid") - .counterPartyAddress("http://any") - .processId("correlationId") - .code("TestCode") - .reason("TestReason") - .build(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error")); - - var result = service.notifyTerminated(message, tokenRepresentation); - - assertThat(result) - .isFailed() - .extracting(ServiceFailure::getReason) - .isEqualTo(BAD_REQUEST); - - verify(store, times(1)).save(any()); - - } - @Test void findById_shouldReturnTransferProcess_whenValidCounterParty() { var participantAgent = participantAgent(); @@ -881,6 +788,135 @@ void shouldReturnBadRequest_whenCounterPartyUnauthorized() { } } + @Nested + class NotifyTerminated { + + @Test + void consumer_shouldTransitionToTerminated() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .consumerPid("consumerPid") + .providerPid("providerPid") + .counterPartyAddress("http://any") + .processId("correlationId") + .code("TestCode") + .reason("TestReason") + .build(); + var agreement = contractAgreement(); + var transferProcess = transferProcess(STARTED, "transferProcessId"); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById("correlationId")).thenReturn(transferProcess); + when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); + var result = service.notifyTerminated(message, tokenRepresentation); + + assertThat(result).isSucceeded(); + verify(listener).preTerminated(any()); + verify(store).save(argThat(t -> t.getState() == TERMINATED.code())); + verify(listener).terminated(any()); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @Test + void provider_shouldTerminateDataFlowAndTransitionToTerminated() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .consumerPid("consumerPid") + .providerPid("providerPid") + .counterPartyAddress("http://any") + .processId("correlationId") + .code("TestCode") + .reason("TestReason") + .build(); + var agreement = contractAgreement(); + var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById("correlationId")).thenReturn(transferProcess); + when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); + var result = service.notifyTerminated(message, tokenRepresentation); + + + assertThat(result).isSucceeded(); + verify(listener).preTerminated(any()); + verify(store, atLeastOnce()).save(argThat(t -> t.getState() == DEPROVISIONING.code())); + verify(listener).terminated(any()); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + verify(dataFlowManager).terminate(any()); + } + + @Test + void shouldReturnConflict_whenTransferProcessCannotBeTerminated() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString()); + var agreement = contractAgreement(); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .consumerPid("consumerPid") + .providerPid("providerPid") + .counterPartyAddress("http://any") + .processId("correlationId") + .code("TestCode") + .reason("TestReason") + .build(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById("correlationId")).thenReturn(transferProcess); + when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); + + var result = service.notifyTerminated(message, tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); + // state didn't change + verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code())); + verifyNoInteractions(listener); + } + + @Test + void shouldReturnBadRequest_whenCounterPartyUnauthorized() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var agreement = contractAgreement(); + var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString()); + var message = TransferTerminationMessage.Builder.newInstance() + .protocol("protocol") + .consumerPid("consumerPid") + .providerPid("providerPid") + .counterPartyAddress("http://any") + .processId("correlationId") + .code("TestCode") + .reason("TestReason") + .build(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById("correlationId")).thenReturn(transferProcess); + when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error")); + + var result = service.notifyTerminated(message, tokenRepresentation); + + assertThat(result) + .isFailed() + .extracting(ServiceFailure::getReason) + .isEqualTo(BAD_REQUEST); + + verify(store, times(1)).save(any()); + } + } + @Nested class IdempotencyProcessStateReplication { @@ -897,12 +933,13 @@ void notify_shouldStoreReceivedMessageId(Method when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenAnswer(i -> Result.success(i.getArgument(1))); when(validationService.validateRequest(any(ParticipantAgent.class), isA(ContractAgreement.class))).thenReturn(Result.success()); when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); var result = methodCall.call(service, message, tokenRepresentation()); assertThat(result).isSucceeded(); var captor = ArgumentCaptor.forClass(TransferProcess.class); - verify(store).save(captor.capture()); + verify(store, atLeastOnce()).save(captor.capture()); var storedTransferProcess = captor.getValue(); assertThat(storedTransferProcess.getProtocolMessages().isAlreadyReceived(message.getId())).isTrue(); } diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index 480f0de4945..9b3a1a7e77f 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -58,6 +58,8 @@ */ public class Participant { + public static final String CONSUMER = "consumer"; + public static final String PROVIDER = "provider"; protected String id; protected String name; protected Endpoint managementEndpoint; @@ -519,6 +521,11 @@ public void awaitTransferToBeInState(String transferProcessId, TransferProcessSt await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name())); } + @Override + public String toString() { + return name; + } + protected String getContractNegotiationField(String negotiationId, String fieldName) { return managementEndpoint.baseRequest() .contentType(JSON) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index 3036541641b..cd270f2657f 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,6 +15,7 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; +import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; import org.eclipse.edc.spi.security.Vault; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -29,11 +30,11 @@ public abstract class TransferEndToEndTestBase { protected static final TransferEndToEndParticipant CONSUMER = TransferEndToEndParticipant.Builder.newInstance() - .name("consumer") + .name(Participant.CONSUMER) .id("urn:connector:consumer") .build(); protected static final TransferEndToEndParticipant PROVIDER = TransferEndToEndParticipant.Builder.newInstance() - .name("provider") + .name(Participant.PROVIDER) .id("urn:connector:provider") .build(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index ad6a6629402..165f961abd6 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -24,6 +24,7 @@ import jakarta.json.JsonObject; import okhttp3.Request; import okhttp3.RequestBody; +import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.junit.annotations.EndToEndTest; @@ -41,6 +42,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.mockserver.integration.ClientAndServer; import org.mockserver.mock.action.ExpectationResponseCallback; import org.mockserver.model.HttpRequest; @@ -55,6 +58,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.stream.Stream; import static java.time.Duration.ofDays; import static java.util.concurrent.TimeUnit.SECONDS; @@ -167,47 +171,59 @@ void httpPull_dataTransfer_withEdrCache() { providerDataSource.verify(request("/source").withMethod("GET")); } - @Test - void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { + private static Stream participants() { + return Stream.of(CONSUMER, PROVIDER); + } + + @ParameterizedTest(name = "Suspend and resume httpPull dataTransfer with Edr cache by {0}") + @MethodSource("participants") + void suspendAndResume_httpPull_dataTransfer_withEdrCache(Participant participant) { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); - var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) + var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) .withTransferType("HttpData-PULL") .execute(); - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); var msg = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - CONSUMER.suspendTransfer(transferProcessId, "supension"); + var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() + .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + + participant.suspendTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId, "supension"); - CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); + + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, SUSPENDED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); // checks that transfer fails await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - CONSUMER.resumeTransfer(transferProcessId); + participant.resumeTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId); // check that transfer is available again - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); + var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); var secondMessage = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); providerDataSource.verify(request("/source").withMethod("GET")); } - @Test - void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { + @ParameterizedTest(name = "Terminate httpPull dataTransfer by {0}") + @MethodSource("participants") + void terminate_httpPull_dataTransfer(Participant participant) { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); - var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) .withTransferType("HttpData-PULL") .execute(); @@ -216,6 +232,7 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + // Do the transfer var msg = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); @@ -223,23 +240,11 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); - PROVIDER.suspendTransfer(providerTransferProcessId, "supension"); - - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); + participant.terminateTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, TERMINATED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED); - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); - // checks that transfer fails await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - - PROVIDER.resumeTransfer(providerTransferProcessId); - - // check that transfer is available again - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); - providerDataSource.verify(request("/source").withMethod("GET")); } From c22f6a8eb0b7985e8a9d045aa60bfde11915d2f5 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Wed, 18 Dec 2024 23:25:52 +0100 Subject: [PATCH 2/9] fix:Termination of pull transfer process from consumer side --- .../TransferProcessProtocolServiceImpl.java | 26 ++-- ...ransferProcessProtocolServiceImplTest.java | 2 +- .../test/system/utils/Participant.java | 12 +- .../test/e2e/TransferEndToEndTestBase.java | 5 +- .../test/e2e/TransferPullEndToEndTest.java | 145 ++++++++++-------- .../e2e/TransferStreamingEndToEndTest.java | 2 +- 6 files changed, 101 insertions(+), 91 deletions(-) diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java index da6d6b2eae8..3cd4b9e2c22 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java @@ -212,13 +212,13 @@ private ServiceResult completedAction(TransferCompletionMessage @NotNull private ServiceResult suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) { - if (transferProcess.getType() == PROVIDER) { - var suspension = dataFlowManager.suspend(transferProcess); - if (suspension.failed()) { - return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail())); - } - } if (transferProcess.canBeTerminated()) { + if (transferProcess.getType() == PROVIDER) { + var suspension = dataFlowManager.suspend(transferProcess); + if (suspension.failed()) { + return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail())); + } + } var reason = message.getReason().stream().map(Object::toString).collect(joining(", ")); transferProcess.transitionSuspended(reason); transferProcess.protocolMessageReceived(message.getId()); @@ -232,13 +232,13 @@ private ServiceResult suspendedAction(TransferSuspensionMessage @NotNull private ServiceResult terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) { - if (transferProcess.getType() == PROVIDER) { - var termination = dataFlowManager.terminate(transferProcess); - if (termination.failed()) { - return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail())); - } - } if (transferProcess.canBeTerminated()) { + if (transferProcess.getType() == PROVIDER) { + var termination = dataFlowManager.terminate(transferProcess); + if (termination.failed()) { + return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail())); + } + } observable.invokeForEach(l -> l.preTerminated(transferProcess)); transferProcess.transitionTerminated(); transferProcess.protocolMessageReceived(message.getId()); @@ -246,7 +246,7 @@ private ServiceResult terminatedAction(TransferTerminationMessa observable.invokeForEach(l -> l.terminated(transferProcess)); if (transferProcess.getType() == PROVIDER) { transferProcess.transitionDeprovisioning(); - this.update(transferProcess); + update(transferProcess); } return ServiceResult.success(transferProcess); } else { diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java index 2043d59ec3b..018c518ea7c 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java @@ -822,7 +822,7 @@ void consumer_shouldTransitionToTerminated() { } @Test - void provider_shouldTerminateDataFlowAndTransitionToTerminated() { + void provider_shouldTerminateDataFlowAndTransitionToDeprovisioning() { var participantAgent = participantAgent(); var tokenRepresentation = tokenRepresentation(); var message = TransferTerminationMessage.Builder.newInstance() diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index 9b3a1a7e77f..88c59347f45 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -57,9 +57,6 @@ * Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer... */ public class Participant { - - public static final String CONSUMER = "consumer"; - public static final String PROVIDER = "provider"; protected String id; protected String name; protected Endpoint managementEndpoint; @@ -460,11 +457,11 @@ public String getTransferProcessState(String id) { * * @param id transfer process id. */ - public void suspendTransfer(String id, String reason) { + public void suspendTransfer(String id) { var requestBodyBuilder = createObjectBuilder() .add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE)) .add(TYPE, "SuspendTransfer") - .add("reason", reason); + .add("reason", "any reason"); managementEndpoint.baseRequest() .contentType(JSON) @@ -521,11 +518,6 @@ public void awaitTransferToBeInState(String transferProcessId, TransferProcessSt await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name())); } - @Override - public String toString() { - return name; - } - protected String getContractNegotiationField(String negotiationId, String fieldName) { return managementEndpoint.baseRequest() .contentType(JSON) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index cd270f2657f..3036541641b 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,7 +15,6 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; -import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; import org.eclipse.edc.spi.security.Vault; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -30,11 +29,11 @@ public abstract class TransferEndToEndTestBase { protected static final TransferEndToEndParticipant CONSUMER = TransferEndToEndParticipant.Builder.newInstance() - .name(Participant.CONSUMER) + .name("consumer") .id("urn:connector:consumer") .build(); protected static final TransferEndToEndParticipant PROVIDER = TransferEndToEndParticipant.Builder.newInstance() - .name(Participant.PROVIDER) + .name("provider") .id("urn:connector:provider") .build(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 165f961abd6..2cff965450d 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -24,7 +24,6 @@ import jakarta.json.JsonObject; import okhttp3.Request; import okhttp3.RequestBody; -import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures; import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; import org.eclipse.edc.junit.annotations.EndToEndTest; @@ -34,6 +33,7 @@ import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -42,8 +42,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; import org.mockserver.integration.ClientAndServer; import org.mockserver.mock.action.ExpectationResponseCallback; import org.mockserver.model.HttpRequest; @@ -58,7 +56,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; -import java.util.stream.Stream; import static java.time.Duration.ofDays; import static java.util.concurrent.TimeUnit.SECONDS; @@ -171,80 +168,49 @@ void httpPull_dataTransfer_withEdrCache() { providerDataSource.verify(request("/source").withMethod("GET")); } - private static Stream participants() { - return Stream.of(CONSUMER, PROVIDER); - } - - @ParameterizedTest(name = "Suspend and resume httpPull dataTransfer with Edr cache by {0}") - @MethodSource("participants") - void suspendAndResume_httpPull_dataTransfer_withEdrCache(Participant participant) { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, httpSourceDataAddress()); - - var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - - var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() - .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) - .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + @Test + void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { + var startedTransferContext = start_httpPull_dataTransfer(); - participant.suspendTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId, "supension"); + CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId); + check_suspended_httpPull_dataTransfer(startedTransferContext); + CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId); + check_resumed_httpPull_dataTransfer(startedTransferContext); - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, SUSPENDED); - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); + providerDataSource.verify(request("/source").withMethod("GET")); + } - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); + @Test + void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { + var startedTransferContext = start_httpPull_dataTransfer(); - participant.resumeTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId); + PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId); + check_suspended_httpPull_dataTransfer(startedTransferContext); - // check that transfer is available again - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId); + check_resumed_httpPull_dataTransfer(startedTransferContext); providerDataSource.verify(request("/source").withMethod("GET")); } - @ParameterizedTest(name = "Terminate httpPull dataTransfer by {0}") - @MethodSource("participants") - void terminate_httpPull_dataTransfer(Participant participant) { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, httpSourceDataAddress()); - var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); + @Test + void terminateByProvider_httpPull_dataTransfer() { + var startedTransferContext = start_httpPull_dataTransfer(); - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId); + check_terminated_httpPull_dataTransfer(startedTransferContext); - // Do the transfer - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + providerDataSource.verify(request("/source").withMethod("GET")); + } - var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() - .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) - .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + @Test + void terminateByConsumer_httpPull_dataTransfer() { + var startedTransferContext = start_httpPull_dataTransfer(); - participant.terminateTransfer(Participant.PROVIDER.equals(participant.getName()) ? providerTransferProcessId : consumerTransferProcessId); - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, TERMINATED); - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED); + CONSUMER.terminateTransfer(startedTransferContext.consumerTransferProcessId); + check_terminated_httpPull_dataTransfer(startedTransferContext); - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -345,6 +311,57 @@ private HttpResponse cacheEdr(HttpRequest request, Map CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() + .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + + return new StartedTransferContext(providerTransferProcessId, consumerTransferProcessId, edr, msg); + + } + + private void check_suspended_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); + + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId))); + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); + } + + private void check_resumed_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ + // check that transfer is available again + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); + + var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId), Objects::nonNull); + var secondMessage = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + } + + private void check_terminated_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); + + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); + } + /** * Mocked http provisioner */ @@ -384,6 +401,8 @@ public HttpResponse handle(HttpRequest httpRequest) throws Exception { } } + private record StartedTransferContext (String providerTransferProcessId, String consumerTransferProcessId, DataAddress edr, String msg) { } + @Nested @EndToEndTest class InMemory extends Tests { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index d4c48988aab..a951d8f87a8 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -235,7 +235,7 @@ void shouldSuspendAndResumeTransfer() { .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); - CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); + CONSUMER.suspendTransfer(transferProcessId); CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); assertNoMoreMessagesAreSentTo(consumer); From 7919d98a89da6166595eee8ee5d4bff883949774 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Wed, 18 Dec 2024 23:34:29 +0100 Subject: [PATCH 3/9] fix:Termination of pull transfer process from consumer side --- .../org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 2cff965450d..a8ad9fc93b1 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -362,6 +362,8 @@ private void check_terminated_httpPull_dataTransfer(StartedTransferContext start await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); } + private record StartedTransferContext (String providerTransferProcessId, String consumerTransferProcessId, DataAddress edr, String msg) { } + /** * Mocked http provisioner */ @@ -401,7 +403,6 @@ public HttpResponse handle(HttpRequest httpRequest) throws Exception { } } - private record StartedTransferContext (String providerTransferProcessId, String consumerTransferProcessId, DataAddress edr, String msg) { } @Nested @EndToEndTest From ea9d3d9df4b127df2d89f005800cca090804fa68 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Thu, 19 Dec 2024 12:44:14 +0100 Subject: [PATCH 4/9] fix:Termination of pull transfer process from consumer side --- .../TransferProcessProtocolServiceImpl.java | 12 +-- .../test/system/utils/Participant.java | 4 +- .../test/e2e/TransferPullEndToEndTest.java | 79 +++++++++++-------- .../e2e/TransferStreamingEndToEndTest.java | 2 +- 4 files changed, 56 insertions(+), 41 deletions(-) diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java index 3cd4b9e2c22..635002b0a26 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java @@ -212,13 +212,13 @@ private ServiceResult completedAction(TransferCompletionMessage @NotNull private ServiceResult suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) { - if (transferProcess.canBeTerminated()) { - if (transferProcess.getType() == PROVIDER) { - var suspension = dataFlowManager.suspend(transferProcess); - if (suspension.failed()) { - return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail())); - } + if (transferProcess.getType() == PROVIDER) { + var suspension = dataFlowManager.suspend(transferProcess); + if (suspension.failed()) { + return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail())); } + } + if (transferProcess.canBeTerminated()) { var reason = message.getReason().stream().map(Object::toString).collect(joining(", ")); transferProcess.transitionSuspended(reason); transferProcess.protocolMessageReceived(message.getId()); diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index 88c59347f45..cc7c1160fa7 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -457,11 +457,11 @@ public String getTransferProcessState(String id) { * * @param id transfer process id. */ - public void suspendTransfer(String id) { + public void suspendTransfer(String id, String reason) { var requestBodyBuilder = createObjectBuilder() .add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE)) .add(TYPE, "SuspendTransfer") - .add("reason", "any reason"); + .add("reason", reason); managementEndpoint.baseRequest() .contentType(JSON) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index a8ad9fc93b1..53bf031a87b 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.AbstractMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -170,46 +171,50 @@ void httpPull_dataTransfer_withEdrCache() { @Test void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { - var startedTransferContext = start_httpPull_dataTransfer(); + var assetId = start_httpPull_dataTransfer(); + var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); - CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId); - check_suspended_httpPull_dataTransfer(startedTransferContext); + CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "any reason"); + checkSuspended_httpPull_dataTransfer(startedTransferContext); CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId); - check_resumed_httpPull_dataTransfer(startedTransferContext); + checkResumed_httpPull_dataTransfer(startedTransferContext); providerDataSource.verify(request("/source").withMethod("GET")); } @Test void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { - var startedTransferContext = start_httpPull_dataTransfer(); + var assetId = start_httpPull_dataTransfer(); + var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); - PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId); - check_suspended_httpPull_dataTransfer(startedTransferContext); + PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "any reason"); + checkSuspended_httpPull_dataTransfer(startedTransferContext); PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId); - check_resumed_httpPull_dataTransfer(startedTransferContext); + checkResumed_httpPull_dataTransfer(startedTransferContext); providerDataSource.verify(request("/source").withMethod("GET")); } @Test void terminateByProvider_httpPull_dataTransfer() { - var startedTransferContext = start_httpPull_dataTransfer(); + var assetId = start_httpPull_dataTransfer(); + var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId); - check_terminated_httpPull_dataTransfer(startedTransferContext); + checkTerminated_httpPull_dataTransfer(startedTransferContext); providerDataSource.verify(request("/source").withMethod("GET")); } @Test void terminateByConsumer_httpPull_dataTransfer() { - var startedTransferContext = start_httpPull_dataTransfer(); + var assetId = start_httpPull_dataTransfer(); + var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); CONSUMER.terminateTransfer(startedTransferContext.consumerTransferProcessId); - check_terminated_httpPull_dataTransfer(startedTransferContext); + checkTerminated_httpPull_dataTransfer(startedTransferContext); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -312,53 +317,63 @@ private HttpResponse cacheEdr(HttpRequest request, Map CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); - return new StartedTransferContext(providerTransferProcessId, consumerTransferProcessId, edr, msg); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); + + var edrMessage = assertDataIsAccessible(consumerTransferProcessId); + return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); } - private void check_suspended_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ + private void checkSuspended_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId))); - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); + assertDataIsNotAccessible(startedTransferContext); } - private void check_resumed_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ - // check that transfer is available again + private void checkResumed_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); } - private void check_terminated_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ + private void checkTerminated_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); + assertDataIsNotAccessible(startedTransferContext); + } + + private Map.Entry assertDataIsAccessible(String consumerTransferProcessId) { + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + return new AbstractMap.SimpleEntry<>(edr, msg); + } + + private void assertDataIsNotAccessible(StartedTransferContext startedTransferContext) { + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId))); + // checks that transfer fails await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index a951d8f87a8..2f67d5a70ad 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -235,7 +235,7 @@ void shouldSuspendAndResumeTransfer() { .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); - CONSUMER.suspendTransfer(transferProcessId); + CONSUMER.suspendTransfer(transferProcessId, "any reason"); CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); assertNoMoreMessagesAreSentTo(consumer); From 8af50d20eee34deaae0217e06e2be77a73a1da19 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Thu, 19 Dec 2024 12:55:09 +0100 Subject: [PATCH 5/9] fix:Termination of pull transfer process from consumer side --- .../java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 53bf031a87b..1408a21cd06 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -377,7 +377,7 @@ private void assertDataIsNotAccessible(StartedTransferContext startedTransferCon await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); } - private record StartedTransferContext (String providerTransferProcessId, String consumerTransferProcessId, DataAddress edr, String msg) { } + private record StartedTransferContext (String consumerTransferProcessId, String providerTransferProcessId, DataAddress edr, String msg) { } /** * Mocked http provisioner From 0fac72f7d0d3623af6d4db488902cfcebe4d72e4 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Thu, 19 Dec 2024 13:53:09 +0100 Subject: [PATCH 6/9] fix:Termination of pull transfer process from consumer side --- .../org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 1408a21cd06..a35495afde4 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -328,12 +328,11 @@ private StartedTransferContext checkStarted_httpPull_dataTransfer(String assetId var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) .withTransferType("HttpData-PULL") .execute(); + CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); - - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); var edrMessage = assertDataIsAccessible(consumerTransferProcessId); From a167fef53eff0eb288bd09a2c458829600ee4407 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Fri, 20 Dec 2024 09:21:06 +0100 Subject: [PATCH 7/9] fix:Termination of pull transfer process from consumer side --- .../test/e2e/TransferPullEndToEndTest.java | 82 +++++++++---------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index a35495afde4..1c057e95279 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -171,50 +171,67 @@ void httpPull_dataTransfer_withEdrCache() { @Test void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { - var assetId = start_httpPull_dataTransfer(); - var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); + var assetId = createResources(); + var startedTransferContext = assertTransferProcessIsStarted(assetId); + var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "any reason"); - checkSuspended_httpPull_dataTransfer(startedTransferContext); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId); - checkResumed_httpPull_dataTransfer(startedTransferContext); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); providerDataSource.verify(request("/source").withMethod("GET")); } @Test void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { - var assetId = start_httpPull_dataTransfer(); - var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); + var assetId = createResources(); + var startedTransferContext = assertTransferProcessIsStarted(assetId); + var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "any reason"); - checkSuspended_httpPull_dataTransfer(startedTransferContext); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId); - checkResumed_httpPull_dataTransfer(startedTransferContext); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); + providerDataSource.verify(request("/source").withMethod("GET")); } @Test void terminateByProvider_httpPull_dataTransfer() { - var assetId = start_httpPull_dataTransfer(); - var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); + var assetId = createResources(); + var startedTransferContext = assertTransferProcessIsStarted(assetId); + var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId); - checkTerminated_httpPull_dataTransfer(startedTransferContext); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); providerDataSource.verify(request("/source").withMethod("GET")); } @Test void terminateByConsumer_httpPull_dataTransfer() { - var assetId = start_httpPull_dataTransfer(); - var startedTransferContext = checkStarted_httpPull_dataTransfer(assetId); + var assetId = createResources(); + var startedTransferContext = assertTransferProcessIsStarted(assetId); + var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); CONSUMER.terminateTransfer(startedTransferContext.consumerTransferProcessId); - checkTerminated_httpPull_dataTransfer(startedTransferContext); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -317,14 +334,14 @@ private HttpResponse cacheEdr(HttpRequest request, Map id.asJsonObject().getString("@id")).findFirst().orElseThrow(); PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); - var edrMessage = assertDataIsAccessible(consumerTransferProcessId); - - return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId, edrMessage.getKey(), edrMessage.getValue()); - } - - private void checkSuspended_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ - PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); - CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); - - assertDataIsNotAccessible(startedTransferContext); - } - - private void checkResumed_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ - PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); - CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); - - assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); - } - - private void checkTerminated_httpPull_dataTransfer(StartedTransferContext startedTransferContext){ - CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); - PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); - - assertDataIsNotAccessible(startedTransferContext); + return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId); } private Map.Entry assertDataIsAccessible(String consumerTransferProcessId) { @@ -369,14 +363,14 @@ private Map.Entry assertDataIsAccessible(String consumerTra return new AbstractMap.SimpleEntry<>(edr, msg); } - private void assertDataIsNotAccessible(StartedTransferContext startedTransferContext) { + private void assertDataIsNotAccessible(String consumerTransferProcessId, DataAddress edr, String msg) { // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(startedTransferContext.consumerTransferProcessId))); + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(startedTransferContext.edr, Map.of("message", startedTransferContext.msg), body -> assertThat(body).isEqualTo("data")))); + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); } - private record StartedTransferContext (String consumerTransferProcessId, String providerTransferProcessId, DataAddress edr, String msg) { } + private record StartedTransferContext(String consumerTransferProcessId, String providerTransferProcessId) { } /** * Mocked http provisioner From 535a8e32f9d78335a932bd8233ea0bcb215e6c9e Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Fri, 20 Dec 2024 09:24:39 +0100 Subject: [PATCH 8/9] fix:Termination of pull transfer process from consumer side --- .../TransferProcessProtocolServiceImpl.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java index 635002b0a26..5d95cb9b66c 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java @@ -233,12 +233,12 @@ private ServiceResult suspendedAction(TransferSuspensionMessage @NotNull private ServiceResult terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) { if (transferProcess.canBeTerminated()) { - if (transferProcess.getType() == PROVIDER) { - var termination = dataFlowManager.terminate(transferProcess); - if (termination.failed()) { - return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail())); - } - } + if (transferProcess.getType() == PROVIDER) { + var termination = dataFlowManager.terminate(transferProcess); + if (termination.failed()) { + return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail())); + } + } observable.invokeForEach(l -> l.preTerminated(transferProcess)); transferProcess.transitionTerminated(); transferProcess.protocolMessageReceived(message.getId()); From b1f6c3acd9802f09eab0a08c378a97ddc8475372 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Fri, 20 Dec 2024 13:19:13 +0100 Subject: [PATCH 9/9] fix:Termination of pull transfer process from consumer side --- .../eclipse/edc/test/e2e/TransferPullEndToEndTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index 1c057e95279..b8b07277249 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -172,7 +172,7 @@ void httpPull_dataTransfer_withEdrCache() { @Test void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { var assetId = createResources(); - var startedTransferContext = assertTransferProcessIsStarted(assetId); + var startedTransferContext = startTransferProcess(assetId); var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "any reason"); @@ -191,7 +191,7 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { @Test void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { var assetId = createResources(); - var startedTransferContext = assertTransferProcessIsStarted(assetId); + var startedTransferContext = startTransferProcess(assetId); var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "any reason"); @@ -211,7 +211,7 @@ void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { @Test void terminateByProvider_httpPull_dataTransfer() { var assetId = createResources(); - var startedTransferContext = assertTransferProcessIsStarted(assetId); + var startedTransferContext = startTransferProcess(assetId); var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId); @@ -225,7 +225,7 @@ void terminateByProvider_httpPull_dataTransfer() { @Test void terminateByConsumer_httpPull_dataTransfer() { var assetId = createResources(); - var startedTransferContext = assertTransferProcessIsStarted(assetId); + var startedTransferContext = startTransferProcess(assetId); var edrMessage = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); CONSUMER.terminateTransfer(startedTransferContext.consumerTransferProcessId); @@ -341,7 +341,7 @@ private String createResources(){ return assetId; } - private StartedTransferContext assertTransferProcessIsStarted(String assetId){ + private StartedTransferContext startTransferProcess(String assetId){ var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) .withTransferType("HttpData-PULL") .execute();