Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handling renewals failure #855

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.0.2, Apache-2.0, approved, #9239

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / verify / verify-dependencies / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.apicatalog/titanium-json-ld/1.3.1, Apache-2.0, approved, #8912
Expand Down Expand Up @@ -166,7 +166,7 @@
maven/mavencentral/io.rest-assured/xml-path/5.3.2, Apache-2.0, approved, #9267
maven/mavencentral/io.setl/rdf-urdna/1.1, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.swagger.core.v3/swagger-annotations-jakarta/2.2.15, Apache-2.0, approved, #5947
maven/mavencentral/io.swagger.core.v3/swagger-annotations/2.2.15, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.swagger.core.v3/swagger-annotations/2.2.15, Apache-2.0, approved, #11362
maven/mavencentral/io.swagger.core.v3/swagger-core-jakarta/2.2.15, Apache-2.0, approved, #5929
maven/mavencentral/io.swagger.core.v3/swagger-core/2.2.15, Apache-2.0, approved, #9265
maven/mavencentral/io.swagger.core.v3/swagger-integration-jakarta/2.2.15, Apache-2.0, approved, clearlydefined
Expand Down
1 change: 0 additions & 1 deletion core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ dependencies {
implementation(project(":spi:edr-spi"))
implementation(project(":spi:core-spi"))


testImplementation(libs.edc.junit)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.monitor.Monitor;
Expand All @@ -36,6 +37,7 @@
import java.time.ZoneOffset;

import static java.lang.String.format;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING;

public class TransferProcessLocalCallback implements InProcessCallback {

Expand All @@ -59,14 +61,28 @@ public TransferProcessLocalCallback(EndpointDataReferenceCache edrCache, Transfe

@Override
public <T extends Event> Result<Void> invoke(CallbackEventRemoteMessage<T> message) {
if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted) {
if (transferProcessStarted.getDataAddress() != null) {
return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class)
.compose(this::storeEdr)
.mapTo();
}
if (message.getEventEnvelope().getPayload() instanceof TransferProcessStarted transferProcessStarted && transferProcessStarted.getDataAddress() != null) {
return transformerRegistry.transform(transferProcessStarted.getDataAddress(), EndpointDataReference.class)
.compose(this::storeEdr)
.mapTo();
} else if (message.getEventEnvelope().getPayload() instanceof TransferProcessTerminated terminated && terminated.getReason() != null) {
return handleTransferProcessTermination(terminated);
} else {
return Result.success();
}
return Result.success();
}

private Result<Void> handleTransferProcessTermination(TransferProcessTerminated terminated) {
return transactionContext.execute(() -> {
var transferProcess = transferProcessStore.findById(terminated.getTransferProcessId());
if (transferProcess != null) {
stopEdrNegotiation(transferProcess.getDataRequest().getAssetId(), transferProcess.getDataRequest().getContractId(), terminated.getReason());
return Result.success();
} else {
return Result.failure(format("Failed to find a transfer process with ID %s", terminated.getTransferProcessId()));
}
});

}

private Result<Void> storeEdr(EndpointDataReference edr) {
Expand Down Expand Up @@ -107,6 +123,21 @@ private Result<Void> storeEdr(EndpointDataReference edr) {

}

private void stopEdrNegotiation(String assetId, String agreementId, String errorDetail) {
var querySpec = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
.filter(fieldFilter("assetId", assetId))
.filter(fieldFilter("state", REFRESHING.code()))
.build();

edrCache.queryForEntries(querySpec).forEach((entry -> {
monitor.debug(format("Transitioning EDR to Error Refreshing for transfer process %s", entry.getTransferProcessId()));
entry.setErrorDetail(errorDetail);
entry.transitionError();
edrCache.update(entry);
}));
}

private void cleanOldEdr(String assetId, String agreementId) {
var querySpec = QuerySpec.Builder.newInstance()
.filter(fieldFilter("agreementId", agreementId))
Expand All @@ -117,6 +148,16 @@ private void cleanOldEdr(String assetId, String agreementId) {
monitor.debug(format("Expiring EDR for transfer process %s", entry.getTransferProcessId()));
entry.transitionToExpired();
edrCache.update(entry);

var transferProcess = transferProcessStore.findById(entry.getTransferProcessId());

if (transferProcess != null && transferProcess.canBeCompleted()) {
transferProcess.transitionCompleting();
transferProcessStore.save(transferProcess);
} else {
monitor.info(format("Cannot terminate transfer process with id: %s", entry.getTransferProcessId()));
}

}));
}

Expand All @@ -138,7 +179,7 @@ private Result<Long> extractExpirationTime(EndpointDataReference edr) {
return Result.success(0L);
}

private Criterion fieldFilter(String field, String value) {
private Criterion fieldFilter(String field, Object value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.spi.callback.CallbackEventRemoteMessage;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.event.Event;
import org.eclipse.edc.spi.event.EventEnvelope;
Expand Down Expand Up @@ -80,6 +81,18 @@ public static TransferProcessStarted getTransferProcessStartedEvent(DataAddress
.build();
}

public static TransferProcessTerminated getTransferTerminatedEvent(String transferProcessId, String reason) {
return TransferProcessTerminated.Builder.newInstance()
.callbackAddresses(List.of(CallbackAddress.Builder.newInstance()
.uri("local://test")
.events(Set.of("test"))
.transactional(true)
.build()))
.reason(reason)
.transferProcessId(transferProcessId)
.build();
}

public static EndpointDataReference getEdr() {
return EndpointDataReference.Builder.newInstance()
.id("dataRequestId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import static org.eclipse.edc.spi.types.domain.edr.EndpointDataReference.EDR_SIMPLE_TYPE;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getEdr;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferProcessStartedEvent;
import static org.eclipse.tractusx.edc.callback.TestFunctions.getTransferTerminatedEvent;
import static org.eclipse.tractusx.edc.callback.TestFunctions.remoteMessage;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.REFRESHING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -182,6 +184,45 @@ void invoke_shouldFail_withInvalidDataAddress() {
verifyNoInteractions(transferProcessStore);
}

@Test
void invoke_shouldStopEdrNegotiation_whenTerminatedMessageReceived() {

var transferProcessId = "transferProcessId";
var assetId = "assetId";
var contractId = "contractId";
var edr = getEdr();

var dataRequest = DataRequest.Builder.newInstance().id(edr.getId())
.destinationType("HttpProxy")
.assetId(assetId)
.contractId(contractId)
.build();

var transferProcess = TransferProcess.Builder.newInstance()
.id(transferProcessId)
.dataRequest(dataRequest)
.build();

var edrEntry = EndpointDataReferenceEntry.Builder.newInstance()
.agreementId(contractId)
.transferProcessId(transferProcessId)
.assetId(assetId)
.state(REFRESHING.code())
.build();

when(transferProcessStore.findById(transferProcessId)).thenReturn(transferProcess);
when(edrCache.queryForEntries(any())).thenReturn(Stream.of(edrEntry));

var event = getTransferTerminatedEvent(transferProcessId, "Failure");
var message = remoteMessage(event);

var result = callback.invoke(message);
assertThat(result.succeeded()).isTrue();

verify(edrCache).update(argThat(entry -> entry.getState() == EndpointDataReferenceEntryStates.ERROR.code()));

}

@ParameterizedTest
@ArgumentsSource(EventInstances.class)
void invoke_shouldIgnoreOtherEvents(TransferProcessEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;

import static java.lang.String.format;
import static org.eclipse.tractusx.edc.iam.ssi.miw.api.MiwFallbackFactories.retryWhenStatusIsNotIn;

public class MiwApiClientImpl implements MiwApiClient {

Expand Down Expand Up @@ -144,8 +145,14 @@ private Result<Void> handleVerifyResult(Map<String, Object> response) {
}

private <R> Result<R> executeRequest(Request request, TypeReference<R> typeReference) {
try (var response = httpClient.execute(request)) {
try (var response = httpClient.execute(request, List.of(retryWhenStatusIsNotIn(200, 201)))) {
return handleResponse(response, typeReference);
} catch (MiwClientException e) {
if (e.getResponse() != null) {
return handleError(e.getResponse());
} else {
return Result.failure(e.getMessage());
}
} catch (IOException e) {
return Result.failure(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.iam.ssi.miw.api;

import okhttp3.Response;
import org.eclipse.edc.spi.http.EdcHttpClientException;

/**
* Custom client exception for handling failure and retries when fetching data from MIW.
*/
public class MiwClientException extends EdcHttpClientException {
private final Response response;

public MiwClientException(String message) {
this(message, null);
}

public MiwClientException(String message, Response response) {
super(message);
this.response = response;
}

public Response getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.iam.ssi.miw.api;

import dev.failsafe.Fallback;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedFunction;
import okhttp3.Response;
import org.eclipse.edc.spi.http.FallbackFactory;

import java.util.Arrays;
import java.util.stream.Collectors;

import static java.lang.String.format;

public interface MiwFallbackFactories {

static FallbackFactory retryWhenStatusIsNot(int status) {
return retryWhenStatusIsNotIn(status);
}

/**
* Verifies that the response has a specific statuses, otherwise it should be retried
*
* @return the {@link FallbackFactory}
*/
static FallbackFactory retryWhenStatusIsNotIn(int... status) {
var codes = Arrays.stream(status).boxed().collect(Collectors.toSet());
return request -> {
CheckedFunction<ExecutionAttemptedEvent<? extends Response>, Exception> exceptionSupplier = event -> {
var response = event.getLastResult();
if (response == null) {
return new MiwClientException(event.getLastException().getMessage());
} else {
return new MiwClientException(format("Server response to %s was not one of %s but was %s", request, Arrays.toString(status), response.code()), response);
}
};
return Fallback.builderOfException(exceptionSupplier)
.handleResultIf(r -> !codes.contains(r.code()))
.build();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import okhttp3.mockwebserver.MockWebServer;
import org.assertj.core.api.Condition;
import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.policy.model.Operator;
import org.eclipse.tractusx.edc.lifecycle.Participant;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -38,6 +39,7 @@
import static org.assertj.core.api.Assertions.anyOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.awaitility.pollinterval.FibonacciPollInterval.fibonacci;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.EXPIRED;
import static org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntryStates.NEGOTIATED;
Expand Down Expand Up @@ -108,21 +110,35 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {

assertThat(expectedEvents).usingRecursiveFieldByFieldElementComparator().containsAll(events);

JsonArrayBuilder edrCaches = Json.createArrayBuilder();
JsonArrayBuilder edrCachesBuilder = Json.createArrayBuilder();
wolf4ood marked this conversation as resolved.
Show resolved Hide resolved

await().atMost(ASYNC_TIMEOUT)
.pollInterval(ASYNC_POLL_INTERVAL)
.untilAsserted(() -> {
var localEdrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
assertThat(localEdrCaches).hasSizeGreaterThan(1);
localEdrCaches.forEach(edrCaches::add);
localEdrCaches.forEach(edrCachesBuilder::add);
});

var edrCaches = edrCachesBuilder.build();

assertThat(edrCaches.build())
assertThat(edrCaches)
.extracting(json -> json.asJsonObject().getJsonString("tx:edrState").getString())
.areAtMost(1, anyOf(stateCondition(NEGOTIATED.name(), "Negotiated"), stateCondition(REFRESHING.name(), "Refreshing")))
.areAtLeast(1, stateCondition(EXPIRED.name(), "Expired"));

var transferProcessId = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.findFirst()
.orElseThrow();

await().pollInterval(fibonacci())
.atMost(ASYNC_TIMEOUT)
.untilAsserted(() -> {
var tpState = SOKRATES.getTransferProcessState(transferProcessId);
assertThat(tpState).isNotNull().isEqualTo(TransferProcessStates.COMPLETED.toString());
});
}


Expand Down