Skip to content

Commit

Permalink
fix: Fatal error while resume PULL transfer from provider (#4647)
Browse files Browse the repository at this point in the history
* fix: DOS-1413 error while resume PULL transfer from provider

* fix: DOS-1413 error while resume PULL transfer from provider
tests improvement

* fix: DOS-1413 error while resume PULL transfer from provider
tests improvement

* fix: unit test shouldResumeTransfer_whenDataPlaneRestarts
  • Loading branch information
AndrYurk authored Nov 28, 2024
1 parent 8c49531 commit 1d74c05
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.jsonld.TitaniumJsonLd;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.jsonld.util.JacksonJsonLd;
Expand Down Expand Up @@ -498,6 +499,10 @@ public String getContractNegotiationState(String id) {
return getContractNegotiationField(id, "state");
}

public void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) {
await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name()));
}

protected String getContractNegotiationField(String negotiationId, String fieldName) {
return managementEndpoint.baseRequest()
.contentType(JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void transitionStarting() {
}

public boolean canBeStartedConsumer() {
return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED);
return currentStateIsOneOf(STARTED, REQUESTED, STARTING, RESUMED, SUSPENDED);
}

public void transitionStarted(String dataPlaneId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess.Type.CONSUMER;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -98,8 +99,11 @@ void verifyConsumerTransitions() {

assertThrows(IllegalStateException.class, process::transitionStarting, "STARTING is not a valid state for consumer");
process.transitionStarted("dataPlaneId");
// should not set the data plane id
assertThat(process.getDataPlaneId()).isNull();

process.transitionSuspending("suspension");
process.transitionSuspended();

process.transitionStarted("dataPlaneId");

process.transitionCompleting();
process.transitionCompleted();
Expand All @@ -108,6 +112,20 @@ void verifyConsumerTransitions() {
process.transitionDeprovisioned();
}

@ParameterizedTest
@EnumSource(value = TransferProcessStates.class, mode = INCLUDE, names = { "STARTING", "SUSPENDED" })
void shouldNotSetDataPlaneIdOnStart_whenTransferIsConsumer(TransferProcessStates fromState) {
var process = TransferProcess.Builder.newInstance()
.id(UUID.randomUUID().toString()).type(CONSUMER)
.state(fromState.code())
.build();

process.transitionStarted("dataPlaneId");

assertThat(process.stateAsString()).isEqualTo(STARTED.name());
assertThat(process.getDataPlaneId()).isNull();
}

@Test
void verifyProviderTransitions() {
var process = TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString()).type(TransferProcess.Type.PROVIDER).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@
package org.eclipse.edc.test.e2e;

import jakarta.json.JsonObject;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.spi.security.Vault;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy;
import static org.eclipse.edc.junit.testfixtures.TestUtils.getResourceFileContentAsString;

Expand Down Expand Up @@ -70,12 +67,4 @@ protected void createResourcesOnProvider(String assetId, Map<String, Object> dat
PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties);
PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, noConstraintPolicyId);
}

protected void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) {
await().atMost(timeout).until(
() -> CONSUMER.getTransferProcessState(transferProcessId),
it -> Objects.equals(it, state.name())
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void httpPull_dataTransfer_withCallbacks() {
.withCallbacks(callbacks)
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull());

Expand All @@ -150,7 +150,7 @@ void httpPull_dataTransfer_withEdrCache() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

Expand All @@ -168,7 +168,7 @@ void httpPull_dataTransfer_withEdrCache() {
}

@Test
void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());
Expand All @@ -177,7 +177,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);

Expand All @@ -186,7 +186,7 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {

CONSUMER.suspendTransfer(transferProcessId, "supension");

awaitTransferToBeInState(transferProcessId, SUSPENDED);
CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)));
Expand All @@ -196,14 +196,55 @@ void suspendAndResume_httpPull_dataTransfer_withEdrCache() {
CONSUMER.resumeTransfer(transferProcessId);

// check that transfer is available again
awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
}

@Test
void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
var assetId = UUID.randomUUID().toString();
createResourcesOnProvider(assetId, httpSourceDataAddress());

var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER)
.withTransferType("HttpData-PULL")
.execute();

CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED);

var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);

var msg = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));

var providerTransferProcessId = PROVIDER.getTransferProcesses().stream()
.filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId))
.map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();

PROVIDER.suspendTransfer(providerTransferProcessId, "supension");

PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED);

// checks that the EDR is gone once the transfer has been suspended
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId)));
// checks that transfer fails
await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));

PROVIDER.resumeTransfer(providerTransferProcessId);

// check that transfer is available again
PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED);
var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull);
var secondMessage = UUID.randomUUID().toString();
await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
}

@Test
void pullFromHttp_httpProvision() {
providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
Expand All @@ -222,7 +263,7 @@ void pullFromHttp_httpProvision() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);

await().atMost(timeout).untilAsserted(() -> {
var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull);
Expand All @@ -246,7 +287,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, TERMINATED);
CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED);
}

@Test
Expand All @@ -261,7 +302,7 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() {
.withTransferType("HttpData-PULL")
.execute();

awaitTransferToBeInState(transferProcessId, TERMINATED);
CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED);
}

public JsonObject createCallback(String url, boolean transactional, Set<String> events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void httpPushDataTransfer() {
.withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination"))
.withTransferType("HttpData-PUSH").execute();

awaitTransferToBeInState(transferProcessId, COMPLETED);
CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED);

providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
consumerDataDestination.verify(HttpRequest.request("/destination").withBody(BinaryBody.binary("data".getBytes())));
Expand All @@ -109,7 +109,7 @@ void httpToHttp_oauth2Provisioning() {
.withDestination(httpDataAddress("http://localhost:" + consumerDataDestination.getPort() + "/destination"))
.withTransferType("HttpData-PUSH").execute();

awaitTransferToBeInState(transferProcessId, COMPLETED);
CONSUMER.awaitTransferToBeInState(transferProcessId, COMPLETED);

oauth2server.verify(HttpRequest.request("/token").withBody("grant_type=client_credentials&client_secret=supersecret&client_id=clientId"));
providerDataSource.verify(HttpRequest.request("/source").withMethod("GET").withHeader("Authorization", "Bearer token"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void shouldResumeTransfer_whenDataPlaneRestarts() {

PROVIDER_DATA_PLANE_RUNTIME.boot(false);

awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);
assertMessagesAreSentTo(consumer);
}
}
Expand Down Expand Up @@ -189,7 +189,7 @@ void kafkaToHttpTransfer() {
destinationServer.verify(request, atLeast(1));
});

awaitTransferToBeInState(transferProcessId, TERMINATED);
CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED);

destinationServer.clear(request)
.when(request).respond(response());
Expand Down Expand Up @@ -218,7 +218,7 @@ void kafkaToKafkaTransfer() {
.withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute();
assertMessagesAreSentTo(consumer);

awaitTransferToBeInState(transferProcessId, TERMINATED);
CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED);
assertNoMoreMessagesAreSentTo(consumer);
}
}
Expand All @@ -236,11 +236,11 @@ void shouldSuspendAndResumeTransfer() {
assertMessagesAreSentTo(consumer);

CONSUMER.suspendTransfer(transferProcessId, "any kind of reason");
awaitTransferToBeInState(transferProcessId, SUSPENDED);
CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED);
assertNoMoreMessagesAreSentTo(consumer);

CONSUMER.resumeTransfer(transferProcessId);
awaitTransferToBeInState(transferProcessId, STARTED);
CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED);
assertMessagesAreSentTo(consumer);
}
}
Expand Down

0 comments on commit 1d74c05

Please sign in to comment.