diff --git a/edc-dataplane/edc-dataplane-base/build.gradle.kts b/edc-dataplane/edc-dataplane-base/build.gradle.kts index 1cf2ae6f1..d64d67d1f 100644 --- a/edc-dataplane/edc-dataplane-base/build.gradle.kts +++ b/edc-dataplane/edc-dataplane-base/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core")) runtimeOnly(project(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api")) runtimeOnly(project(":edc-extensions:dataplane:dataplane-self-registration")) + runtimeOnly(project(":edc-extensions:dataplane:pipeline-service")) runtimeOnly(libs.edc.jsonld) // needed by the DataPlaneSignalingApi runtimeOnly(libs.edc.core.did) // for the DID Public Key Resolver diff --git a/edc-extensions/dataplane/pipeline-service/build.gradle.kts b/edc-extensions/dataplane/pipeline-service/build.gradle.kts new file mode 100644 index 000000000..94af0602e --- /dev/null +++ b/edc-extensions/dataplane/pipeline-service/build.gradle.kts @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + + +plugins { + `java-library` +} + +dependencies { + implementation(libs.edc.spi.dataplane.dataplane) + + testImplementation(libs.edc.junit) +} diff --git a/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceExtension.java b/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceExtension.java new file mode 100644 index 000000000..1ef260172 --- /dev/null +++ b/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceExtension.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.pipeline; + +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; + +import static org.eclipse.tractusx.edc.dataplane.pipeline.PipelineServiceExtension.NAME; + +@Extension(NAME) +public class PipelineServiceExtension implements ServiceExtension { + + public static final String NAME = "Pipeline Service Override Extension"; + + @Provider + public PipelineService pipelineService(ServiceExtensionContext context) { + return new PipelineServiceImpl(context.getMonitor()); + } +} diff --git a/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImpl.java b/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImpl.java new file mode 100644 index 000000000..c7457de39 --- /dev/null +++ b/edc-extensions/dataplane/pipeline-service/src/main/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImpl.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.pipeline; + +import org.eclipse.edc.connector.dataplane.spi.DataFlow; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import static java.lang.String.format; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toSet; + +/** + * Default pipeline service implementation. + */ +public class PipelineServiceImpl implements PipelineService { + private final List sourceFactories = new ArrayList<>(); + private final List sinkFactories = new ArrayList<>(); + private final Map sources = new ConcurrentHashMap<>(); + private final Monitor monitor; + + public PipelineServiceImpl(Monitor monitor) { + this.monitor = monitor; + } + + @Override + public boolean canHandle(DataFlowStartMessage request) { + return getSourceFactory(request) != null && getSinkFactory(request) != null; + } + + @Override + public Result validate(DataFlowStartMessage request) { + var sourceFactory = getSourceFactory(request); + if (sourceFactory == null) { + // NB: do not include the source type as that can possibly leak internal information + return Result.failure("Data source not supported for: " + request.getId()); + } + + var sourceValidation = sourceFactory.validateRequest(request); + if (sourceValidation.failed()) { + return Result.failure(sourceValidation.getFailureMessages()); + } + + var sinkFactory = getSinkFactory(request); + if (sinkFactory == null) { + // NB: do not include the target type as that can possibly leak internal information + return Result.failure("Data sink not supported for: " + request.getId()); + } + + var sinkValidation = sinkFactory.validateRequest(request); + if (sinkValidation.failed()) { + return Result.failure(sinkValidation.getFailureMessages()); + } + + return Result.success(true); + } + + @Override + public CompletableFuture> transfer(DataFlowStartMessage request) { + var sinkFactory = getSinkFactory(request); + if (sinkFactory == null) { + return noSinkFactory(request); + } + + var sink = sinkFactory.createSink(request); + + return transfer(request, sink); + } + + @Override + public CompletableFuture> transfer(DataFlowStartMessage request, DataSink sink) { + var sourceFactory = getSourceFactory(request); + if (sourceFactory == null) { + return noSourceFactory(request); + } + + var source = sourceFactory.createSource(request); + sources.put(request.getProcessId(), source); + monitor.debug(() -> format("Transferring from %s to %s.", request.getSourceDataAddress().getType(), request.getDestinationDataAddress().getType())); + return sink.transfer(source) + .thenApply(result -> { + terminate(request.getProcessId()); + return result; + }); + } + + @Override + public StreamResult terminate(DataFlow dataFlow) { + return terminate(dataFlow.getId()); + } + + @Override + public void registerFactory(DataSourceFactory factory) { + sourceFactories.add(factory); + } + + @Override + public void registerFactory(DataSinkFactory factory) { + sinkFactories.add(factory); + } + + @Override + public Set supportedSourceTypes() { + return sourceFactories.stream().map(DataSourceFactory::supportedType).collect(toSet()); + } + + @Override + public Set supportedSinkTypes() { + return sinkFactories.stream().map(DataSinkFactory::supportedType).collect(toSet()); + } + + private StreamResult terminate(String dataFlowId) { + var source = sources.remove(dataFlowId); + if (source == null) { + return StreamResult.notFound(); + } else { + try { + source.close(); + return StreamResult.success(); + } catch (Exception e) { + return StreamResult.error("Cannot terminate DataFlow %s: %s".formatted(dataFlowId, e.getMessage())); + } + } + } + + @Nullable + private DataSourceFactory getSourceFactory(DataFlowStartMessage request) { + return sourceFactories.stream() + .filter(s -> Objects.equals(s.supportedType(), request.getSourceDataAddress().getType())) + .findFirst() + .orElse(null); + } + + @Nullable + private DataSinkFactory getSinkFactory(DataFlowStartMessage request) { + return sinkFactories.stream() + .filter(s -> Objects.equals(s.supportedType(), request.getDestinationDataAddress().getType())) + .findFirst() + .orElse(null); + } + + @NotNull + private CompletableFuture> noSourceFactory(DataFlowStartMessage request) { + return completedFuture(StreamResult.error("Unknown data source type: " + request.getSourceDataAddress().getType())); + } + + @NotNull + private CompletableFuture> noSinkFactory(DataFlowStartMessage request) { + return completedFuture(StreamResult.error("Unknown data sink type: " + request.getDestinationDataAddress().getType())); + } + + +} diff --git a/edc-extensions/dataplane/pipeline-service/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/dataplane/pipeline-service/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 000000000..507bf4743 --- /dev/null +++ b/edc-extensions/dataplane/pipeline-service/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,20 @@ +################################################################################# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +org.eclipse.tractusx.edc.dataplane.pipeline.PipelineServiceExtension diff --git a/edc-extensions/dataplane/pipeline-service/src/test/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImplTest.java b/edc-extensions/dataplane/pipeline-service/src/test/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImplTest.java new file mode 100644 index 000000000..76960d8e2 --- /dev/null +++ b/edc-extensions/dataplane/pipeline-service/src/test/java/org/eclipse/tractusx/edc/dataplane/pipeline/PipelineServiceImplTest.java @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.dataplane.pipeline; + +import org.eclipse.edc.connector.dataplane.spi.DataFlow; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +class PipelineServiceImplTest { + + private final Monitor monitor = mock(); + + private final DataSourceFactory sourceFactory = mock(); + + private final DataSinkFactory sinkFactory = mock(); + private final DataSource source = mock(); + private final DataSink sink = mock(); + private final PipelineServiceImpl service = new PipelineServiceImpl(monitor); + + @BeforeEach + void setUp() { + service.registerFactory(sourceFactory); + service.registerFactory(sinkFactory); + } + + @Nested + class Transfer { + @Test + void transfer_invokesSink() throws Exception { + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.supportedType()).thenReturn("destination"); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(completedFuture(StreamResult.success())); + + var future = service.transfer(dataFlow("source", "destination").toRequest()); + + assertThat(future).succeedsWithin(5, TimeUnit.SECONDS).satisfies(result -> { + assertThat(result).isSucceeded(); + }); + verify(sink).transfer(source); + verify(source).close(); + } + + @Test + void transfer_withCustomSink_shouldNotInvokeSinkFactory() throws Exception { + var flowRequest = dataFlow("source", "custom-destination").toRequest(); + + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + + var customSink = new DataSink() { + @Override + public CompletableFuture> transfer(DataSource source) { + return CompletableFuture.completedFuture(StreamResult.success("test-response")); + } + }; + var future = service.transfer(flowRequest, customSink); + + assertThat(future).succeedsWithin(Duration.ofSeconds(5)) + .satisfies(res -> assertThat(res).isSucceeded().satisfies(obj -> assertThat(obj).isEqualTo("test-response"))); + + verify(sourceFactory).createSource(flowRequest); + verifyNoInteractions(sinkFactory); + verify(source).close(); + } + } + + @Nested + class Terminate { + @Test + void shouldCloseDataSource() throws Exception { + var dataFlow = dataFlow("source", "destination"); + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.supportedType()).thenReturn("destination"); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(new CompletableFuture<>()); + + service.transfer(dataFlow.toRequest()); + + var result = service.terminate(dataFlow); + + assertThat(result).isSucceeded(); + verify(source).close(); + } + + @Test + void shouldFail_whenSourceClosureFails() throws Exception { + var dataFlow = dataFlow("source", "destination"); + when(sourceFactory.supportedType()).thenReturn("source"); + when(sourceFactory.createSource(any())).thenReturn(source); + when(sinkFactory.supportedType()).thenReturn("destination"); + when(sinkFactory.createSink(any())).thenReturn(sink); + when(sink.transfer(any())).thenReturn(new CompletableFuture<>()); + doThrow(IOException.class).when(source).close(); + + service.transfer(dataFlow.toRequest()); + + var result = service.terminate(dataFlow); + + assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(GENERAL_ERROR); + verify(source).close(); + } + + @Test + void shouldFail_whenTransferDoesNotExist() { + var dataFlow = dataFlow("source", "destination"); + + var result = service.terminate(dataFlow); + + assertThat(result).isFailed().extracting(StreamFailure::getReason).isEqualTo(NOT_FOUND); + verifyNoInteractions(source); + } + } + + @ParameterizedTest + @ArgumentsSource(CanHandleArguments.class) + void canHandle_shouldReturnTrue_whenSourceAndDestinationCanBeHandled(String source, String destination, boolean expected) { + when(sourceFactory.supportedType()).thenReturn("source"); + when(sinkFactory.supportedType()).thenReturn("destination"); + + boolean result = service.canHandle(dataFlow(source, destination).toRequest()); + + assertThat(result).isEqualTo(expected); + } + + @Nested + class SupportedTypes { + + @Test + void shouldReturnSourceTypesFromFactories() { + when(sourceFactory.supportedType()).thenReturn("source"); + + var result = service.supportedSourceTypes(); + + assertThat(result).containsOnly("source"); + verifyNoInteractions(sinkFactory); + } + + @Test + void shouldReturnSinkTypesFromFactories() { + when(sinkFactory.supportedType()).thenReturn("sink"); + + var result = service.supportedSinkTypes(); + + assertThat(result).containsOnly("sink"); + verifyNoInteractions(sourceFactory); + } + + } + + private DataFlow dataFlow(String sourceType, String destinationType) { + return DataFlow.Builder.newInstance() + .id("1") + .source(DataAddress.Builder.newInstance().type(sourceType).build()) + .destination(DataAddress.Builder.newInstance().type(destinationType).build()) + .build(); + } + + private static class CanHandleArguments implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext extensionContext) { + return Stream.of( + arguments("source", "destination", true), + arguments("unsupported_source", "destination", false), + arguments("source", "unsupported_destination", false), + arguments("unsupported_source", "unsupported_destination", false) + ); + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 08d1da4e6..7888172f9 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -57,6 +57,7 @@ include(":edc-extensions:dataplane:dataplane-selector-configuration") include(":edc-extensions:dataplane:dataplane-self-registration") include(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-core") include(":edc-extensions:dataplane:dataplane-token-refresh:token-refresh-api") +include(":edc-extensions:dataplane:pipeline-service") // test modules include(":edc-tests:edc-controlplane:edr-api-tests")