Skip to content

Commit

Permalink
feat: introduces dspace prefix in signaling client (eclipse-edc#4468)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood authored Sep 11, 2024
1 parent add6e25 commit 89946ff
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@ public class DataPlaneSignalingClient implements DataPlaneClient {
private final TypeTransformerRegistry transformerRegistry;
private final JsonLd jsonLd;

private final String jsonLdScope;
private final ObjectMapper mapper;

public DataPlaneSignalingClient(ControlApiHttpClient httpClient, TypeTransformerRegistry transformerRegistry, JsonLd jsonLd,
public DataPlaneSignalingClient(ControlApiHttpClient httpClient, TypeTransformerRegistry transformerRegistry, JsonLd jsonLd, String jsonLdScope,
ObjectMapper mapper, DataPlaneInstance dataPlane) {
this.httpClient = httpClient;
this.transformerRegistry = transformerRegistry;
this.jsonLd = jsonLd;
this.jsonLdScope = jsonLdScope;
this.mapper = mapper;
this.dataPlane = dataPlane;
}

private static <T> @NotNull StatusResult<T> failedResult(String processId, ServiceFailure failure) {
return StatusResult.failure(FATAL_ERROR, format("Transfer request for process %s failed: %s", processId, failure.getFailureDetail()));
}

@WithSpan
@Override
public StatusResult<DataFlowResponseMessage> start(DataFlowStartMessage message) {
Expand Down Expand Up @@ -102,13 +108,9 @@ public StatusResult<Void> checkAvailability() {
.orElse(failure -> failedResult(null, failure)));
}

private static <T> @NotNull StatusResult<T> failedResult(String processId, ServiceFailure failure) {
return StatusResult.failure(FATAL_ERROR, format("Transfer request for process %s failed: %s", processId, failure.getFailureDetail()));
}

private StatusResult<Request.Builder> createRequestBuilder(Object message, String url) {
return transformerRegistry.transform(message, JsonObject.class)
.compose(jsonLd::compact)
.compose(this::compact)
.compose(this::serializeMessage)
.map(rawBody -> RequestBody.create(rawBody, TYPE_JSON))
.map(body -> new Request.Builder().post(body).url(url))
Expand Down Expand Up @@ -143,6 +145,10 @@ private StatusResult<DataFlowResponseMessage> deserializeStartMessage(String res
}
}

private Result<JsonObject> compact(JsonObject object) {
return jsonLd.compact(object, jsonLdScope);
}

private Result<String> serializeMessage(Object message) {
try {
return Result.success(mapper.writeValueAsString(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@

import java.util.Objects;

import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;

/**
Expand All @@ -37,6 +43,7 @@
@Extension(value = DataPlaneSignalingClientExtension.NAME)
public class DataPlaneSignalingClientExtension implements ServiceExtension {
public static final String NAME = "Data Plane Signaling Client";
public static final String CONTROL_CLIENT_SCOPE = "CONTROL_CLIENT_SCOPE";

@Inject(required = false)
private ControlApiHttpClient httpClient;
Expand All @@ -62,12 +69,16 @@ public DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext con
context.getMonitor().debug(() -> "Using embedded Data Plane client.");
return instance -> new EmbeddedDataPlaneClient(dataPlaneManager);
}

jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
jsonLd.registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);
jsonLd.registerNamespace(VOCAB, EDC_NAMESPACE);

var mapper = typeManager.getMapper(JSON_LD);
context.getMonitor().debug(() -> "Using remote Data Plane client.");
Objects.requireNonNull(httpClient, "To use remote Data Plane client, a ControlApiHttpClient instance must be registered");
var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api");
return instance -> new DataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, mapper,
return instance -> new DataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, CONTROL_CLIENT_SCOPE, mapper,
instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,39 @@
import org.eclipse.edc.boot.system.injection.ObjectFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientExtension.CONTROL_CLIENT_SCOPE;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneSignalingClientExtensionTest {

@Test
void verifyDataPlaneClientFactory(ServiceExtensionContext context, ObjectFactory factory) {
var jsonLd = mock(JsonLd.class);
context.registerService(DataPlaneManager.class, null);
context.registerService(JsonLd.class, jsonLd);
var extension = factory.constructInstance(DataPlaneSignalingClientExtension.class);

var client = extension.dataPlaneClientFactory(context).createClient(createDataPlaneInstance());

assertThat(client).isInstanceOf(DataPlaneSignalingClient.class);
verify(jsonLd).registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
verify(jsonLd).registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);
verify(jsonLd).registerNamespace(VOCAB, EDC_NAMESPACE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientExtension.CONTROL_CLIENT_SCOPE;
import static org.eclipse.edc.http.client.testfixtures.HttpTestUtils.testHttpClient;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.jsonld.util.JacksonJsonLd.createObjectMapper;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.util.io.Ports.getFreePort;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -94,13 +99,17 @@ class DataPlaneSignalingClientTest {
private final ControlApiHttpClient httpClient = new ControlApiHttpClientImpl(testHttpClient(), mock());

private final DataPlaneClient dataPlaneClient = new DataPlaneSignalingClient(httpClient, TRANSFORMER_REGISTRY,
JSON_LD, MAPPER, instance);
JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

@BeforeAll
public static void setUp() {
var factory = Json.createBuilderFactory(Map.of());

JSON_LD.registerNamespace(VOCAB, EDC_NAMESPACE);
JSON_LD.registerNamespace(VOCAB, EDC_NAMESPACE, CONTROL_CLIENT_SCOPE);
JSON_LD.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
JSON_LD.registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);

dataPlane = startClientAndServer(DATA_PLANE_API_PORT);
TRANSFORMER_REGISTRY.register(new JsonObjectFromDataFlowTerminateMessageTransformer(factory));
TRANSFORMER_REGISTRY.register(new JsonObjectFromDataFlowSuspendMessageTransformer(factory));
Expand All @@ -116,6 +125,10 @@ public static void tearDown() {
stopQuietly(dataPlane);
}

private static Result<JsonObject> compact(JsonObject input) {
return JSON_LD.compact(input, CONTROL_CLIENT_SCOPE);
}

@AfterEach
public void resetMockServer() {
dataPlane.reset();
Expand All @@ -129,7 +142,7 @@ void verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonProcessing
var flowRequest = createDataFlowRequest();

var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected));
Expand All @@ -150,7 +163,7 @@ void verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingExcep
var flowRequest = createDataFlowRequest();

var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected));
Expand All @@ -170,7 +183,7 @@ void verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingExcep
void verifyReturnFatalErrorIfTransformFails() {
var flowRequest = createDataFlowRequest();
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand All @@ -188,7 +201,7 @@ void verifyReturnFatalErrorIfTransformFails() {
void verifyReturnFatalError_whenBadResponse() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));


Expand All @@ -211,7 +224,7 @@ void verifyReturnFatalError_whenBadResponse() throws JsonProcessingException {
void verifyTransferSuccess() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()).build();
Expand All @@ -234,7 +247,7 @@ void verifyTransferSuccess() throws JsonProcessingException {
void verifyTransferSuccess_withoutDataAddress() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var flowResponse = DataFlowResponseMessage.Builder.newInstance().build();
Expand Down Expand Up @@ -303,7 +316,7 @@ void shouldFail_whenConflictResponse() {
@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand Down Expand Up @@ -346,7 +359,7 @@ void shouldFail_whenConflictResponse() {
@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand Down

0 comments on commit 89946ff

Please sign in to comment.