Skip to content

Commit

Permalink
refactor(CPA): obliviates the control plane adapter term and module r…
Browse files Browse the repository at this point in the history
…efactor
  • Loading branch information
wolf4ood committed Jul 12, 2023
1 parent 293fbbc commit a720cb9
Show file tree
Hide file tree
Showing 96 changed files with 1,005 additions and 382 deletions.
4 changes: 2 additions & 2 deletions core/edr-cache-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ dependencies {
implementation(libs.edc.config.filesystem)
implementation(libs.edc.util)

implementation(project(":spi:edr-cache-spi"))
implementation(project(":spi:edr-spi"))

testImplementation(testFixtures(project(":spi:edr-cache-spi")))
testImplementation(testFixtures(project(":spi:edr-spi")))

}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.tractusx.edc.edr.core.defaults.InMemoryEndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

/**
* Registers default services for the EDR cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@
package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.BaseCriterionToPredicateConverter;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;

public class EdrCacheEntryPredicateConverter extends BaseCriterionToPredicateConverter<EndpointDataReferenceEntry> {

@Override
protected Object property(String key, Object object) {
if (object instanceof EndpointDataReferenceEntry) {
var entry = (EndpointDataReferenceEntry) object;
switch (key) {
case "assetId":
return entry.getAssetId();
case "agreementId":
return entry.getAgreementId();
default:
return null;
}
if (object instanceof EndpointDataReferenceEntry entry) {
return switch (key) {
case "assetId" -> entry.getAssetId();
case "agreementId" -> entry.getAgreementId();
case "providerId" -> entry.getProviderId();
default -> null;
};
}
throw new IllegalArgumentException("Can only handle objects of type " + EndpointDataReferenceEntry.class.getSimpleName() + " but received an " + object.getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.util.concurrency.LockManager;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
Expand Down Expand Up @@ -66,12 +67,22 @@ public InMemoryEndpointDataReferenceCache() {

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId) {
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
var entries = entriesByAssetId.get(assetId);

Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);

if (entries == null) {
return emptyList();
}
return entries.stream().map(e -> resolveReference(e.getTransferProcessId())).filter(Objects::nonNull).collect(toList());
return entries.stream()
.filter(providerIdFilter)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;

/**
* A wrapper to persist {@link EndpointDataReferenceEntry}s and {@link EndpointDataReference}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.junit.jupiter.api.Test;

import static java.util.UUID.randomUUID;
Expand All @@ -39,6 +39,7 @@ void verify_serializeDeserialize() throws JsonProcessingException {
.assetId(randomUUID().toString())
.agreementId(randomUUID().toString())
.transferProcessId(randomUUID().toString())
.providerId(randomUUID().toString())
.build();

var serialized = mapper.writeValueAsString(new PersistentCacheEntry(edrEntry, edr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@

plugins {
`java-library`
`maven-publish`
}

dependencies {
implementation(project(":spi:control-plane-adapter-spi"))
implementation(project(":spi:edr-cache-spi"))
implementation(libs.edc.spi.core)
implementation(libs.edc.spi.transfer)
implementation(libs.edc.config.filesystem)
implementation(libs.edc.util)
implementation(libs.edc.spi.aggregateservices)
implementation(libs.edc.spi.contract)
implementation(libs.edc.spi.controlplane)
implementation(libs.edc.spi.aggregateservices)

implementation(project(":spi:edr-spi"))

testImplementation(libs.edc.junit)
testImplementation(testFixtures(project(":spi:edr-spi")))

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

/**
* Registers default services for the EDR cache.
*/
@Extension(value = EdrCoreExtension.NAME)
public class EdrCoreExtension implements ServiceExtension {
static final String NAME = "EDR Core";


@Inject
private Monitor monitor;

@Inject
private ContractNegotiationService contractNegotiationService;

@Inject
private EndpointDataReferenceCache endpointDataReferenceCache;

@Override
public String name() {
return NAME;
}


@Provider
public EdrService adapterTransferProcessService() {
return new EdrServiceImpl(contractNegotiationService, endpointDataReferenceCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,20 @@
*
*/

package org.eclipse.tractusx.edc.cp.adapter.callback;
package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;

import java.util.List;
import java.util.Optional;
Expand All @@ -35,10 +34,8 @@
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.eclipse.edc.service.spi.result.ServiceResult.notFound;
import static org.eclipse.edc.service.spi.result.ServiceResult.success;

public class AdapterTransferProcessServiceImpl implements AdapterTransferProcessService {
public class EdrServiceImpl implements EdrService {

public static final String LOCAL_ADAPTER_URI = "local://adapter";
public static final Set<String> LOCAL_EVENTS = Set.of("contract.negotiation", "transfer.process");
Expand All @@ -51,63 +48,50 @@ public class AdapterTransferProcessServiceImpl implements AdapterTransferProcess

private final EndpointDataReferenceCache endpointDataReferenceCache;

public AdapterTransferProcessServiceImpl(ContractNegotiationService contractNegotiationService, EndpointDataReferenceCache endpointDataReferenceCache) {
public EdrServiceImpl(ContractNegotiationService contractNegotiationService, EndpointDataReferenceCache endpointDataReferenceCache) {
this.contractNegotiationService = contractNegotiationService;
this.endpointDataReferenceCache = endpointDataReferenceCache;
}

@Override
public ServiceResult<ContractNegotiation> initiateEdrNegotiation(NegotiateEdrRequest request) {
var contractNegotiation = contractNegotiationService.initiateNegotiation(createContractRequest(request));
return success(contractNegotiation);
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

var requestData = ContractRequestData.Builder.newInstance()
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.callbackAddresses(callbacks).build();
return ServiceResult.success(contractNegotiation);
}

@Override
public ServiceResult<EndpointDataReference> findByTransferProcessId(String transferProcessId) {
var edr = endpointDataReferenceCache.resolveReference(transferProcessId);
return Optional.ofNullable(edr)
.map(ServiceResult::success)
.orElse(notFound(format("No Edr found associated to the transfer process with id: %s", transferProcessId)));
.orElse(ServiceResult.notFound(format("No Edr found associated to the transfer process with id: %s", transferProcessId)));
}

@Override
public ServiceResult<List<EndpointDataReferenceEntry>> findByAssetAndAgreement(String assetId, String agreementId) {
var results = queryEdrs(assetId, agreementId).collect(Collectors.toList());
return success(results);
public ServiceResult<List<EndpointDataReferenceEntry>> findBy(QuerySpec querySpec) {
var results = endpointDataReferenceCache.queryForEntries(querySpec).collect(Collectors.toList());
return ServiceResult.success(results);
}

private Stream<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
var queryBuilder = QuerySpec.Builder.newInstance();
if (assetId != null) {
queryBuilder.filter(fieldFilter("assetId", assetId));
}
if (agreementId != null) {
queryBuilder.filter(fieldFilter("agreementId", agreementId));
}
return endpointDataReferenceCache.queryForEntries(queryBuilder.build());
@Override
public ServiceResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String transferProcessId) {
var deleted = endpointDataReferenceCache.deleteByTransferProcessId(transferProcessId);
return ServiceResult.from(deleted);
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

private Criterion fieldFilter(String field, String value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
var requestData = ContractRequestData.Builder.newInstance()
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.callbackAddresses(callbacks).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
#
#

org.eclipse.tractusx.edc.api.cp.adapter.AdapterApiExtension
org.eclipse.tractusx.edc.edr.core.EdrCoreExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.ObjectFactory;
import org.eclipse.tractusx.edc.edr.core.EdrCoreExtension;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@ExtendWith(DependencyInjectionExtension.class)
public class EdrCoreExtensionTest {

EdrCoreExtension extension;

@BeforeEach
void setUp(ObjectFactory factory, ServiceExtensionContext context) {
context.registerService(ContractNegotiationService.class, mock(ContractNegotiationService.class));
context.registerService(EndpointDataReferenceCache.class, mock(EndpointDataReferenceCache.class));
extension = factory.constructInstance(EdrCoreExtension.class);
}

@Test
void shouldInitializeTheExtension(ServiceExtensionContext context) {
extension.initialize(context);

var service = extension.adapterTransferProcessService();
assertThat(service).isInstanceOf(EdrServiceImpl.class);

}
}
Loading

0 comments on commit a720cb9

Please sign in to comment.