Skip to content

Commit

Permalink
feat: adds CP callbacks for transfer complete/fail (#782)
Browse files Browse the repository at this point in the history
* feat: adds CP callbacks for transfer complete/fail

* chore: dependencies file
  • Loading branch information
wolf4ood authored Sep 25, 2023
1 parent ceb3cec commit a7931c8
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 0 deletions.
1 change: 1 addition & 0 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ maven/mavencentral/org.eclipse.edc/control-api-configuration/0.3.0, Apache-2.0,
maven/mavencentral/org.eclipse.edc/control-plane-aggregate-services/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/control-plane-api-client-spi/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/control-plane-api-client/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/control-plane-api/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/control-plane-core/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/control-plane-spi/0.3.0, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/core-spi/0.3.0, Apache-2.0, approved, technology.edc
Expand Down
7 changes: 7 additions & 0 deletions charts/tractusx-connector-azure-vault/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ Validation URL
{{- printf "http://%s-controlplane:%v%s/token" ( include "txdc.fullname" $ ) $.Values.controlplane.endpoints.control.port $.Values.controlplane.endpoints.control.path -}}
{{- end }}

{{/*
Control Plane URL
*/}}
{{- define "txdc.controlplane.url.control" -}}
{{- printf "http://%s-controlplane:%v%s" ( include "txdc.fullname" $ ) $.Values.controlplane.endpoints.control.port $.Values.controlplane.endpoints.control.path -}}
{{- end }}

{{/*
Data Control URL
*/}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ spec:
value: {{ .Values.controlplane.endpoints.protocol.port | quote }}
- name: "WEB_HTTP_PROTOCOL_PATH"
value: {{ .Values.controlplane.endpoints.protocol.path | quote }}
- name: "EDC_CONTROL_ENDPOINT"
value: {{ include "txdc.controlplane.url.control" .}}

#########
## DSP ##
Expand Down
7 changes: 7 additions & 0 deletions charts/tractusx-connector/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ Validation URL
{{- printf "http://%s-controlplane:%v%s/token" ( include "txdc.fullname" $ ) $.Values.controlplane.endpoints.control.port $.Values.controlplane.endpoints.control.path -}}
{{- end }}

{{/*
Control Plane URL
*/}}
{{- define "txdc.controlplane.url.control" -}}
{{- printf "http://%s-controlplane:%v%s" ( include "txdc.fullname" $ ) $.Values.controlplane.endpoints.control.port $.Values.controlplane.endpoints.control.path -}}
{{- end }}

{{/*
Data Control URL
*/}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ spec:
value: {{ .Values.controlplane.endpoints.protocol.port | quote }}
- name: "WEB_HTTP_PROTOCOL_PATH"
value: {{ .Values.controlplane.endpoints.protocol.path | quote }}
- name: "EDC_CONTROL_ENDPOINT"
value: {{ include "txdc.controlplane.url.control" .}}

#########
## DSP ##
Expand Down
1 change: 1 addition & 0 deletions edc-controlplane/edc-controlplane-base/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
runtimeOnly(libs.edc.auth.tokenbased)

runtimeOnly(libs.edc.api.management)
runtimeOnly(libs.edc.api.controlplane)
runtimeOnly(libs.edc.api.management.config)
runtimeOnly(libs.edc.api.observability)
runtimeOnly(libs.edc.dsp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static io.restassured.RestAssured.given;
import static io.restassured.http.ContentType.JSON;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.FINALIZED;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_ATTRIBUTE;
import static org.eclipse.tractusx.edc.helpers.AssetHelperFunctions.createDataAddressBuilder;
Expand Down Expand Up @@ -342,6 +344,36 @@ public JsonArray getCatalogDatasets(Participant provider, JsonObject querySpec)
return datasetReference.get();
}

/**
* Request a provider asset:
* - retrieves the contract definition associated with the asset,
* - handles the contract negotiation,
* - initiate the data transfer.
*
* @param provider data provider
* @param assetId asset id
* @param destination data destination
* @return transfer process id.
*/
public String requestAsset(Participant provider, String assetId, JsonObject destination) {
var negotiationId = negotiateContract(provider, assetId);
var contractAgreementId = waitForAgreementId(negotiationId);
var transferProcessId = requestTransfer(UUID.randomUUID().toString(), contractAgreementId, assetId, provider, destination);
assertThat(transferProcessId).isNotNull();
return transferProcessId;
}


public String waitForAgreementId(String negotiationId) {
await().atMost(timeout).untilAsserted(() -> {
var state = getContractNegotiationField(negotiationId, "state");
assertThat(state).isEqualTo(FINALIZED.name());
});

return getContractAgreementId(negotiationId);
}


public String pullProxyDataByAssetId(Participant provider, String assetId) {
var body = Map.of("assetId", assetId, "endpointUrl", format("%s/aas/test", provider.gatewayEndpoint));
return getProxyData(body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public static Map<String, String> sokratesConfiguration() {
put("web.http.public.path", "/api/public");
put("web.http.public.port", SOKRATES_PUBLIC_API_PORT);

put("edc.transfer.send.retry.limit", "1");
put("edc.transfer.send.retry.base-delay.ms", "100");
// embedded dataplane config
put("web.http.control.path", "/api/dataplane/control");
put("web.http.control.port", SOKRATES_DATAPLANE_CONTROL_PORT);
Expand Down Expand Up @@ -128,6 +130,8 @@ public static Map<String, String> platoConfiguration() {
put("edc.api.auth.key", "testkey");
put("web.http.public.port", PLATO_PUBLIC_API_PORT);
put("web.http.public.path", "/api/public");
put("edc.transfer.send.retry.limit", "1");
put("edc.transfer.send.retry.base-delay.ms", "100");
// embedded dataplane config
put("web.http.control.path", "/api/dataplane/control");
put("web.http.control.port", PLATO_DATAPLANE_CONTROL_PORT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.tests.transfer;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.eclipse.tractusx.edc.lifecycle.Participant;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.UUID;

import static jakarta.json.Json.createObjectBuilder;
import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.tractusx.edc.helpers.PolicyHelperFunctions.businessPartnerNumberPolicy;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

public abstract class AbstractHttpProviderPushTest {

protected static final Participant SOKRATES = new Participant(SOKRATES_NAME, SOKRATES_BPN, sokratesConfiguration());
protected static final Participant PLATO = new Participant(PLATO_NAME, PLATO_BPN, platoConfiguration());

private static final Duration ASYNC_TIMEOUT = ofSeconds(45);
private static final Duration ASYNC_POLL_INTERVAL = ofSeconds(1);
private MockWebServer server;

@BeforeEach
void setup() {
server = new MockWebServer();
}

@Test
void httpPushDataTransfer() throws IOException {
var assetId = UUID.randomUUID().toString();

var providerUrl = server.url("/mock/api/provider");
var consumerUrl = server.url("/mock/api/consumer");

server.setDispatcher(new Dispatcher() {
@NotNull
@Override
public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
return switch (recordedRequest.getPath().split("\\?")[0]) {
case "/mock/api/provider" -> new MockResponse().setResponseCode(200);
case "/mock/api/consumer" -> new MockResponse().setResponseCode(200);
default -> new MockResponse().setResponseCode(404);
};
}
});

server.start();

PLATO.createAsset(assetId, Json.createObjectBuilder().build(), httpDataAddress(providerUrl.toString()));
PLATO.createPolicy(createTestPolicy("policy-1", SOKRATES.getBpn()));
PLATO.createContractDefinition(assetId, "def-1", "policy-1", "policy-1");

var destination = httpDataAddress(consumerUrl.toString());

var transferProcessId = SOKRATES.requestAsset(PLATO, assetId, destination);
await().atMost(ASYNC_TIMEOUT).untilAsserted(() -> {
var state = SOKRATES.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(COMPLETED.name());
});
}

@AfterEach
void teardown() throws IOException {
server.shutdown();
}

protected JsonObject createTestPolicy(String policyId, String bpn) {
return businessPartnerNumberPolicy(policyId, bpn);
}

private JsonObject httpDataAddress(String baseUrl) {
return createObjectBuilder()
.add(TYPE, EDC_NAMESPACE + "DataAddress")
.add(EDC_NAMESPACE + "type", "HttpData")
.add(EDC_NAMESPACE + "properties", createObjectBuilder()
.add(EDC_NAMESPACE + "baseUrl", baseUrl)
.build())
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.tests.transfer;

import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.tractusx.edc.lifecycle.ParticipantRuntime;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

@EndToEndTest
public class HttpProviderPushInMemoryTest extends AbstractHttpProviderPushTest {

@RegisterExtension
protected static final ParticipantRuntime SOKRATES_RUNTIME = new ParticipantRuntime(
":edc-tests:runtime:runtime-memory",
SOKRATES_NAME,
SOKRATES_BPN,
sokratesConfiguration()
);

@RegisterExtension
protected static final ParticipantRuntime PLATO_RUNTIME = new ParticipantRuntime(
":edc-tests:runtime:runtime-memory",
PLATO_NAME,
PLATO_BPN,
platoConfiguration()
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.tests.transfer;

import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest;
import org.eclipse.tractusx.edc.lifecycle.PgParticipantRuntime;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.PLATO_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_BPN;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.SOKRATES_NAME;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.platoConfiguration;
import static org.eclipse.tractusx.edc.lifecycle.TestRuntimeConfiguration.sokratesConfiguration;

@PostgresqlDbIntegrationTest
public class HttpProviderPushInPostgresqlTest extends AbstractHttpProviderPushTest {

@RegisterExtension
protected static final PgParticipantRuntime SOKRATES_RUNTIME = new PgParticipantRuntime(
":edc-tests:runtime:runtime-postgresql",
SOKRATES_NAME,
SOKRATES_BPN,
sokratesConfiguration()
);
@RegisterExtension
protected static final PgParticipantRuntime PLATO_RUNTIME = new PgParticipantRuntime(
":edc-tests:runtime:runtime-postgresql",
PLATO_NAME,
PLATO_BPN,
platoConfiguration()
);
}
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ edc-api-observability = { module = "org.eclipse.edc:api-observability", version.
edc-api-contractnegotiation = { module = "org.eclipse.edc:contract-negotiation-api", version.ref = "edc" }
edc-api-dataplane = { module = "org.eclipse.edc:dataplane-api", version.ref = "edc" }
edc-api-transferprocess = { module = "org.eclipse.edc:transfer-process-api", version.ref = "edc" }
edc-api-controlplane = { module = "org.eclipse.edc:control-plane-api", version.ref = "edc" }
edc-dsp = { module = "org.eclipse.edc:dsp", version.ref = "edc" }
edc-iam-mock = { module = "org.eclipse.edc:iam-mock", version.ref = "edc" }
edc-policy-engine = { module = "org.eclipse.edc:policy-engine", version.ref = "edc" }
Expand Down

0 comments on commit a7931c8

Please sign in to comment.