diff --git a/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts b/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts new file mode 100644 index 00000000000..f8d8ba71851 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * + */ + + +plugins { + `java-library` + id("io.swagger.core.v3.swagger-gradle-plugin") +} + +dependencies { + api(project(":spi:common:http-spi")) + api(project(":spi:common:web-spi")) + api(project(":spi:data-plane:data-plane-spi")) + implementation(project(":core:common:util")) + + implementation(project(":core:data-plane:data-plane-util")) + implementation(libs.jakarta.rsApi) + + testImplementation(project(":extensions:common:http")) + testImplementation(project(":core:common:junit")) + testImplementation(libs.jersey.multipart) + testImplementation(libs.restAssured) + testImplementation(libs.mockserver.netty) + testImplementation(libs.mockserver.client) + testImplementation(testFixtures(project(":extensions:common:http:jersey-core"))) +} +edcBuild { + swagger { + apiGroup.set("public-api") + } +} + + diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java new file mode 100644 index 00000000000..e8521ac71c8 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api; + +import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiV2Controller; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.system.ExecutorInstrumentation; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.web.spi.WebServer; +import org.eclipse.edc.web.spi.WebService; +import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; +import org.eclipse.edc.web.spi.configuration.WebServiceSettings; + +import java.util.concurrent.Executors; + +/** + * This extension provides generic endpoints which are open to public participants of the Dataspace to execute + * requests on the actual data source. + */ +@Extension(value = DataPlanePublicApiExtension.NAME) +public class DataPlanePublicApiExtension implements ServiceExtension { + public static final String NAME = "Data Plane Public API"; + private static final int DEFAULT_PUBLIC_PORT = 8185; + private static final String PUBLIC_API_CONFIG = "web.http.public"; + private static final String PUBLIC_CONTEXT_ALIAS = "public"; + private static final String PUBLIC_CONTEXT_PATH = "/api/v2/public"; + + private static final int DEFAULT_THREAD_POOL = 10; + + private static final WebServiceSettings PUBLIC_SETTINGS = WebServiceSettings.Builder.newInstance() + .apiConfigKey(PUBLIC_API_CONFIG) + .contextAlias(PUBLIC_CONTEXT_ALIAS) + .defaultPath(PUBLIC_CONTEXT_PATH) + .defaultPort(DEFAULT_PUBLIC_PORT) + .name(NAME) + .build(); + + @Inject + private WebServer webServer; + + @Inject + private WebServiceConfigurer webServiceConfigurer; + + @Inject + private PipelineService pipelineService; + + @Inject + private WebService webService; + + @Inject + private ExecutorInstrumentation executorInstrumentation; + @Inject + private DataPlaneAuthorizationService authorizationService; + + @Override + public String name() { + return NAME; + } + + @Override + public void initialize(ServiceExtensionContext context) { + var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS); + var executorService = executorInstrumentation.instrument( + Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), + "Data plane proxy transfers" + ); + var publicApiController = new DataPlanePublicApiV2Controller(pipelineService, executorService, authorizationService); + webService.registerResource(configuration.getContextAlias(), publicApiController); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java new file mode 100644 index 00000000000..99c48a2e4df --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; + +import java.util.Map; + +/** + * Wrapper around {@link ContainerRequestContext} enabling mocking. + */ +public interface ContainerRequestContextApi { + + /** + * Get the request headers. Note that if more than one value is associated to a specific header, + * only the first one is retained. + * + * @return Headers map. + */ + Map headers(); + + /** + * Format query of the request as string, e.g. "hello=world\&foo=bar". + * + * @return Query param string. + */ + String queryParams(); + + /** + * Format the request body into a string. + * + * @return Request body. + */ + String body(); + + /** + * Get the media type from incoming request. + * + * @return Media type. + */ + String mediaType(); + + /** + * Return request path, e.g. "hello/world/foo/bar". + * + * @return Path string. + */ + String path(); + + /** + * Get http method from the incoming request, e.g. "GET", "POST"... + * + * @return Http method. + */ + String method(); +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java new file mode 100644 index 00000000000..c81245a62d6 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.spi.EdcException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This class provides a set of API wrapping a {@link ContainerRequestContext}. + */ +public class ContainerRequestContextApiImpl implements ContainerRequestContextApi { + + private static final String QUERY_PARAM_SEPARATOR = "&"; + + private final ContainerRequestContext context; + + public ContainerRequestContextApiImpl(ContainerRequestContext context) { + this.context = context; + } + + @Override + public Map headers() { + return context.getHeaders().entrySet() + .stream() + .filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0))); + } + + @Override + public String queryParams() { + return context.getUriInfo().getQueryParameters().entrySet() + .stream() + .map(entry -> new QueryParam(entry.getKey(), entry.getValue())) + .filter(QueryParam::isValid) + .map(QueryParam::toString) + .collect(Collectors.joining(QUERY_PARAM_SEPARATOR)); + } + + @Override + public String body() { + try (BufferedReader br = new BufferedReader(new InputStreamReader(context.getEntityStream()))) { + return br.lines().collect(Collectors.joining("\n")); + } catch (IOException e) { + throw new EdcException("Failed to read request body: " + e.getMessage()); + } + } + + @Override + public String path() { + var pathInfo = context.getUriInfo().getPath(); + return pathInfo.startsWith("/") ? pathInfo.substring(1) : pathInfo; + } + + @Override + public String mediaType() { + return Optional.ofNullable(context.getMediaType()) + .map(MediaType::toString) + .orElse(null); + } + + @Override + public String method() { + return context.getMethod(); + } + + private static final class QueryParam { + + private final String key; + private final List values; + private final boolean valid; + + private QueryParam(String key, List values) { + this.key = key; + this.values = values; + this.valid = key != null && values != null && !values.isEmpty(); + } + + public boolean isValid() { + return valid; + } + + @Override + public String toString() { + return valid ? key + "=" + values.get(0) : ""; + } + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java new file mode 100644 index 00000000000..b1af7e8159c --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.BODY; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.MEDIA_TYPE; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.METHOD; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS; + +public class DataFlowRequestSupplier implements BiFunction { + + /** + * Put all properties of the incoming request (method, request body, query params...) into a map. + */ + private static Map createProps(ContainerRequestContextApi contextApi) { + var props = new HashMap(); + props.put(METHOD, contextApi.method()); + props.put(QUERY_PARAMS, contextApi.queryParams()); + props.put(PATH, contextApi.path()); + Optional.ofNullable(contextApi.mediaType()) + .ifPresent(mediaType -> { + props.put(MEDIA_TYPE, mediaType); + props.put(BODY, contextApi.body()); + }); + return props; + } + + /** + * Create a {@link DataFlowStartMessage} based on incoming request and claims decoded from the access token. + * + * @param contextApi Api for accessing request properties. + * @param dataAddress Source data address. + * @return DataFlowRequest + */ + @Override + public DataFlowStartMessage apply(ContainerRequestContextApi contextApi, DataAddress dataAddress) { + var props = createProps(contextApi); + return DataFlowStartMessage.Builder.newInstance() + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(dataAddress) + .destinationDataAddress(DataAddress.Builder.newInstance() + .type(AsyncStreamingDataSink.TYPE) + .build()) + .id(UUID.randomUUID().toString()) + .properties(props) + .build(); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java new file mode 100644 index 00000000000..426d96ccfca --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - Initial implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.swagger.v3.oas.annotations.OpenAPIDefinition; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.ContainerRequestContext; + +@OpenAPIDefinition +@Tag(name = "Data Plane public API", + description = "The public API of the Data Plane is a data proxy enabling a data consumer to actively query" + + "data from the provider data source (e.g. backend Rest API, internal database...) through its Data Plane" + + "instance. Thus the Data Plane is the only entry/output door for the data, which avoids the provider to expose" + + "directly its data externally." + + "The Data Plane public API being a proxy, it supports all verbs (i.e. GET, POST, PUT, PATCH, DELETE), which" + + "can then conveyed until the data source is required. This is especially useful when the actual data source" + + "is a Rest API itself." + + "In the same manner, any set of arbitrary query parameters, path parameters and request body are supported " + + "(in the limits fixed by the HTTP server) and can also conveyed to the actual data source.") +public interface DataPlanePublicApiV2 { + + @Operation(description = "Send `GET` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void get(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `POST` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void post(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `PUT` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void put(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `DELETE` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void delete(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `PATCH` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void patch(ContainerRequestContext context, AsyncResponse response); +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java new file mode 100644 index 00000000000..b8f1770c6ae --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PATCH; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static jakarta.ws.rs.core.MediaType.WILDCARD; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED; +import static jakarta.ws.rs.core.Response.status; + +@Path("/v2/{any:.*}") +@Produces(WILDCARD) +public class DataPlanePublicApiV2Controller implements DataPlanePublicApiV2 { + + private final PipelineService pipelineService; + private final DataFlowRequestSupplier requestSupplier; + private final ExecutorService executorService; + private final DataPlaneAuthorizationService authorizationService; + + public DataPlanePublicApiV2Controller(PipelineService pipelineService, + ExecutorService executorService, + DataPlaneAuthorizationService authorizationService) { + this.pipelineService = pipelineService; + this.authorizationService = authorizationService; + this.requestSupplier = new DataFlowRequestSupplier(); + this.executorService = executorService; + } + + private static Response error(Response.Status status, String error) { + return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); + } + + @GET + @Override + public void get(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link POST} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @POST + @Override + public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PUT} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PUT + @Override + public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link DELETE} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @DELETE + @Override + public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PATCH} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PATCH + @Override + public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + private void handle(ContainerRequestContext requestContext, AsyncResponse response) { + var contextApi = new ContainerRequestContextApiImpl(requestContext); + + var token = contextApi.headers().get(HttpHeaders.AUTHORIZATION); + if (token == null) { + response.resume(error(UNAUTHORIZED, "Missing Authorization Header")); + return; + } + + var sourceDataAddress = authorizationService.authorize(token, buildRequestData(requestContext)); + if (sourceDataAddress.failed()) { + response.resume(error(FORBIDDEN, sourceDataAddress.getFailureDetail())); + return; + } + + var startMessage = requestSupplier.apply(contextApi, sourceDataAddress.getContent()); + + processRequest(startMessage, response); + } + + private Map buildRequestData(ContainerRequestContext requestContext) { + var requestData = new HashMap(); + requestData.put("headers", requestContext.getHeaders()); + requestData.put("path", requestContext.getUriInfo()); + requestData.put("method", requestContext.getMethod()); + requestData.put("content-type", requestContext.getMediaType()); + return requestData; + } + + private void processRequest(DataFlowStartMessage dataFlowStartMessage, AsyncResponse response) { + + AsyncStreamingDataSink.AsyncResponseContext asyncResponseContext = callback -> { + StreamingOutput output = t -> callback.outputStreamConsumer().accept(t); + var resp = Response.ok(output).type(callback.mediaType()).build(); + return response.resume(resp); + }; + + var sink = new AsyncStreamingDataSink(asyncResponseContext, executorService); + + pipelineService.transfer(dataFlowStartMessage, sink) + .whenComplete((result, throwable) -> { + if (throwable == null) { + if (result.failed()) { + response.resume(error(INTERNAL_SERVER_ERROR, result.getFailureDetail())); + } + } else { + var error = "Unhandled exception occurred during data transfer: " + throwable.getMessage(); + response.resume(error(INTERNAL_SERVER_ERROR, error)); + } + }); + } + +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..d0da2de712a --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1 @@ +org.eclipse.edc.connector.dataplane.api.DataPlanePublicApiExtension \ No newline at end of file diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java new file mode 100644 index 00000000000..8961e2e42f4 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DataFlowStartMessageSupplierTest { + + + private final DataFlowRequestSupplier supplier = new DataFlowRequestSupplier(); + + private static DataAddress createDataAddress() { + return DataAddress.Builder.newInstance().type("test-type").build(); + } + + @Test + void verifyMapping_noInputBody() { + var contextApi = mock(ContainerRequestContextApi.class); + var address = createDataAddress(); + + var method = HttpMethod.GET; + var queryParams = "test-query-param"; + var path = "test-path"; + + when(contextApi.method()).thenReturn(method); + when(contextApi.queryParams()).thenReturn(queryParams); + when(contextApi.path()).thenReturn(path); + + var request = supplier.apply(contextApi, address); + + assertThat(request.getId()).isNotBlank(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); + assertThat(request.getProperties()).containsExactlyInAnyOrderEntriesOf(Map.of( + DataFlowRequestSchema.PATH, path, + DataFlowRequestSchema.METHOD, method, + DataFlowRequestSchema.QUERY_PARAMS, queryParams + + )); + } + + @Test + void verifyMapping_withInputBody() { + var contextApi = mock(ContainerRequestContextApi.class); + var address = createDataAddress(); + + var method = HttpMethod.GET; + var queryParams = "test-query-param"; + var path = "test-path"; + var body = "Test request body"; + + when(contextApi.method()).thenReturn(method); + when(contextApi.queryParams()).thenReturn(queryParams); + when(contextApi.path()).thenReturn(path); + when(contextApi.mediaType()).thenReturn(MediaType.TEXT_PLAIN); + when(contextApi.body()).thenReturn(body); + + var request = supplier.apply(contextApi, address); + + assertThat(request.getId()).isNotBlank(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); + assertThat(request.getProperties()).containsExactlyInAnyOrderEntriesOf(Map.of( + DataFlowRequestSchema.PATH, path, + DataFlowRequestSchema.METHOD, method, + DataFlowRequestSchema.QUERY_PARAMS, queryParams, + DataFlowRequestSchema.BODY, body, + DataFlowRequestSchema.MEDIA_TYPE, MediaType.TEXT_PLAIN + )); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java new file mode 100644 index 00000000000..c8d2bfdd92a --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.restassured.specification.RequestSpecification; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.junit.annotations.ApiTest; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.Stream; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; +import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ApiTest +class DataPlanePublicApiV2ControllerTest extends RestControllerTestBase { + + private final PipelineService pipelineService = mock(); + private final DataAddressResolver dataAddressResolver = mock(); + private final DataPlaneAuthorizationService authorizationService = mock(); + + @BeforeEach + void setup() { + when(authorizationService.authorize(anyString(), anyMap())) + .thenReturn(Result.success(testDestAddress())); + } + + @Test + void should_returnBadRequest_if_missingAuthorizationHeader() { + baseRequest() + .post("/any") + .then() + .statusCode(Response.Status.UNAUTHORIZED.getStatusCode()) + .body("errors[0]", is("Missing Authorization Header")); + } + + @Test + void shouldNotReturn302_whenUrlWithoutTrailingSlash() { + baseRequest() + .post("") + .then() + .statusCode(not(302)); + } + + @Test + void should_returnForbidden_if_tokenValidationFails() { + var token = UUID.randomUUID().toString(); + when(authorizationService.authorize(anyString(), anyMap())).thenReturn(Result.failure("token is not valid")); + + baseRequest() + .header(AUTHORIZATION, token) + .post("/any") + .then() + .statusCode(Response.Status.FORBIDDEN.getStatusCode()) + .contentType(JSON) + .body("errors.size()", is(1)); + + verify(authorizationService).authorize(eq(token), anyMap()); + } + + @Test + void should_returnInternalServerError_if_transferFails() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(), any())) + .thenReturn(completedFuture(StreamResult.error(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .contentType(JSON) + .body("errors[0]", is(errorMsg)); + } + + @Test + void should_returnInternalServerError_if_transferThrows() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(DataFlowStartMessage.class), any())) + .thenReturn(failedFuture(new RuntimeException(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .contentType(JSON) + .body("errors[0]", is("Unhandled exception occurred during data transfer: " + errorMsg)); + } + + @Test + void shouldStreamSourceToResponse() { + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(), any())).thenAnswer(i -> { + ((AsyncStreamingDataSink) i.getArgument(1)).transfer(new TestDataSource("application/something", "data")); + return CompletableFuture.completedFuture(StreamResult.success()); + }); + + var responseBody = baseRequest() + .header(AUTHORIZATION, UUID.randomUUID().toString()) + .when() + .post("/any?foo=bar") + .then() + .log().ifError() + .statusCode(Response.Status.OK.getStatusCode()) + .contentType("application/something") + .extract().body().asString(); + + assertThat(responseBody).isEqualTo("data"); + var requestCaptor = ArgumentCaptor.forClass(DataFlowStartMessage.class); + verify(pipelineService).transfer(requestCaptor.capture(), any()); + var request = requestCaptor.getValue(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo("test"); + assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "v2/any").containsEntry("queryParams", "foo=bar"); + } + + @Override + protected Object controller() { + return new DataPlanePublicApiV2Controller(pipelineService, Executors.newSingleThreadExecutor(), authorizationService); + } + + private RequestSpecification baseRequest() { + return given() + .baseUri("http://localhost:" + port + "/v2") + .when(); + } + + private DataAddress testDestAddress() { + return DataAddress.Builder.newInstance().type("test").build(); + } + + private record TestDataSource(String mediaType, String data) implements DataSource, DataSource.Part { + + @Override + public StreamResult> openPartStream() { + return StreamResult.success(Stream.of(this)); + } + + @Override + public String name() { + return "test"; + } + + @Override + public InputStream openStream() { + return new ByteArrayInputStream(data.getBytes()); + } + + } + +} diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index 75973c5a8df..8265c6645b6 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -93,7 +93,11 @@ public void initialize(ServiceExtensionContext context) { Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), "Data plane proxy transfers" ); - var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService); + var monitor = context.getMonitor().withPrefix("DataPlane Public API"); + var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService, monitor); webService.registerResource(configuration.getContextAlias(), publicApiController); + + monitor.warning("This public API controller is scheduled for removal. Please consider upgrading your deployment " + + "to the data-plane-public-api-v2 module. The Data Plane Public API will then be available under at /v2/ prefix."); } } diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java index 0f40b8a733f..a90f74be0f2 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java @@ -27,7 +27,6 @@ "data from the provider data source (e.g. backend Rest API, internal database...) through its Data Plane" + "instance. Thus the Data Plane is the only entry/output door for the data, which avoids the provider to expose" + "directly its data externally." + - "" + "The Data Plane public API being a proxy, it supports all verbs (i.e. GET, POST, PUT, PATCH, DELETE), which" + "can then conveyed until the data source is required. This is especially useful when the actual data source" + "is a Rest API itself." + @@ -40,8 +39,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void get(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `POST` data query to the Data Plane.", @@ -49,8 +50,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void post(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `PUT` data query to the Data Plane.", @@ -58,8 +61,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void put(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `DELETE` data query to the Data Plane.", @@ -76,7 +81,9 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void patch(ContainerRequestContext context, AsyncResponse response); } diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java index be9f033e049..64fe5effe91 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java @@ -34,6 +34,7 @@ import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.monitor.Monitor; import java.util.List; import java.util.concurrent.ExecutorService; @@ -53,15 +54,21 @@ public class DataPlanePublicApiController implements DataPlanePublicApi { private final DataAddressResolver dataAddressResolver; private final DataFlowRequestSupplier requestSupplier; private final ExecutorService executorService; + private final Monitor monitor; public DataPlanePublicApiController(PipelineService pipelineService, DataAddressResolver dataAddressResolver, - ExecutorService executorService) { + ExecutorService executorService, Monitor monitor) { this.pipelineService = pipelineService; this.dataAddressResolver = dataAddressResolver; + this.monitor = monitor; this.requestSupplier = new DataFlowRequestSupplier(); this.executorService = executorService; } + private static Response error(Response.Status status, String error) { + return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); + } + @GET @Override public void get(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { @@ -69,54 +76,57 @@ public void get(@Context ContainerRequestContext requestContext, @Suspended Asyn } /** - * Sends a {@link DELETE} request to the data source and returns data. + * Sends a {@link POST} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @DELETE + @POST @Override - public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link PATCH} request to the data source and returns data. + * Sends a {@link PUT} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @PATCH + @PUT @Override - public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link PUT} request to the data source and returns data. + * Sends a {@link DELETE} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @PUT + @DELETE @Override - public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link POST} request to the data source and returns data. + * Sends a {@link PATCH} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @POST + @PATCH @Override - public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } private void handle(ContainerRequestContext context, AsyncResponse response) { + + monitor.warning("The DataPlane Public API is deprecated. Please consider upgrading to the /v2/ path. Your request will then be: %s" + .formatted(context.getUriInfo().getBaseUri() + "v2/" + context.getUriInfo().getPath())); var contextApi = new ContainerRequestContextApiImpl(context); var token = contextApi.headers().get(HttpHeaders.AUTHORIZATION); if (token == null) { @@ -154,8 +164,4 @@ private void handle(ContainerRequestContext context, AsyncResponse response) { }); } - private static Response error(Response.Status status, String error) { - return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); - } - } diff --git a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java index 279b81b6c0e..dd33ec267da 100644 --- a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java +++ b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java @@ -152,6 +152,11 @@ void shouldStreamSourceToResponse() { assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); } + @Override + protected Object controller() { + return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor(), mock()); + } + private RequestSpecification baseRequest() { return given() .baseUri("http://localhost:" + port) @@ -162,11 +167,6 @@ private DataAddress testDestAddress() { return DataAddress.Builder.newInstance().type("test").build(); } - @Override - protected Object controller() { - return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor()); - } - private record TestDataSource(String mediaType, String data) implements DataSource, DataSource.Part { @Override diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql index 707a0f42082..de43bc74ce1 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql @@ -17,9 +17,11 @@ CREATE TABLE IF NOT EXISTS edc_accesstokendata ( id VARCHAR NOT NULL PRIMARY KEY, - claim_token json NOT NULL, - data_address JSON NOT NULL + claim_token JSON NOT NULL, + data_address JSON NOT NULL, + additional_properties JSON DEFAULT '{}' ); COMMENT ON COLUMN edc_accesstokendata.claim_token IS 'ClaimToken serialized as JSON map'; COMMENT ON COLUMN edc_accesstokendata.data_address IS 'DataAddress serialized as JSON map'; +COMMENT ON COLUMN edc_accesstokendata.additional_properties IS 'Optional Additional properties serialized as JSON map'; diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java index 030d348b340..25cefcd0a8e 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.store.sql; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; @@ -34,6 +35,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Map; import static org.eclipse.edc.spi.query.Criterion.criterion; @@ -42,6 +44,8 @@ */ public class SqlAccessTokenDataStore extends AbstractSqlStore implements AccessTokenDataStore { + private static final TypeReference> MAP_TYPE_REF = new TypeReference<>() { + }; private final AccessTokenDataStatements statements; public SqlAccessTokenDataStore(DataSourceRegistry dataSourceRegistry, @@ -113,7 +117,8 @@ private void insert(Connection connection, AccessTokenData dataFlow) { queryExecutor.execute(connection, sql, dataFlow.id(), toJson(dataFlow.claimToken()), - toJson(dataFlow.dataAddress()) + toJson(dataFlow.dataAddress()), + toJson(dataFlow.additionalProperties()) ); } @@ -121,9 +126,10 @@ private void insert(Connection connection, AccessTokenData dataFlow) { private AccessTokenData mapAccessTokenData(ResultSet resultSet) throws SQLException { var claimToken = fromJson(resultSet.getString(statements.getClaimTokenColumn()), ClaimToken.class); var dataAddress = fromJson(resultSet.getString(statements.getDataAddressColumn()), DataAddress.class); + var additionalProperties = fromJson(resultSet.getString(statements.getAdditionalPropertiesColumn()), MAP_TYPE_REF); var id = resultSet.getString(statements.getIdColumn()); - return new AccessTokenData(id, claimToken, dataAddress); + return new AccessTokenData(id, claimToken, dataAddress, additionalProperties); } private @Nullable AccessTokenData findByIdInternal(Connection conn, String id) { diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java index 2f7bfc6f2e6..ed7a724e71f 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java @@ -39,6 +39,10 @@ default String getDataAddressColumn() { return "data_address"; } + default String getAdditionalPropertiesColumn() { + return "additional_properties"; + } + String getInsertTemplate(); String getSelectTemplate(); diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java index 88139d33799..46a647c24e8 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java @@ -33,6 +33,7 @@ public String getInsertTemplate() { .column(getIdColumn()) .jsonColumn(getClaimTokenColumn()) .jsonColumn(getDataAddressColumn()) + .jsonColumn(getAdditionalPropertiesColumn()) .insertInto(getTableName()); } diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java index 3741953dce7..ef6ddef26c0 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java @@ -29,5 +29,6 @@ public AccessTokenDataMapping(AccessTokenDataStatements statements) { add("id", statements.getIdColumn()); add("claimToken", new JsonFieldTranslator(statements.getClaimTokenColumn())); add("dataAddress", new JsonFieldTranslator(statements.getDataAddressColumn())); + add("additionalProperties", new JsonFieldTranslator(statements.getAdditionalPropertiesColumn())); } } diff --git a/settings.gradle.kts b/settings.gradle.kts index d041677c193..b12dea2c7df 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -175,6 +175,7 @@ include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-co include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-client") include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform") include(":extensions:data-plane:data-plane-public-api") +include(":extensions:data-plane:data-plane-public-api-v2") include(":extensions:data-plane:data-plane-http") include(":extensions:data-plane:data-plane-http-oauth2") diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java index c1bc0f8b354..a392308545b 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java @@ -17,10 +17,22 @@ import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.types.domain.DataAddress; +import java.util.Map; + /** * Container object for a {@link ClaimToken} and a {@link DataAddress} that the data plane uses to keep track of currently * all access tokens that are currently valid. + * + * @param id The correlation ID of the EDR, that is authorized for this data resources. The token, that is stored inside the + * EDR must carry this information. For JWTs this would be the "jti" claim. + * @param claimToken The representation of the EDR + * @param dataAddress The data resource (= source address) for which the token is authorized + * @param additionalProperties (optional) a list of additional properties that should be persisted with the AccessTokenData, for example refresh tokens, etc. */ -public record AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress) { +public record AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress, + Map additionalProperties) { + public AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress) { + this(id, claimToken, dataAddress, Map.of()); + } } diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java index a5b328486c8..f4514186907 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; @@ -161,6 +162,6 @@ protected DataAddress dataAddress() { } protected AccessTokenData accessTokenData(String id) { - return new AccessTokenData(id, ClaimToken.Builder.newInstance().build(), dataAddress()); + return new AccessTokenData(id, ClaimToken.Builder.newInstance().build(), dataAddress(), Map.of("foo", List.of("bar", "baz"))); } } diff --git a/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts b/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts index cdb72787362..25cceb3888c 100644 --- a/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation(project(":extensions:control-plane:api:control-plane-api-client")) implementation(project(":extensions:data-plane:data-plane-http")) implementation(project(":extensions:data-plane:data-plane-control-api")) - implementation(project(":extensions:data-plane:data-plane-public-api")) + implementation(project(":extensions:data-plane:data-plane-public-api-v2")) implementation(project(":extensions:common:vault:vault-filesystem")) } diff --git a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts index 1d06fdfe57d..6f6d290c63e 100644 --- a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client) testImplementation(project(":core:common:transform-core")) // for the transformer registry impl + testImplementation(project(":extensions:common:crypto:crypto-common")) testCompileOnly(project(":system-tests:e2e-dataplane-tests:runtimes:data-plane")) } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java new file mode 100644 index 00000000000..5b72fcd1a17 --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; +import org.junit.jupiter.api.extension.RegisterExtension; + +public abstract class AbstractDataPlaneTest { + protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + @RegisterExtension + protected static EdcRuntimeExtension runtime = + new EdcRuntimeExtension( + ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + "data-plane", + DATAPLANE.dataPlaneConfiguration() + ); +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java new file mode 100644 index 00000000000..3d61049c356 --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import com.nimbusds.jose.JOSEException; +import com.nimbusds.jose.JWSAlgorithm; +import com.nimbusds.jose.JWSHeader; +import com.nimbusds.jwt.JWTClaimsSet; +import com.nimbusds.jwt.SignedJWT; +import io.restassured.http.ContentType; +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.edc.connector.core.security.keyparsers.PemParser; +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.security.token.jwt.CryptoConverter; +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.security.Key; +import java.security.PrivateKey; +import java.util.Date; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.mockito.Mockito.mock; + +public class DataPlanePublicApiEndToEndTest extends AbstractDataPlaneTest { + + public static final String PUBLIC_KEY_ALIAS = "public-key"; + public static final String PRIVATE_KEY_ALIAS = "1"; + // this is a data address representing the private backend for an HTTP pull transfer + public static final DataAddress BACKEND_API_DATAADDRESS = DataAddress.Builder.newInstance() + .type("HttpData") + .property(EDC_NAMESPACE + "baseUrl", "https://jsonplaceholder.typicode.com/todos") + .build(); + + @Test + void httpPull_missingToken_expect401() { + DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + /*.header(HttpHeaders.AUTHORIZATION, token) missing */ + .body(""" + { + "bar": "baz" + } + """) + .post("/v2/foo") + .then() + .statusCode(401) + .body(Matchers.containsString("Missing Authorization Header")); + } + + @Test + void httpPull_invalidToken_expect403() { + var token = "some-invalid-token"; + DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .header(HttpHeaders.AUTHORIZATION, token) + .body(""" + { + "bar": "baz" + } + """) + .post("/v2/foo") + .then() + .statusCode(403); + } + + @ParameterizedTest(name = "Method = {0}") + @ValueSource(strings = { "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD" }) + void request_expect200(String method) { + var token = createEdr(); + var body = DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .header(HttpHeaders.AUTHORIZATION, token) + .request(method, "/v2/bar/baz") + .then() + .log().ifError() + .statusCode(200) + .extract().body().asString(); + assertThat(body).isNotNull(); + } + + private Key resolvePrivateKey() { + var privateKeyPem = runtime.getService(Vault.class).resolveSecret(PRIVATE_KEY_ALIAS); + return new PemParser(mock()).parse(privateKeyPem).orElseThrow(f -> new RuntimeException(f.getFailureDetail())); + } + + /** + * Creates and stores an EDR in the data plane. The serialized EDR (as serialized JWT) is returned. Token and the {@link AccessTokenData} + * stored in the data plane are correlated via the "jti" claim in the token. + * + * @return The EDR in the form of a serialized JWT. + */ + private String createEdr() { + var tokenId = UUID.randomUUID().toString(); + // create JWT representing the EDR + var jwt = createJwt(tokenId); + + // store the EDR + var accessTokenStore = runtime.getService(AccessTokenDataStore.class); + accessTokenStore.store(new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(), BACKEND_API_DATAADDRESS)); + return jwt; + } + + private String createJwt(String tokenId) { + + try { + var jwk = resolvePrivateKey(); + var header = new JWSHeader.Builder(JWSAlgorithm.RS256) + .keyID(PUBLIC_KEY_ALIAS).build(); + var claims = new JWTClaimsSet.Builder() + .issuer("me") + .subject("me") + .issueTime(new Date()) + .jwtID(tokenId) + .build(); + + var jwt = new SignedJWT(header, claims); + jwt.sign(CryptoConverter.createSignerFor((PrivateKey) jwk)); + return jwt.serialize(); + } catch (JOSEException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index 16339d97afc..5d8584e2706 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -32,18 +32,16 @@ import org.eclipse.edc.core.transform.TypeTransformerRegistryImpl; import org.eclipse.edc.jsonld.util.JacksonJsonLd; import org.eclipse.edc.junit.annotations.EndToEndTest; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; -import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; +import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import java.net.URI; import java.time.Duration; @@ -59,20 +57,9 @@ import static org.hamcrest.Matchers.notNullValue; @EndToEndTest -public class DataPlaneSignalingApiEndToEndTest { - - public static final String DATAPLANE_PUBLIC_ENDPOINT_URL = "http://fizz.buzz/bar"; - protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - @RegisterExtension - static EdcRuntimeExtension runtime = - new EdcRuntimeExtension( - ":system-tests:e2e-dataplane-tests:runtimes:data-plane", - "data-plane", - DATAPLANE.dataPlaneConfiguration() - ); +public class DataPlaneSignalingApiEndToEndTest extends AbstractDataPlaneTest { + + private static final String DATAPLANE_PUBLIC_ENDPOINT_URL = "http://fizz.buzz/bar"; protected final Duration timeout = Duration.ofSeconds(60); private ObjectMapper mapper; @@ -97,9 +84,17 @@ void startTransfer() throws JsonProcessingException { var processId = "test-processId"; var flowMessage = createStartMessage(processId); - var jo = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); + var startMessage = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); - var resultJson = DATAPLANE.initiateTransfer(jo); + var resultJson = DATAPLANE.getDataPlaneSignalingEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .body(startMessage) + .post("/v1/dataflows") + .then() + .body(Matchers.notNullValue()) + .statusCode(200) + .extract().body().asString(); var dataAddress = registry.transform(mapper.readValue(resultJson, JsonObject.class), DataAddress.class) .orElseThrow(failTest()); @@ -127,7 +122,7 @@ void getState() { .build(); runtime.getService(DataPlaneStore.class).save(flow); - var resultJson = DATAPLANE.getDataPlaneSignalingApi() + var resultJson = DATAPLANE.getDataPlaneSignalingEndpoint() .baseRequest() .contentType(ContentType.JSON) .get("/v1/dataflows/%s/state".formatted(dataFlowId)) @@ -159,7 +154,7 @@ void terminate() { .add(DATA_FLOW_TERMINATE_MESSAGE_REASON, "test-reason") .build(); - DATAPLANE.getDataPlaneSignalingApi() + DATAPLANE.getDataPlaneSignalingEndpoint() .baseRequest() .body(terminateMessage) .contentType(ContentType.JSON) diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index 714ce0cda07..749b13c89f2 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -15,10 +15,7 @@ package org.eclipse.edc.test.e2e.participant; import com.fasterxml.jackson.annotation.JsonCreator; -import io.restassured.http.ContentType; -import jakarta.json.JsonObject; import org.eclipse.edc.test.system.utils.Participant; -import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; import java.net.URI; @@ -35,14 +32,17 @@ public class DataPlaneParticipant extends Participant { private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); private final URI dataPlaneSignaling = URI.create("http://localhost:" + getFreePort() + "/api/signaling"); - private final Endpoint dataPlaneSignalingApi = new Endpoint(dataPlaneSignaling); private DataPlaneParticipant() { super(); } - public Endpoint getDataPlaneSignalingApi() { - return dataPlaneSignalingApi; + public Endpoint getDataPlaneSignalingEndpoint() { + return new Endpoint(dataPlaneSignaling); + } + + public Endpoint getDataPlanePublicEndpoint() { + return new Endpoint(dataPlanePublic); } public Map dataPlaneConfiguration() { @@ -66,20 +66,6 @@ public Map dataPlaneConfiguration() { }; } - /** - * Uses the data plane's control API to initiate a transfer - */ - public String initiateTransfer(JsonObject startMessage) { - return dataPlaneSignalingApi.baseRequest() - .contentType(ContentType.JSON) - .body(startMessage) - .post("/v1/dataflows") - .then() - .body(Matchers.notNullValue()) - .statusCode(200) - .extract().body().asString(); - } - @NotNull private String resourceAbsolutePath(String filename) { return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename;