Skip to content

Commit

Permalink
feat(EdrCache): add SQL implementation of EDR cache store
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed May 23, 2023
1 parent 1072c6c commit 57be5cc
Show file tree
Hide file tree
Showing 33 changed files with 950 additions and 181 deletions.
3 changes: 3 additions & 0 deletions core/edr-cache-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,8 @@ dependencies {
implementation(libs.edc.util)

implementation(project(":spi:edr-cache-spi"))

testImplementation(testFixtures(project(":spi:edr-cache-spi")))

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.BaseCriterionToPredicateConverter;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;

public class EdrCacheEntryPredicateConverter extends BaseCriterionToPredicateConverter<EndpointDataReferenceEntry> {

@Override
protected Object property(String key, Object object) {
if (object instanceof EndpointDataReferenceEntry) {
var entry = (EndpointDataReferenceEntry) object;
switch (key) {
case "assetId":
return entry.getAssetId();
case "agreementId":
return entry.getAgreementId();
default:
return null;
}
}
throw new IllegalArgumentException("Can only handle objects of type " + EndpointDataReferenceEntry.class.getSimpleName() + " but received an " + object.getClass().getSimpleName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.util.concurrency.LockManager;
Expand All @@ -28,6 +30,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
Expand All @@ -40,17 +44,16 @@
public class InMemoryEndpointDataReferenceCache implements EndpointDataReferenceCache {
private final LockManager lockManager;

private final EdrCacheEntryPredicateConverter predicateConverter = new EdrCacheEntryPredicateConverter();

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAssetId;

private final Map<String, List<EndpointDataReferenceEntry>> entriesByAgreementId;

private final Map<String, EndpointDataReferenceEntry> entriesByEdrId;
private final Map<String, EndpointDataReference> edrsByTransferProcessId;

public InMemoryEndpointDataReferenceCache() {
lockManager = new LockManager(new ReentrantReadWriteLock());
entriesByAssetId = new HashMap<>();
entriesByAgreementId = new HashMap<>();
entriesByEdrId = new HashMap<>();
edrsByTransferProcessId = new HashMap<>();
}
Expand All @@ -71,14 +74,8 @@ public List<EndpointDataReference> referencesForAsset(String assetId) {
}

@Override
@NotNull
public List<EndpointDataReferenceEntry> entriesForAsset(String assetId) {
return lockManager.readLock(() -> entriesByAssetId.getOrDefault(assetId, emptyList()));
}

@Override
public @NotNull List<EndpointDataReferenceEntry> entriesForAgreement(String agreementId) {
return lockManager.readLock(() -> entriesByAgreementId.getOrDefault(agreementId, emptyList()));
public Stream<EndpointDataReferenceEntry> queryForEntries(QuerySpec spec) {
return filterBy(spec.getFilterExpression());
}

@Override
Expand All @@ -88,9 +85,6 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
var list = entriesByAssetId.computeIfAbsent(entry.getAssetId(), k -> new ArrayList<>());
list.add(entry);

var agreementList = entriesByAgreementId.computeIfAbsent(entry.getAgreementId(), k -> new ArrayList<>());
agreementList.add(entry);

edrsByTransferProcessId.put(entry.getTransferProcessId(), edr);
return null;
});
Expand All @@ -103,13 +97,23 @@ public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String
if (edr == null) {
return notFound("EDR entry not found for id: " + id);
}
var entry = entriesByEdrId.get(edr.getId());
var entry = entriesByEdrId.remove(edr.getId());
var entries = entriesByAssetId.get(entry.getAssetId());
entries.remove(entry);
if (entries.isEmpty()) {
entriesByAssetId.remove(entry.getAssetId());
}

return success(entry);
});
}

private Stream<EndpointDataReferenceEntry> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateConverter::convert)
.reduce(x -> true, Predicate::and);

return entriesByEdrId.values().stream()
.filter(predicate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,15 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class InMemoryEndpointDataReferenceCacheTest {
private static final String TRANSFER_PROCESS_ID = "tp1";
private static final String ASSET_ID = "asset1";
private static final String AGREEMENT_ID = "agreement1";

private static final String EDR_ID = "edr1";
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();

@Test
@SuppressWarnings("DataFlowIssue")
void verify_operations() {
var edr = EndpointDataReference.Builder.newInstance()
.endpoint("http://test.com")
.id(EDR_ID)
.authCode("11111")
.authKey("authentication").build();

var entry = EndpointDataReferenceEntry.Builder.newInstance()
.assetId(ASSET_ID)
.agreementId(AGREEMENT_ID)
.transferProcessId(TRANSFER_PROCESS_ID)
.build();

cache.save(entry, edr);

assertThat(cache.resolveReference(TRANSFER_PROCESS_ID).getId()).isEqualTo(EDR_ID);

var edrs = cache.referencesForAsset(ASSET_ID);
assertThat(edrs.size()).isEqualTo(1);
assertThat(edrs.get((0)).getId()).isEqualTo(EDR_ID);

var entries = cache.entriesForAsset(ASSET_ID);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAssetId()).isEqualTo(ASSET_ID);

entries = cache.entriesForAgreement(AGREEMENT_ID);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get((0)).getAgreementId()).isEqualTo(AGREEMENT_ID);

assertThat(cache.deleteByTransferProcessId(TRANSFER_PROCESS_ID).succeeded()).isTrue();

assertThat(cache.entriesForAsset(ASSET_ID)).isEmpty();
assertThat(cache.resolveReference(TRANSFER_PROCESS_ID)).isNull();
@Override
protected EndpointDataReferenceCache getStore() {
return cache;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
runtimeOnly(project(":edc-controlplane:edc-controlplane-base"))
runtimeOnly(project(":edc-extensions:postgresql-migration"))
runtimeOnly(project(":edc-extensions:hashicorp-vault"))
runtimeOnly(project(":edc-extensions:edr-cache-sql"))
runtimeOnly(libs.bundles.edc.sqlstores)
runtimeOnly(libs.edc.transaction.local)
runtimeOnly(libs.edc.sql.pool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -88,26 +87,27 @@ public ServiceResult<EndpointDataReference> findByTransferProcessId(String trans

@Override
public ServiceResult<List<EndpointDataReferenceEntry>> findByAssetAndAgreement(String assetId, String agreementId) {
var results = queryEdrs(assetId, agreementId)
.stream()
.filter(fieldFilter(assetId, EndpointDataReferenceEntry::getAssetId))
.filter(fieldFilter(agreementId, EndpointDataReferenceEntry::getAgreementId))
.collect(Collectors.toList());
var results = queryEdrs(assetId, agreementId).collect(Collectors.toList());
return success(results);
}

private Predicate<EndpointDataReferenceEntry> fieldFilter(String value, Function<EndpointDataReferenceEntry, String> function) {
return entry -> Optional.ofNullable(value)
.map(val -> val.equals(function.apply(entry)))
.orElse(true);
private Stream<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
var queryBuilder = QuerySpec.Builder.newInstance();
if (assetId != null) {
queryBuilder.filter(fieldFilter("assetId", assetId));
}
if (agreementId != null) {
queryBuilder.filter(fieldFilter("agreementId", agreementId));
}
return endpointDataReferenceCache.queryForEntries(queryBuilder.build());
}

private List<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
// Try first for agreementId and then assetId
return Optional.ofNullable(agreementId)
.map(endpointDataReferenceCache::entriesForAgreement)
.or(() -> Optional.ofNullable(assetId).map(endpointDataReferenceCache::entriesForAsset))
.orElseGet(Collections::emptyList);
}

private Criterion fieldFilter(String field, String value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
.build();
}
}
32 changes: 32 additions & 0 deletions edc-extensions/edr-cache-sql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
`maven-publish`
}

dependencies {
implementation(project(":spi:edr-cache-spi"))

implementation(libs.edc.spi.core)
implementation(libs.edc.core.sql)
implementation(libs.edc.spi.transactionspi)
implementation(libs.edc.spi.transaction.datasource)

testImplementation(testFixtures(project(":spi:edr-cache-spi")))
testImplementation(testFixtures(libs.edc.core.sql))
testImplementation(testFixtures(libs.edc.junit))

}
22 changes: 22 additions & 0 deletions edc-extensions/edr-cache-sql/docs/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
--
-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--

CREATE TABLE IF NOT EXISTS edc_edr_cache
(
transfer_process_id VARCHAR NOT NULL PRIMARY KEY,
agreement_id VARCHAR NOT NULL,
asset_id VARCHAR NOT NULL,
edr_id VARCHAR NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
Loading

0 comments on commit 57be5cc

Please sign in to comment.