Skip to content

Commit

Permalink
feat: hardening EDR refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Oct 30, 2023
1 parent abb3667 commit b0cde40
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.monitor.Monitor;
Expand All @@ -36,6 +37,7 @@
import java.time.ZoneOffset;

import static java.lang.String.format;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING;

public class TransferProcessLocalCallback implements InProcessCallback {

Expand All @@ -59,14 +61,28 @@ public TransferProcessLocalCallback(EndpointDataReferenceCache edrCache, Transfe

@Override
public <T extends Event> Result<Void> invoke(CallbackEventRemoteMessage<T> message) {
if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted) {
if (transferProcessStarted.getDataAddress() != null) {
return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class)
.compose(this::storeEdr)
.mapTo();
}
if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted && transferProcessStarted.getDataAddress() != null) {
return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class)
.compose(this::storeEdr)
.mapTo();
} else if (message.getEventEnvelope().getPayload() instanceof TransferProcessTerminated terminated && terminated.getReason() != null) {
return handleTransferProcessTermination(terminated);
} else {
return Result.success();
}
return Result.success();
}

private Result<Void> handleTransferProcessTermination(TransferProcessTerminated terminated) {
return transactionContext.execute(() -> {
var transferProcess = transferProcessStore.findById(terminated.getTransferProcessId());
if (transferProcess != null) {
stopEdrNegotiation(transferProcess.getDataRequest().getAssetId(), transferProcess.getDataRequest().getContractId(), terminated.getReason());
return Result.success();
} else {
return Result.failure(format("Failed to find a transfer process with ID %s", terminated.getTransferProcessId()));
}
});

}

private Result<Void> storeEdr(EndpointDataReference edr) {
Expand Down Expand Up @@ -107,6 +123,21 @@ private Result<Void> storeEdr(EndpointDataReference edr) {

}

private void stopEdrNegotiation(String assetId, String agreementId, String errorDetail) {
var querySpec = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
.filter(fieldFilter("assetId", assetId))
.filter(fieldFilter("state", REFRESHING.code()))
.build();

edrCache.queryForEntries(querySpec).forEach((entry -> {
monitor.debug(format("Transitioning EDR to Error Refreshing for transfer process %s", entry.getTransferProcessId()));
entry.setErrorDetail(errorDetail);
entry.transitionError();
edrCache.update(entry);
}));
}

private void cleanOldEdr(String assetId, String agreementId) {
var querySpec = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
Expand Down Expand Up @@ -138,7 +169,7 @@ private Result<Long> extractExpirationTime(EndpointDataReference edr) {
return Result.success(0L);
}

private Criterion fieldFilter(String field, String value) {
private Criterion fieldFilter(String field, Object value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
Expand Down Expand Up @@ -80,6 +81,18 @@ public static TransferProcessStarted getTransferProcessStartedEvent(DataAddress
.build();
}

public static TransferProcessTerminated getTransferTerminatedEvent(String transferProcessId, String reason) {
return TransferProcessTerminated.Builder.newInstance()
.callbackAddresses(List.of(CallbackAddress.Builder.newInstance()
.uri("local://test")
.events(Set.of("test"))
.transactional(true)
.build()))
.reason(reason)
.transferProcessId(transferProcessId)
.build();
}

public static EndpointDataReference getEdr() {
return EndpointDataReference.Builder.newInstance()
.id("dataRequestId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import static org.eclipse.edc.spi.types.domain.edr.EndpointDataReference.EDR_SIMPLE_TYPE;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getEdr;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferProcessStartedEvent;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferTerminatedEvent;
import static org.eclipse.tractusx.edc.callback.TestFunctions.remoteMessage;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -182,6 +184,45 @@ void invoke_shouldFail_withInvalidDataAddress() {
verifyNoInteractions(transferProcessStore);
}

@Test
void invoke_shouldStopEdrNegotiation_whenTerminatedMessageReceived() {

var transferProcessId = "transferProcessId";
var assetId = "assetId";
var contractId = "contractId";
var edr = getEdr();

var dataRequest = DataRequest.Builder.newInstance().id(edr.getId())
.destinationType("HttpProxy")
.assetId(assetId)
.contractId(contractId)
.build();

var transferProcess = TransferProcess.Builder.newInstance()
.id(transferProcessId)
.dataRequest(dataRequest)
.build();

var edrEntry = EndpointDataReferenceEntry.Builder.newInstance()
.agreementId(contractId)
.transferProcessId(transferProcessId)
.assetId(assetId)
.state(REFRESHING.code())
.build();

when(transferProcessStore.findById(transferProcessId)).thenReturn(transferProcess);
when(edrCache.queryForEntries(any())).thenReturn(Stream.of(edrEntry));

var event = getTransferTerminatedEvent(transferProcessId, "Failure");
var message = remoteMessage(event);

var result = callback.invoke(message);
assertThat(result.succeeded()).isTrue();

verify(edrCache).update(argThat(entry -> entry.getState() == EndpointDataReferenceEntryStates.ERROR.code()));

}

@ParameterizedTest
@ArgumentsSource(EventInstances.class)
void invoke_shouldIgnoreOtherEvents(TransferProcessEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;

import static java.lang.String.format;
import static org.eclipse.tractusx.edc.iam.ssi.miw.api.MiwFallbackFactories.retryWhenStatusIsNotIn;

public class MiwApiClientImpl implements MiwApiClient {

Expand Down Expand Up @@ -144,8 +145,14 @@ private Result<Void> handleVerifyResult(Map<String, Object> response) {
}

private <R> Result<R> executeRequest(Request request, TypeReference<R> typeReference) {
try (var response = httpClient.execute(request)) {
try (var response = httpClient.execute(request, List.of(retryWhenStatusIsNotIn(200, 201)))) {
return handleResponse(response, typeReference);
} catch (MiwClientException e) {
if (e.getResponse() != null) {
return handleError(e.getResponse());
} else {
return Result.failure(e.getMessage());
}
} catch (IOException e) {
return Result.failure(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.iam.ssi.miw.api;

import okhttp3.Response;
import org.eclipse.edc.spi.http.EdcHttpClientException;

public class MiwClientException extends EdcHttpClientException {
private final Response response;

public MiwClientException(String message) {
this(message, null);
}

public MiwClientException(String message, Response response) {
super(message);
this.response = response;
}

public Response getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.iam.ssi.miw.api;

import dev.failsafe.Fallback;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedFunction;
import okhttp3.Response;
import org.eclipse.edc.spi.http.FallbackFactory;

import java.util.Arrays;
import java.util.stream.Collectors;

import static java.lang.String.format;

public interface MiwFallbackFactories {

static FallbackFactory retryWhenStatusIsNot(int status) {
return retryWhenStatusIsNotIn(status);
}

/**
* Verifies that the response has a specific statuses, otherwise it should be retried
*
* @return the {@link FallbackFactory}
*/
static FallbackFactory retryWhenStatusIsNotIn(int... status) {
var codes = Arrays.stream(status).boxed().collect(Collectors.toSet());
return request -> {
CheckedFunction<ExecutionAttemptedEvent<? extends Response>, Exception> exceptionSupplier = event -> {
var response = event.getLastResult();
if (response == null) {
return new MiwClientException(event.getLastException().getMessage());
} else {
return new MiwClientException(format("Server response to %s was not one of %s but was %s", request, Arrays.toString(status), response.code()), response);
}
};
return Fallback.builderOfException(exceptionSupplier)
.handleResultIf(r -> !codes.contains(r.code()))
.build();
};
}
}

0 comments on commit b0cde40

Please sign in to comment.