diff --git a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java index b05593f2e..b04033ae7 100644 --- a/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java +++ b/edc-extensions/edr/edr-callback/src/main/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallback.java @@ -18,6 +18,7 @@ import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage; import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore; import org.eclipse.edc.spi.event.Event; import org.eclipse.edc.spi.monitor.Monitor; @@ -36,6 +37,7 @@ import java.time.ZoneOffset; import static java.lang.String.format; +import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING; public class TransferProcessLocalCallback implements InProcessCallback { @@ -59,14 +61,28 @@ public TransferProcessLocalCallback(EndpointDataReferenceCache edrCache, Transfe @Override public Result invoke(CallbackEventRemoteMessage message) { - if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted) { - if (transferProcessStarted.getDataAddress() != null) { - return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class) - .compose(this::storeEdr) - .mapTo(); - } + if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted && transferProcessStarted.getDataAddress() != null) { + return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class) + .compose(this::storeEdr) + .mapTo(); + } else if (message.getEventEnvelope().getPayload() instanceof TransferProcessTerminated terminated && terminated.getReason() != null) { + return handleTransferProcessTermination(terminated); + } else { + return Result.success(); } - return Result.success(); + } + + private Result handleTransferProcessTermination(TransferProcessTerminated terminated) { + return transactionContext.execute(() -> { + var transferProcess = transferProcessStore.findById(terminated.getTransferProcessId()); + if (transferProcess != null) { + stopEdrNegotiation(transferProcess.getDataRequest().getAssetId(), transferProcess.getDataRequest().getContractId(), terminated.getReason()); + return Result.success(); + } else { + return Result.failure(format("Failed to find a transfer process with ID %s", terminated.getTransferProcessId())); + } + }); + } private Result storeEdr(EndpointDataReference edr) { @@ -107,6 +123,21 @@ private Result storeEdr(EndpointDataReference edr) { } + private void stopEdrNegotiation(String assetId, String agreementId, String errorDetail) { + var querySpec = QuerySpec.Builder.newInstance() + .filter(fieldFilter("agreementId", agreementId)) + .filter(fieldFilter("assetId", assetId)) + .filter(fieldFilter("state", REFRESHING.code())) + .build(); + + edrCache.queryForEntries(querySpec).forEach((entry -> { + monitor.debug(format("Transitioning EDR to Error Refreshing for transfer process %s", entry.getTransferProcessId())); + entry.setErrorDetail(errorDetail); + entry.transitionError(); + edrCache.update(entry); + })); + } + private void cleanOldEdr(String assetId, String agreementId) { var querySpec = QuerySpec.Builder.newInstance() .filter(fieldFilter("agreementId", agreementId)) @@ -138,7 +169,7 @@ private Result extractExpirationTime(EndpointDataReference edr) { return Result.success(0L); } - private Criterion fieldFilter(String field, String value) { + private Criterion fieldFilter(String field, Object value) { return Criterion.Builder.newInstance() .operandLeft(field) .operator("=") diff --git a/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TestFunctions.java b/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TestFunctions.java index a44b2fe4e..1884b9d60 100644 --- a/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TestFunctions.java +++ b/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TestFunctions.java @@ -26,6 +26,7 @@ import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.event.Event; import org.eclipse.edc.spi.event.EventEnvelope; @@ -80,6 +81,18 @@ public static TransferProcessStarted getTransferProcessStartedEvent(DataAddress .build(); } + public static TransferProcessTerminated getTransferTerminatedEvent(String transferProcessId, String reason) { + return TransferProcessTerminated.Builder.newInstance() + .callbackAddresses(List.of(CallbackAddress.Builder.newInstance() + .uri("local://test") + .events(Set.of("test")) + .transactional(true) + .build())) + .reason(reason) + .transferProcessId(transferProcessId) + .build(); + } + public static EndpointDataReference getEdr() { return EndpointDataReference.Builder.newInstance() .id("dataRequestId") diff --git a/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallbackTest.java b/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallbackTest.java index b64b82e89..84dc5eb0a 100644 --- a/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallbackTest.java +++ b/edc-extensions/edr/edr-callback/src/test/java/org/eclipse/tractusx/edc/callback/TransferProcessLocalCallbackTest.java @@ -52,7 +52,9 @@ import static org.eclipse.edc.spi.types.domain.edr.EndpointDataReference.EDR_SIMPLE_TYPE; import static org.eclipse.tractusx.edc.callback.TestFunctions.getEdr; import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferProcessStartedEvent; +import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferTerminatedEvent; import static org.eclipse.tractusx.edc.callback.TestFunctions.remoteMessage; +import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -182,6 +184,45 @@ void invoke_shouldFail_withInvalidDataAddress() { verifyNoInteractions(transferProcessStore); } + @Test + void invoke_shouldStopEdrNegotiation_whenTerminatedMessageReceived() { + + var transferProcessId = "transferProcessId"; + var assetId = "assetId"; + var contractId = "contractId"; + var edr = getEdr(); + + var dataRequest = DataRequest.Builder.newInstance().id(edr.getId()) + .destinationType("HttpProxy") + .assetId(assetId) + .contractId(contractId) + .build(); + + var transferProcess = TransferProcess.Builder.newInstance() + .id(transferProcessId) + .dataRequest(dataRequest) + .build(); + + var edrEntry = EndpointDataReferenceEntry.Builder.newInstance() + .agreementId(contractId) + .transferProcessId(transferProcessId) + .assetId(assetId) + .state(REFRESHING.code()) + .build(); + + when(transferProcessStore.findById(transferProcessId)).thenReturn(transferProcess); + when(edrCache.queryForEntries(any())).thenReturn(Stream.of(edrEntry)); + + var event = getTransferTerminatedEvent(transferProcessId, "Failure"); + var message = remoteMessage(event); + + var result = callback.invoke(message); + assertThat(result.succeeded()).isTrue(); + + verify(edrCache).update(argThat(entry -> entry.getState() == EndpointDataReferenceEntryStates.ERROR.code())); + + } + @ParameterizedTest @ArgumentsSource(EventInstances.class) void invoke_shouldIgnoreOtherEvents(TransferProcessEvent event) { diff --git a/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwApiClientImpl.java b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwApiClientImpl.java index c8de50c4b..09ea5d058 100644 --- a/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwApiClientImpl.java +++ b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwApiClientImpl.java @@ -36,6 +36,7 @@ import java.util.Set; import static java.lang.String.format; +import static org.eclipse.tractusx.edc.iam.ssi.miw.api.MiwFallbackFactories.retryWhenStatusIsNotIn; public class MiwApiClientImpl implements MiwApiClient { @@ -144,8 +145,14 @@ private Result handleVerifyResult(Map response) { } private Result executeRequest(Request request, TypeReference typeReference) { - try (var response = httpClient.execute(request)) { + try (var response = httpClient.execute(request, List.of(retryWhenStatusIsNotIn(200, 201)))) { return handleResponse(response, typeReference); + } catch (MiwClientException e) { + if (e.getResponse() != null) { + return handleError(e.getResponse()); + } else { + return Result.failure(e.getMessage()); + } } catch (IOException e) { return Result.failure(e.getMessage()); } diff --git a/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwClientException.java b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwClientException.java new file mode 100644 index 000000000..17ec5a369 --- /dev/null +++ b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwClientException.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.iam.ssi.miw.api; + +import okhttp3.Response; +import org.eclipse.edc.spi.http.EdcHttpClientException; + +public class MiwClientException extends EdcHttpClientException { + private final Response response; + + public MiwClientException(String message) { + this(message, null); + } + + public MiwClientException(String message, Response response) { + super(message); + this.response = response; + } + + public Response getResponse() { + return response; + } +} diff --git a/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwFallbackFactories.java b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwFallbackFactories.java new file mode 100644 index 000000000..fdc07f5fa --- /dev/null +++ b/edc-extensions/ssi/ssi-miw-credential-client/src/main/java/org/eclipse/tractusx/edc/iam/ssi/miw/api/MiwFallbackFactories.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.tractusx.edc.iam.ssi.miw.api; + +import dev.failsafe.Fallback; +import dev.failsafe.event.ExecutionAttemptedEvent; +import dev.failsafe.function.CheckedFunction; +import okhttp3.Response; +import org.eclipse.edc.spi.http.FallbackFactory; + +import java.util.Arrays; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +public interface MiwFallbackFactories { + + static FallbackFactory retryWhenStatusIsNot(int status) { + return retryWhenStatusIsNotIn(status); + } + + /** + * Verifies that the response has a specific statuses, otherwise it should be retried + * + * @return the {@link FallbackFactory} + */ + static FallbackFactory retryWhenStatusIsNotIn(int... status) { + var codes = Arrays.stream(status).boxed().collect(Collectors.toSet()); + return request -> { + CheckedFunction, Exception> exceptionSupplier = event -> { + var response = event.getLastResult(); + if (response == null) { + return new MiwClientException(event.getLastException().getMessage()); + } else { + return new MiwClientException(format("Server response to %s was not one of %s but was %s", request, Arrays.toString(status), response.code()), response); + } + }; + return Fallback.builderOfException(exceptionSupplier) + .handleResultIf(r -> !codes.contains(r.code())) + .build(); + }; + } +}