Skip to content

Commit

Permalink
feat: implements EDR cache receiver + E2E tests (eclipse-edc#4004)
Browse files Browse the repository at this point in the history
* feat: implements EDR cache receiver + E2E tests

* pr remarks

* chore: dependencies file
  • Loading branch information
wolf4ood authored Mar 15, 2024
1 parent a8e69fd commit dcac6d3
Show file tree
Hide file tree
Showing 33 changed files with 1,478 additions and 614 deletions.
2 changes: 1 addition & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ maven/mavencentral/com.apicatalog/iron-ed25519-cryptosuite-2020/0.8.1, Apache-2.
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.apicatalog/titanium-json-ld/1.3.1, Apache-2.0, approved, #8912
maven/mavencentral/com.apicatalog/titanium-json-ld/1.4.0, , restricted, clearlydefined
maven/mavencentral/com.apicatalog/titanium-json-ld/1.4.0, Apache-2.0, approved, #13683
maven/mavencentral/com.atomikos/atomikos-util/6.0.0, Apache-2.0, approved, #9326
maven/mavencentral/com.atomikos/transactions-api/6.0.0, Apache-2.0, approved, #10351
maven/mavencentral/com.atomikos/transactions-jdbc/6.0.0, Apache-2.0, approved, #9273
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ public class EndpointDataReferenceStoreDefaultServicesExtension implements Servi
@Setting(value = "Directory/Path where to store EDRs in the vault for vaults that supports hierarchical structuring.", defaultValue = DEFAULT_EDR_VAULT_PATH)
public static final String EDC_EDR_VAULT_PATH = "edc.edr.vault.path";
protected static final String NAME = "Endpoint Data Reference Core Default Services Extension";
@Inject
private EndpointDataReferenceEntryIndex edrEntryStore;

@Inject
private EndpointDataReferenceCache edrCache;


@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.jetbrains.annotations.Nullable;

import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +41,11 @@ public InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistry criteri
queryResolver = new ReflectionBasedQueryResolver<>(EndpointDataReferenceEntry.class, criterionOperatorRegistry);
}

@Override
public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) {
return cache.get(transferProcessId);
}

@Override
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec spec) {
return StoreResult.success(queryResolver.query(cache.values().stream(), spec).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.Nullable;

import java.util.List;

Expand All @@ -47,6 +48,11 @@ public StoreResult<DataAddress> resolveByTransferProcess(String transferProcessI
return transactionalContext.execute(() -> dataReferenceCache.get(transferProcessId));
}

@Override
public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) {
return transactionalContext.execute(() -> dataReferenceEntryIndex.findById(transferProcessId));
}

@Override
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec querySpec) {
return transactionalContext.execute(() -> dataReferenceEntryIndex.query(querySpec));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
#


org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension
org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension
org.eclipse.edc.core.edr.EndpointDataReferenceStoreDefaultServicesExtension
Original file line number Diff line number Diff line change
Expand Up @@ -46,98 +46,87 @@ public TransferProcessEventListener(EventRouter eventRouter, Clock clock) {

@Override
public void initiated(TransferProcess process) {
var event = TransferProcessInitiated.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessInitiated.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void provisioningRequested(TransferProcess process) {
var event = TransferProcessProvisioningRequested.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessProvisioningRequested.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void provisioned(TransferProcess process) {
var event = TransferProcessProvisioned.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessProvisioned.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void requested(TransferProcess process) {
var event = TransferProcessRequested.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessRequested.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void started(TransferProcess process, TransferProcessStartedData additionalData) {
var event = TransferProcessStarted.Builder.newInstance()
.transferProcessId(process.getId())
var event = withBaseProperties(TransferProcessStarted.Builder.newInstance(), process)
.dataAddress(additionalData.getDataAddress())
.callbackAddresses(process.getCallbackAddresses())
.contractId(process.getContractId())
.type(process.getType().name())
.build();

publish(event);
}

@Override
public void completed(TransferProcess process) {
var event = TransferProcessCompleted.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessCompleted.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void terminated(TransferProcess process) {
var event = TransferProcessTerminated.Builder.newInstance()
var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process)
.reason(process.getErrorDetail())
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
.build();

publish(event);
}

@Override
public void deprovisioningRequested(TransferProcess process) {
var event = TransferProcessDeprovisioningRequested.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessDeprovisioningRequested.Builder.newInstance(), process)
.build();

publish(event);
}

@Override
public void deprovisioned(TransferProcess process) {
var event = TransferProcessDeprovisioned.Builder.newInstance()
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
var event = withBaseProperties(TransferProcessDeprovisioned.Builder.newInstance(), process)
.build();

publish(event);
}

private <T extends TransferProcessEvent, B extends TransferProcessEvent.Builder<T, B>> B withBaseProperties(B builder, TransferProcess process) {
return builder.transferProcessId(process.getId())
.contractId(process.getContractId())
.assetId(process.getAssetId())
.type(process.getType().name())
.callbackAddresses(process.getCallbackAddresses());
}

@SuppressWarnings("unchecked")
private void publish(TransferProcessEvent event) {
var envelope = EventEnvelope.Builder.newInstance()
.payload(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.UUID;

import static java.util.stream.Collectors.toMap;
import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.spi.result.Result.success;

public class DataPlaneAuthorizationServiceImpl implements DataPlaneAuthorizationService {
Expand Down Expand Up @@ -85,10 +86,10 @@ public Result<DataAddress> authorize(String token, Map<String, Object> requestDa
private Result<DataAddress> createDataAddress(TokenRepresentation tokenRepresentation, Endpoint publicEndpoint) {
var address = DataAddress.Builder.newInstance()
.type(publicEndpoint.endpointType())
.property("endpoint", publicEndpoint.endpoint())
.property("endpointType", publicEndpoint.endpointType()) //this is duplicated in the type() field, but will make serialization easier
.property(EDC_NAMESPACE + "endpoint", publicEndpoint.endpoint())
.property(EDC_NAMESPACE + "endpointType", publicEndpoint.endpointType()) //this is duplicated in the type() field, but will make serialization easier
.properties(tokenRepresentation.getAdditional()) // would contain the "authType = bearer" entry
.property("authorization", tokenRepresentation.getToken())
.property(EDC_NAMESPACE + "authorization", tokenRepresentation.getToken())
.build();

return success(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE;

/**
* This implementation of the {@link DataPlaneAccessTokenService} uses a backing storage ({@link AccessTokenDataStore}) to keep a record of all
* tokens it has issued. Tokens are in JWT format.
Expand Down Expand Up @@ -93,7 +95,7 @@ public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataA
var allDecorators = new ArrayList<>(Stream.concat(claimDecorators, headerDecorators).toList());
var keyIdDecorator = new KeyIdDecorator(publicKeyIdSupplier.get());
allDecorators.add(keyIdDecorator);

// if there is no "jti" header on the token params, we'll assign a random one, and add it back to the decorators
if (id == null) {
monitor.info("No '%s' claim found on TokenParameters. Will generate a random one.".formatted(TOKEN_ID));
Expand All @@ -113,7 +115,7 @@ public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataA

var storeResult = accessTokenDataStore.store(accessTokenData);
var content = tokenResult.getContent();
content.getAdditional().put("authType", "bearer");
content.getAdditional().put(EDC_NAMESPACE + "authType", "bearer");
return storeResult.succeeded() ? Result.success(content) : Result.failure(storeResult.getFailureMessages());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ void createEndpointDataReference() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getProperties().get("endpointType")).isEqualTo(da.getType());
assertThat(da.getStringProperty("endpoint")).isEqualTo("http://example.com");
assertThat(da.getStringProperty("endpointType")).isEqualTo(da.getType());
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
});

Expand Down Expand Up @@ -105,7 +105,7 @@ void createEndpointDataReference_withAuthType() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getStringProperty("endpoint")).isEqualTo("http://example.com");
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
assertThat(da.getStringProperty("authType")).isEqualTo("bearer");
assertThat(da.getStringProperty("fizz")).isEqualTo("buzz");
Expand Down
28 changes: 28 additions & 0 deletions extensions/control-plane/edr/edr-store-receiver/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
}

dependencies {
api(project(":spi:common:core-spi"))
api(project(":spi:control-plane:transfer-spi"))
api(project(":spi:control-plane:control-plane-spi"))
api(project(":spi:control-plane:policy-spi"))
api(project(":spi:common:edr-store-spi"))

testImplementation(project(":core:common:junit"))

}
Loading

0 comments on commit dcac6d3

Please sign in to comment.