From 42b2607163cd467bbaacea50ab39045dbb74bee1 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Wed, 10 Jul 2024 16:34:34 +0200 Subject: [PATCH] feat: sql federated catalog cache (#218) --- DEPENDENCIES | 17 +- core/federated-catalog-core/build.gradle.kts | 2 + .../InMemoryFederatedCatalogCacheTest.java | 132 +-------- .../build.gradle.kts | 29 ++ .../docs/schema.sql | 21 ++ .../store/sql/BaseSqlDialectStatements.java | 75 +++++ .../sql/FederatedCatalogCacheStatements.java | 53 ++++ .../store/sql/SqlFederatedCatalogCache.java | 121 ++++++++ .../SqlFederatedCatalogCacheExtension.java | 66 +++++ .../postgres/FederatedCatalogMapping.java | 30 ++ .../postgres/PostgresDialectStatements.java | 53 ++++ .../postgres/PrefixedJsonFieldTranslator.java | 45 +++ ...rg.eclipse.edc.spi.system.ServiceExtension | 15 + ...SqlFederatedCatalogCacheExtensionTest.java | 56 ++++ .../sql/SqlFederatedCatalogCacheTest.java | 59 ++++ gradle/libs.versions.toml | 8 +- settings.gradle.kts | 1 + spi/federated-catalog-spi/build.gradle.kts | 5 + .../FederatedCatalogCacheTestBase.java | 259 ++++++++++++++++++ 19 files changed, 916 insertions(+), 131 deletions(-) create mode 100644 extensions/store/sql/federated-catalog-cache-sql/build.gradle.kts create mode 100644 extensions/store/sql/federated-catalog-cache-sql/docs/schema.sql create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/BaseSqlDialectStatements.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/FederatedCatalogCacheStatements.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCache.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtension.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/FederatedCatalogMapping.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PostgresDialectStatements.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PrefixedJsonFieldTranslator.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtensionTest.java create mode 100644 extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheTest.java create mode 100644 spi/federated-catalog-spi/src/testFixtures/java/org/eclipse/edc/catalog/spi/testfixtures/FederatedCatalogCacheTestBase.java diff --git a/DEPENDENCIES b/DEPENDENCIES index eb20f5cc..c6d3282a 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -8,20 +8,16 @@ maven/mavencentral/com.apicatalog/titanium-json-ld/1.4.0, Apache-2.0, approved, maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.10.3, Apache-2.0, approved, CQ21280 maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.16.2, Apache-2.0, approved, #11606 maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.17.0, Apache-2.0, approved, #13672 -maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.17.1, Apache-2.0, approved, #13672 maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.17.2, Apache-2.0, approved, #13672 maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.16.2, Apache-2.0 AND MIT, approved, #11602 maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.17.2, , approved, #13665 maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.16.2, Apache-2.0, approved, #11605 maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.17.0, Apache-2.0, approved, #13671 -maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.17.1, Apache-2.0, approved, #13671 maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.17.2, Apache-2.0, approved, #13671 maven/mavencentral/com.fasterxml.jackson.dataformat/jackson-dataformat-yaml/2.16.2, Apache-2.0, approved, #11855 maven/mavencentral/com.fasterxml.jackson.dataformat/jackson-dataformat-yaml/2.17.2, Apache-2.0, approved, #13669 -maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jakarta-jsonp/2.17.1, Apache-2.0, approved, #14161 maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jakarta-jsonp/2.17.2, Apache-2.0, approved, #14161 maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.16.2, Apache-2.0, approved, #11853 -maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.17.1, Apache-2.0, approved, #14160 maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.17.2, Apache-2.0, approved, #14160 maven/mavencentral/com.fasterxml.jackson.jakarta.rs/jackson-jakarta-rs-base/2.17.2, Apache-2.0, approved, #14194 maven/mavencentral/com.fasterxml.jackson.jakarta.rs/jackson-jakarta-rs-json-provider/2.16.2, Apache-2.0, approved, #11858 @@ -93,9 +89,12 @@ maven/mavencentral/jakarta.xml.bind/jakarta.xml.bind-api/4.0.2, BSD-3-Clause, ap maven/mavencentral/javax.servlet/javax.servlet-api/3.1.0, (CDDL-1.1 OR GPL-2.0-only WITH Classpath-exception-2.0) AND Apache-2.0, approved, CQ7248 maven/mavencentral/javax.ws.rs/javax.ws.rs-api/2.1, (CDDL-1.1 OR GPL-2.0 WITH Classpath-exception-2.0) AND Apache-2.0, approved, CQ18121 maven/mavencentral/junit/junit/4.13.2, EPL-2.0, approved, CQ23636 +maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.1, Apache-2.0, approved, #7164 maven/mavencentral/net.bytebuddy/byte-buddy-agent/1.14.15, Apache-2.0, approved, #7164 +maven/mavencentral/net.bytebuddy/byte-buddy/1.14.1, Apache-2.0 AND BSD-3-Clause, approved, #7163 maven/mavencentral/net.bytebuddy/byte-buddy/1.14.15, Apache-2.0 AND BSD-3-Clause, approved, #7163 maven/mavencentral/net.bytebuddy/byte-buddy/1.14.16, Apache-2.0 AND BSD-3-Clause, approved, #7163 +maven/mavencentral/net.bytebuddy/byte-buddy/1.14.18, Apache-2.0 AND BSD-3-Clause, approved, #7163 maven/mavencentral/net.java.dev.jna/jna/5.13.0, Apache-2.0 AND LGPL-2.1-or-later, approved, #15196 maven/mavencentral/net.sf.saxon/Saxon-HE/12.4, MPL-2.0 AND (MPL-2.0 AND Apache-2.0) AND (MPL-2.0 AND LicenseRef-X11-style) AND MPL-1.0 AND W3C, approved, #12716 maven/mavencentral/org.antlr/antlr4-runtime/4.13.1, BSD-3-Clause, approved, #10767 @@ -124,11 +123,13 @@ maven/mavencentral/org.apache.maven.doxia/doxia-sink-api/1.12.0, Apache-2.0, app maven/mavencentral/org.apache.xbean/xbean-reflect/3.7, Apache-2.0, approved, clearlydefined maven/mavencentral/org.apiguardian/apiguardian-api/1.1.2, Apache-2.0, approved, clearlydefined maven/mavencentral/org.assertj/assertj-core/3.26.0, Apache-2.0, approved, #14886 +maven/mavencentral/org.assertj/assertj-core/3.26.3, Apache-2.0, approved, #14886 maven/mavencentral/org.awaitility/awaitility/4.2.0, Apache-2.0, approved, #14178 maven/mavencentral/org.bouncycastle/bcpkix-jdk18on/1.78.1, MIT, approved, #14434 maven/mavencentral/org.bouncycastle/bcprov-jdk18on/1.78.1, MIT AND CC0-1.0, approved, #14433 maven/mavencentral/org.bouncycastle/bcutil-jdk18on/1.78.1, MIT, approved, #14435 maven/mavencentral/org.ccil.cowan.tagsoup/tagsoup/1.2.1, Apache-2.0, approved, clearlydefined +maven/mavencentral/org.checkerframework/checker-qual/3.42.0, MIT, approved, clearlydefined maven/mavencentral/org.checkerframework/checker-qual/3.43.0, MIT, approved, clearlydefined maven/mavencentral/org.codehaus.plexus/plexus-classworlds/2.6.0, Apache-2.0 AND Plexus, approved, CQ22821 maven/mavencentral/org.codehaus.plexus/plexus-component-annotations/2.1.0, Apache-2.0, approved, #809 @@ -237,6 +238,7 @@ maven/mavencentral/org.eclipse.edc/policy-spi/0.8.1-SNAPSHOT, Apache-2.0, approv maven/mavencentral/org.eclipse.edc/query-lib/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc maven/mavencentral/org.eclipse.edc/runtime-metamodel/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc maven/mavencentral/org.eclipse.edc/secrets-spi/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc +maven/mavencentral/org.eclipse.edc/sql-core/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc maven/mavencentral/org.eclipse.edc/state-machine-lib/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc maven/mavencentral/org.eclipse.edc/store-lib/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc maven/mavencentral/org.eclipse.edc/token-core/0.8.1-SNAPSHOT, Apache-2.0, approved, technology.edc @@ -311,12 +313,15 @@ maven/mavencentral/org.jetbrains/annotations/13.0, Apache-2.0, approved, clearly maven/mavencentral/org.jetbrains/annotations/17.0.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.jetbrains/annotations/24.1.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.junit-pioneer/junit-pioneer/2.2.0, EPL-2.0, approved, #11857 +maven/mavencentral/org.junit.jupiter/junit-jupiter-api/5.10.2, EPL-2.0, approved, #9714 maven/mavencentral/org.junit.jupiter/junit-jupiter-api/5.10.3, EPL-2.0, approved, #9714 maven/mavencentral/org.junit.jupiter/junit-jupiter-engine/5.10.3, EPL-2.0, approved, #9711 maven/mavencentral/org.junit.jupiter/junit-jupiter-params/5.10.3, EPL-2.0, approved, #15250 +maven/mavencentral/org.junit.platform/junit-platform-commons/1.10.2, EPL-2.0, approved, #9715 maven/mavencentral/org.junit.platform/junit-platform-commons/1.10.3, EPL-2.0, approved, #9715 maven/mavencentral/org.junit.platform/junit-platform-engine/1.10.3, EPL-2.0, approved, #9709 maven/mavencentral/org.junit.platform/junit-platform-launcher/1.10.3, EPL-2.0, approved, #15216 +maven/mavencentral/org.junit/junit-bom/5.10.2, EPL-2.0, approved, #9844 maven/mavencentral/org.junit/junit-bom/5.10.3, EPL-2.0, approved, #9844 maven/mavencentral/org.junit/junit-bom/5.9.2, EPL-2.0, approved, #4711 maven/mavencentral/org.jvnet.mimepull/mimepull/1.9.15, CDDL-1.1 OR GPL-2.0-only WITH Classpath-exception-2.0, approved, CQ21484 @@ -330,13 +335,17 @@ maven/mavencentral/org.ow2.asm/asm-tree/9.5, BSD-3-Clause, approved, #7555 maven/mavencentral/org.ow2.asm/asm-tree/9.7, BSD-3-Clause, approved, #14073 maven/mavencentral/org.ow2.asm/asm/9.5, BSD-3-Clause, approved, #7554 maven/mavencentral/org.ow2.asm/asm/9.7, BSD-3-Clause, approved, #14076 +maven/mavencentral/org.postgresql/postgresql/42.7.3, BSD-2-Clause AND Apache-2.0, approved, #11681 maven/mavencentral/org.reflections/reflections/0.10.2, Apache-2.0 AND WTFPL, approved, clearlydefined maven/mavencentral/org.rnorth.duct-tape/duct-tape/1.0.8, MIT, approved, clearlydefined maven/mavencentral/org.slf4j/slf4j-api/1.7.25, MIT, approved, CQ13368 maven/mavencentral/org.slf4j/slf4j-api/1.7.30, MIT, approved, CQ13368 maven/mavencentral/org.slf4j/slf4j-api/1.7.36, MIT, approved, CQ13368 maven/mavencentral/org.slf4j/slf4j-api/2.0.9, MIT, approved, #5915 +maven/mavencentral/org.testcontainers/database-commons/1.19.8, Apache-2.0, approved, #10345 +maven/mavencentral/org.testcontainers/jdbc/1.19.8, Apache-2.0, approved, #10348 maven/mavencentral/org.testcontainers/junit-jupiter/1.19.8, MIT, approved, #10344 +maven/mavencentral/org.testcontainers/postgresql/1.19.8, MIT, approved, #10350 maven/mavencentral/org.testcontainers/testcontainers/1.19.8, MIT, approved, #15203 maven/mavencentral/org.xmlresolver/xmlresolver/5.2.2, Apache-2.0, approved, clearlydefined maven/mavencentral/org.yaml/snakeyaml/2.2, Apache-2.0 AND (Apache-2.0 OR BSD-3-Clause OR EPL-1.0 OR GPL-2.0-or-later OR LGPL-2.1-or-later), approved, #10232 diff --git a/core/federated-catalog-core/build.gradle.kts b/core/federated-catalog-core/build.gradle.kts index 47d093e9..6f697210 100644 --- a/core/federated-catalog-core/build.gradle.kts +++ b/core/federated-catalog-core/build.gradle.kts @@ -42,6 +42,8 @@ dependencies { testImplementation(libs.edc.junit) testImplementation(libs.edc.ext.http) testImplementation(libs.awaitility) + + testImplementation(testFixtures(project(":spi:federated-catalog-spi"))) } edcBuild { diff --git a/core/federated-catalog-core/src/test/java/org/eclipse/edc/catalog/defaults/store/InMemoryFederatedCatalogCacheTest.java b/core/federated-catalog-core/src/test/java/org/eclipse/edc/catalog/defaults/store/InMemoryFederatedCatalogCacheTest.java index 82cfaa76..1735391e 100644 --- a/core/federated-catalog-core/src/test/java/org/eclipse/edc/catalog/defaults/store/InMemoryFederatedCatalogCacheTest.java +++ b/core/federated-catalog-core/src/test/java/org/eclipse/edc/catalog/defaults/store/InMemoryFederatedCatalogCacheTest.java @@ -15,138 +15,20 @@ package org.eclipse.edc.catalog.defaults.store; -import org.eclipse.edc.catalog.spi.CatalogConstants; +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.catalog.spi.testfixtures.FederatedCatalogCacheTestBase; import org.eclipse.edc.catalog.store.InMemoryFederatedCatalogCache; -import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; -import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog; -import org.eclipse.edc.connector.controlplane.catalog.spi.DataService; -import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset; -import org.eclipse.edc.connector.controlplane.catalog.spi.Distribution; import org.eclipse.edc.query.CriterionOperatorRegistryImpl; -import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.util.concurrency.LockManager; -import org.junit.jupiter.api.Test; -import java.util.List; -import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.assertj.core.api.Assertions.assertThat; - -class InMemoryFederatedCatalogCacheTest { +class InMemoryFederatedCatalogCacheTest extends FederatedCatalogCacheTestBase { private final InMemoryFederatedCatalogCache store = new InMemoryFederatedCatalogCache(new LockManager(new ReentrantReadWriteLock()), CriterionOperatorRegistryImpl.ofDefaults()); - - @Test - void queryCacheContainingOneElementWithNoCriterion_shouldReturnUniqueElement() { - var contractOfferId = UUID.randomUUID().toString(); - var assetId = UUID.randomUUID().toString(); - var catalogEntry = createCatalog(contractOfferId, createAsset(assetId)); - - store.save(catalogEntry); - - var result = store.query(QuerySpec.none()); - - assertThat(result) - .hasSize(1) - .allSatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId)); - } - - @Test - void queryCacheAfterInsertingSameAssetTwice_shouldReturnLastInsertedContractOfferOnly() { - var contractOfferId1 = UUID.randomUUID().toString(); - var assetId = UUID.randomUUID().toString(); - var entry1 = createCatalog(contractOfferId1, createAsset(assetId)); - var entry2 = createCatalog(contractOfferId1, createAsset(assetId)); - - store.save(entry1); - store.save(entry2); - - var result = store.query(QuerySpec.none()); - - assertThat(result) - .hasSize(1) - .allSatisfy(co -> { - assertThat(co.getId()).isEqualTo(contractOfferId1); - assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId); - }); - - } - - @Test - void queryCacheContainingTwoDistinctAssets_shouldReturnBothContractOffers() { - var contractOfferId1 = UUID.randomUUID().toString(); - var contractOfferId2 = UUID.randomUUID().toString(); - var assetId1 = UUID.randomUUID().toString(); - var assetId2 = UUID.randomUUID().toString(); - var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); - var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); - - store.save(entry1); - store.save(entry2); - - var result = store.query(QuerySpec.none()); - - assertThat(result) - .hasSize(2) - .anySatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId1)) - .anySatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId2)); - } - - @Test - void removedMarked_noneMarked() { - var contractOfferId1 = UUID.randomUUID().toString(); - var contractOfferId2 = UUID.randomUUID().toString(); - var assetId1 = UUID.randomUUID().toString(); - var assetId2 = UUID.randomUUID().toString(); - var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); - var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); - - store.save(entry1); - store.save(entry2); - - assertThat(store.query(QuerySpec.none())).hasSize(2); - - store.deleteExpired(); // none of them is marked, d - assertThat(store.query(QuerySpec.none())).containsExactlyInAnyOrder(entry1, entry2); - - } - - @Test - void removedMarked_shouldDeleteMarked() { - var contractOfferId1 = UUID.randomUUID().toString(); - var contractOfferId2 = UUID.randomUUID().toString(); - var assetId1 = UUID.randomUUID().toString(); - var assetId2 = UUID.randomUUID().toString(); - var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); - var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); - - store.save(entry1); - store.save(entry2); - - assertThat(store.query(QuerySpec.none())).hasSize(2); - - store.expireAll(); // two items marked - store.save(createCatalog(UUID.randomUUID().toString(), createAsset(UUID.randomUUID().toString()))); - store.deleteExpired(); // should delete only marked items - assertThat(store.query(QuerySpec.none())).hasSize(1) - .doesNotContain(entry1, entry2); - - } - - private Catalog createCatalog(String id, Asset asset) { - var dataService = DataService.Builder.newInstance().build(); - return Catalog.Builder.newInstance() - .id(id) - .dataServices(List.of(dataService)) - .datasets(List.of(Dataset.Builder.newInstance().id(asset.getId()).distributions(List.of(Distribution.Builder.newInstance().dataService(dataService).format("test-format").build())).build())) - .property(CatalogConstants.PROPERTY_ORIGINATOR, "https://test.source/" + id) - .build(); - } - - private Asset createAsset(String id) { - return Asset.Builder.newInstance() - .id(id) - .build(); + + @Override + protected FederatedCatalogCache getStore() { + return store; } } diff --git a/extensions/store/sql/federated-catalog-cache-sql/build.gradle.kts b/extensions/store/sql/federated-catalog-cache-sql/build.gradle.kts new file mode 100644 index 00000000..9800017e --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/build.gradle.kts @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` +} + +dependencies { + api(project(":spi:federated-catalog-spi")) + testImplementation(testFixtures(libs.edc.core.sql)) + implementation(libs.edc.core.sql) // for the SqlStatements + implementation(libs.edc.spi.transaction.datasource) + implementation(libs.edc.lib.util) + + testImplementation(libs.edc.junit) + testImplementation(testFixtures(libs.edc.core.sql)) + testImplementation(testFixtures(project(":spi:federated-catalog-spi"))) +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/docs/schema.sql b/extensions/store/sql/federated-catalog-cache-sql/docs/schema.sql new file mode 100644 index 00000000..d35368be --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/docs/schema.sql @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +-- only intended for and tested with Postgres! +CREATE TABLE IF NOT EXISTS edc_federated_catalog +( + id VARCHAR PRIMARY KEY NOT NULL, + catalog JSON, + marked BOOLEAN DEFAULT FALSE +); diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/BaseSqlDialectStatements.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/BaseSqlDialectStatements.java new file mode 100644 index 00000000..38d46e94 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/BaseSqlDialectStatements.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import org.eclipse.edc.catalog.store.sql.schema.postgres.FederatedCatalogMapping; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlOperatorTranslator; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +import static java.lang.String.format; + +public abstract class BaseSqlDialectStatements implements FederatedCatalogCacheStatements { + + protected final SqlOperatorTranslator operatorTranslator; + + protected BaseSqlDialectStatements(SqlOperatorTranslator operatorTranslator) { + this.operatorTranslator = operatorTranslator; + } + + @Override + public String getFindByIdTemplate() { + return format("SELECT * FROM %s WHERE %s = ?", getFederatedCatalogTable(), getIdColumn()); + } + + @Override + public String getUpdateAsMarkedTemplate() { + return format("UPDATE %s SET %s = ?", getFederatedCatalogTable(), getMarkedColumn()); + } + + @Override + public String getDeleteByMarkedTemplate() { + return executeStatement() + .delete(getFederatedCatalogTable(), getMarkedColumn()); + } + + @Override + public String getInsertTemplate() { + return executeStatement() + .column(getIdColumn()) + .jsonColumn(getCatalogColumn()) + .column(getMarkedColumn()) + .insertInto(getFederatedCatalogTable()); + } + + @Override + public String getUpdateTemplate() { + return executeStatement() + .jsonColumn(getCatalogColumn()) + .column(getMarkedColumn()) + .update(getFederatedCatalogTable(), getIdColumn()); + } + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + var select = getSelectStatement(); + return new SqlQueryStatement(select, querySpec, new FederatedCatalogMapping(this), operatorTranslator); + } + + @Override + public String getSelectStatement() { + return format("SELECT * FROM %s", getFederatedCatalogTable()); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/FederatedCatalogCacheStatements.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/FederatedCatalogCacheStatements.java new file mode 100644 index 00000000..d39ad3b2 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/FederatedCatalogCacheStatements.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.statement.SqlStatements; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +public interface FederatedCatalogCacheStatements extends SqlStatements { + + default String getFederatedCatalogTable() { + return "edc_federated_catalog"; + } + + default String getIdColumn() { + return "id"; + } + + default String getCatalogColumn() { + return "catalog"; + } + + default String getMarkedColumn() { + return "marked"; + } + + String getFindByIdTemplate(); + + String getUpdateAsMarkedTemplate(); + + String getDeleteByMarkedTemplate(); + + + String getInsertTemplate(); + + String getUpdateTemplate(); + + SqlQueryStatement createQuery(QuerySpec query); + + String getSelectStatement(); +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCache.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCache.java new file mode 100644 index 00000000..fc002370 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCache.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.catalog.spi.CatalogConstants; +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; + +import static java.util.Optional.ofNullable; + +public class SqlFederatedCatalogCache extends AbstractSqlStore implements FederatedCatalogCache { + + private final FederatedCatalogCacheStatements statements; + + public SqlFederatedCatalogCache(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, + ObjectMapper objectMapper, QueryExecutor queryExecutor, FederatedCatalogCacheStatements statements) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public void save(Catalog catalog) { + transactionContext.execute(() -> { + try (var connection = getConnection()) { + var id = ofNullable(catalog.getProperties().get(CatalogConstants.PROPERTY_ORIGINATOR)) + .map(Object::toString) + .orElse(catalog.getId()); + + if (findByIdInternal(connection, id) == null) { + insertInternal(connection, id, catalog); + } else { + updateInternal(connection, id, catalog); + } + + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public Collection query(QuerySpec querySpec) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var query = statements.createQuery(querySpec); + return queryExecutor.query(connection, true, this::mapResultSet, query.getQueryAsString(), query.getParameters()).toList(); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public void deleteExpired() { + transactionContext.execute(() -> { + try (var connection = getConnection()) { + var stmt = statements.getDeleteByMarkedTemplate(); + queryExecutor.execute(connection, stmt, true); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public void expireAll() { + transactionContext.execute(() -> { + try (var connection = getConnection()) { + var stmt = statements.getUpdateAsMarkedTemplate(); + queryExecutor.execute(connection, stmt, true); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + + } + + private Catalog findByIdInternal(Connection connection, String id) { + var stmt = statements.getFindByIdTemplate(); + return queryExecutor.single(connection, false, this::mapResultSet, stmt, id); + } + + private void insertInternal(Connection connection, String id, Catalog catalog) { + var stmt = statements.getInsertTemplate(); + queryExecutor.execute(connection, stmt, id, toJson(catalog), false); + } + + private void updateInternal(Connection connection, String id, Catalog catalog) { + var stmt = statements.getUpdateTemplate(); + queryExecutor.execute(connection, stmt, toJson(catalog), false, id); + } + + private Catalog mapResultSet(ResultSet resultSet) throws Exception { + var json = resultSet.getString(statements.getCatalogColumn()); + return fromJson(json, Catalog.class); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtension.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtension.java new file mode 100644 index 00000000..abad1dc5 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtension.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.catalog.store.sql.schema.postgres.PostgresDialectStatements; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provides; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +@Provides(FederatedCatalogCache.class) +@Extension(value = "SQL federated catalog cache") +public class SqlFederatedCatalogCacheExtension implements ServiceExtension { + + @Setting + public static final String DATASOURCE_NAME_SETTING = "edc.datasource.federatedcataog.name"; + + @Inject + private DataSourceRegistry dataSourceRegistry; + @Inject + private TransactionContext trxContext; + @Inject(required = false) + private FederatedCatalogCacheStatements statements; + @Inject + private TypeManager typeManager; + + @Inject + private QueryExecutor queryExecutor; + + @Override + public void initialize(ServiceExtensionContext context) { + var store = new SqlFederatedCatalogCache(dataSourceRegistry, getDataSourceName(context), trxContext, + typeManager.getMapper(), queryExecutor, getStatementImpl()); + context.registerService(FederatedCatalogCache.class, store); + } + + /** + * returns an externally-provided sql statement dialect, or postgres as a default + */ + private FederatedCatalogCacheStatements getStatementImpl() { + return statements != null ? statements : new PostgresDialectStatements(); + } + + private String getDataSourceName(ServiceExtensionContext context) { + return context.getConfig().getString(DATASOURCE_NAME_SETTING, DataSourceRegistry.DEFAULT_DATASOURCE); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/FederatedCatalogMapping.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/FederatedCatalogMapping.java new file mode 100644 index 00000000..1b0946e1 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/FederatedCatalogMapping.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql.schema.postgres; + +import org.eclipse.edc.catalog.store.sql.FederatedCatalogCacheStatements; +import org.eclipse.edc.sql.translation.JsonFieldTranslator; +import org.eclipse.edc.sql.translation.TranslationMapping; + +public class FederatedCatalogMapping extends TranslationMapping { + + public FederatedCatalogMapping(FederatedCatalogCacheStatements statements) { + add("id", statements.getIdColumn()); + add("participantId", new JsonFieldTranslator(statements.getCatalogColumn())); + add("properties", new PrefixedJsonFieldTranslator(statements.getCatalogColumn(), "properties")); + add("datasets", new JsonFieldTranslator("datasets")); + add("dataServices", new JsonFieldTranslator("dataServices")); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PostgresDialectStatements.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PostgresDialectStatements.java new file mode 100644 index 00000000..4ed483a2 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PostgresDialectStatements.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql.schema.postgres; + +import org.eclipse.edc.catalog.store.sql.BaseSqlDialectStatements; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.dialect.PostgresDialect; +import org.eclipse.edc.sql.translation.PostgresqlOperatorTranslator; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +import static java.lang.String.format; +import static org.eclipse.edc.sql.dialect.PostgresDialect.getSelectFromJsonArrayTemplate; + +public class PostgresDialectStatements extends BaseSqlDialectStatements { + + public static final String DATASETS_ALIAS = "datasets"; + public static final String DATA_SERVICES_ALIAS = "dataServices"; + + public PostgresDialectStatements() { + super(new PostgresqlOperatorTranslator()); + } + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } + + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + if (querySpec.containsAnyLeftOperand("datasets")) { + var select = getSelectFromJsonArrayTemplate(getSelectStatement(), format("%s -> '%s'", getCatalogColumn(), "datasets"), DATASETS_ALIAS); + return new SqlQueryStatement(select, querySpec, new FederatedCatalogMapping(this), operatorTranslator); + } else if (querySpec.containsAnyLeftOperand("dataServices")) { + var select = getSelectFromJsonArrayTemplate(getSelectStatement(), format("%s -> '%s'", getCatalogColumn(), "dataServices"), DATA_SERVICES_ALIAS); + return new SqlQueryStatement(select, querySpec, new FederatedCatalogMapping(this), operatorTranslator); + } else { + return super.createQuery(querySpec); + } + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PrefixedJsonFieldTranslator.java b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PrefixedJsonFieldTranslator.java new file mode 100644 index 00000000..b88f3004 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/java/org/eclipse/edc/catalog/store/sql/schema/postgres/PrefixedJsonFieldTranslator.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql.schema.postgres; + +import org.eclipse.edc.sql.translation.JsonFieldTranslator; +import org.eclipse.edc.util.reflection.PathItem; + +import java.util.List; +import java.util.stream.Stream; + +/** + * An extension of {@link JsonFieldTranslator} where the list of path is prefixed with + * the provided prefix value. This is useful for mapping field that are stored in a single json column + * without exposing the column/path name directly. For example, we store the catalog DTO as JSON inside the `catalog` column. + * and we want to expose the catalog fields directly without using the prefix `catalog`. For query like + * properties.name = 'name' where the `properties` field is stored inside the json column `catalog` and thus + * the path `catalog.properties` the final JSON filter should be `catalog -> properties ->> name. With only {@link JsonFieldTranslator} + * we would get `catalog ->> name`. + */ +public class PrefixedJsonFieldTranslator extends JsonFieldTranslator { + + private final String prefix; + + public PrefixedJsonFieldTranslator(String columnName, String prefix) { + super(columnName); + this.prefix = prefix; + } + + @Override + public String getLeftOperand(List path, Class type) { + return super.getLeftOperand(Stream.concat(PathItem.parse(prefix).stream(), path.stream()).toList(), type); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/store/sql/federated-catalog-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000..7561ae84 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.catalog.store.sql.SqlFederatedCatalogCacheExtension \ No newline at end of file diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtensionTest.java b/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtensionTest.java new file mode 100644 index 00000000..bfcfd340 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheExtensionTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.json.JacksonTypeManager; +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.catalog.store.sql.SqlFederatedCatalogCacheExtension.DATASOURCE_NAME_SETTING; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(DependencyInjectionExtension.class) +public class SqlFederatedCatalogCacheExtensionTest { + + @BeforeEach + void setUp(ServiceExtensionContext context) { + context.registerService(TypeManager.class, new JacksonTypeManager()); + } + + @Test + void shouldInitializeTheStore(SqlFederatedCatalogCacheExtension extension, ServiceExtensionContext context) { + var config = mock(Config.class); + when(context.getConfig()).thenReturn(config); + when(config.getString(any(), any())).thenReturn("test"); + + extension.initialize(context); + + var service = context.getService(FederatedCatalogCache.class); + assertThat(service).isInstanceOf(SqlFederatedCatalogCache.class); + + verify(config).getString(DATASOURCE_NAME_SETTING, DataSourceRegistry.DEFAULT_DATASOURCE); + } +} diff --git a/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheTest.java b/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheTest.java new file mode 100644 index 00000000..0c1401f9 --- /dev/null +++ b/extensions/store/sql/federated-catalog-cache-sql/src/test/java/org/eclipse/edc/catalog/store/sql/SqlFederatedCatalogCacheTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.store.sql; + +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.catalog.spi.testfixtures.FederatedCatalogCacheTestBase; +import org.eclipse.edc.catalog.store.sql.schema.postgres.PostgresDialectStatements; +import org.eclipse.edc.json.JacksonTypeManager; +import org.eclipse.edc.junit.annotations.ComponentTest; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@ComponentTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +public class SqlFederatedCatalogCacheTest extends FederatedCatalogCacheTestBase { + + private final FederatedCatalogCacheStatements statements = new PostgresDialectStatements(); + + private FederatedCatalogCache store; + + @BeforeEach + void setup(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException { + var typeManager = new JacksonTypeManager(); + store = new SqlFederatedCatalogCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), + extension.getTransactionContext(), typeManager.getMapper(), queryExecutor, statements); + + var schema = Files.readString(Paths.get("./docs/schema.sql")); + extension.runQuery(schema); + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension extension) { + extension.runQuery("DROP TABLE " + statements.getFederatedCatalogTable()); + } + + @Override + protected FederatedCatalogCache getStore() { + return store; + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index cdbb4f62..8b5862d4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -7,6 +7,8 @@ edc = "0.8.1-SNAPSHOT" failsafe = "3.3.2" restAssured = "5.4.0" jackson = "2.17.2" +jupiter = "5.10.2" +assertj = "3.26.0" [libraries] awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } @@ -33,7 +35,7 @@ edc-dcp = { module = "org.eclipse.edc:identity-trust", version.ref = "edc" } edc-did-core = { module = "org.eclipse.edc:identity-did-core", version.ref = "edc" } edc-did-web = { module = "org.eclipse.edc:identity-did-web", version.ref = "edc" } edc-oauth2-client = { module = "org.eclipse.edc:oauth2-client", version.ref = "edc" } - +edc-core-sql = { module = "org.eclipse.edc:sql-core", version.ref = "edc" } edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" } edc-json-ld-lib = { module = "org.eclipse.edc:json-ld-lib", version.ref = "edc" } edc-spi-catalog = { module = "org.eclipse.edc:catalog-spi", version.ref = "edc" } @@ -45,6 +47,7 @@ edc-spi-dsp-http = { module = "org.eclipse.edc:dsp-http-spi", version.ref = "edc edc-spi-jsonld = { module = "org.eclipse.edc:json-ld-spi", version.ref = "edc" } edc-spi-transform = { module = "org.eclipse.edc:transform-spi", version.ref = "edc" } edc-spi-dataplane-selector = { module = "org.eclipse.edc:data-plane-selector-spi", version.ref = "edc" } +edc-spi-transaction-datasource = { module = "org.eclipse.edc:transaction-datasource-spi", version.ref = "edc" } failsafe-core = { module = "dev.failsafe:failsafe", version.ref = "failsafe" } restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" } @@ -66,7 +69,8 @@ edc-controlplane-transform = { module = "org.eclipse.edc:control-plane-transform # third-party desp jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } - +junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "jupiter" } +assertj = { module = "org.assertj:assertj-core", version.ref = "assertj" } [bundles] edc-connector = ["edc-boot", "edc-core-connector", "edc-core-jersey", "edc-api-observability"] diff --git a/settings.gradle.kts b/settings.gradle.kts index 1df003a1..9917695a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,6 +8,7 @@ include(":core:federated-catalog-core") include(":core:common:lib:catalog-util-lib") include(":extensions:api:federated-catalog-api") +include(":extensions:store:sql:federated-catalog-cache-sql") include(":launchers:catalog-base") include(":launchers:catalog-dcp") include(":launchers:catalog-mocked") diff --git a/spi/federated-catalog-spi/build.gradle.kts b/spi/federated-catalog-spi/build.gradle.kts index d38a4ad6..f58d2848 100644 --- a/spi/federated-catalog-spi/build.gradle.kts +++ b/spi/federated-catalog-spi/build.gradle.kts @@ -14,10 +14,15 @@ plugins { `java-library` + `java-test-fixtures` } dependencies { api(project(":spi:crawler-spi")) api(libs.edc.spi.catalog) api(libs.edc.spi.core) + + testFixturesImplementation(libs.edc.junit) + testFixturesImplementation(libs.junit.jupiter.api) + testFixturesImplementation(libs.assertj) } diff --git a/spi/federated-catalog-spi/src/testFixtures/java/org/eclipse/edc/catalog/spi/testfixtures/FederatedCatalogCacheTestBase.java b/spi/federated-catalog-spi/src/testFixtures/java/org/eclipse/edc/catalog/spi/testfixtures/FederatedCatalogCacheTestBase.java new file mode 100644 index 00000000..73c47c9b --- /dev/null +++ b/spi/federated-catalog-spi/src/testFixtures/java/org/eclipse/edc/catalog/spi/testfixtures/FederatedCatalogCacheTestBase.java @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.catalog.spi.testfixtures; + +import org.eclipse.edc.catalog.spi.CatalogConstants; +import org.eclipse.edc.catalog.spi.FederatedCatalogCache; +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog; +import org.eclipse.edc.connector.controlplane.catalog.spi.DataService; +import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset; +import org.eclipse.edc.connector.controlplane.catalog.spi.Distribution; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class FederatedCatalogCacheTestBase { + + + protected abstract FederatedCatalogCache getStore(); + + private Catalog createCatalog(String id, Asset asset) { + return createCatalogBuilder(id, asset).build(); + } + + private Catalog.Builder createCatalogBuilder(String id, Asset asset) { + return createCatalogBuilder(id, asset, null); + } + + private Catalog.Builder createCatalogBuilder(String id, Asset asset, String ednpointUrl) { + var dataService = DataService.Builder.newInstance().endpointUrl(ednpointUrl).build(); + + + return Catalog.Builder.newInstance() + .id(id) + .dataServices(List.of(dataService)) + .datasets(List.of(Dataset.Builder.newInstance().id(asset.getId()).properties(asset.getProperties()).distributions(List.of(Distribution.Builder.newInstance().dataService(dataService).format("test-format").build())).build())) + .property(CatalogConstants.PROPERTY_ORIGINATOR, "https://test.source/" + id); + } + + private Asset createAsset(String id) { + return createAssetBuilder(id) + .build(); + } + + private Asset.Builder createAssetBuilder(String id) { + return Asset.Builder.newInstance() + .id(id); + } + + @Nested + class Save { + + @Test + void save_shouldReturnUniqueElement() { + var contractOfferId = UUID.randomUUID().toString(); + var assetId = UUID.randomUUID().toString(); + var catalogEntry = createCatalog(contractOfferId, createAsset(assetId)); + + getStore().save(catalogEntry); + + var result = getStore().query(QuerySpec.none()); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId)); + } + + @Test + void save_shouldReturnLastInsertedContractOfferOnly() { + var contractOfferId1 = UUID.randomUUID().toString(); + var assetId = UUID.randomUUID().toString(); + var entry1 = createCatalog(contractOfferId1, createAsset(assetId)); + var entry2 = createCatalog(contractOfferId1, createAsset(assetId)); + + getStore().save(entry1); + getStore().save(entry2); + + var result = getStore().query(QuerySpec.none()); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> { + assertThat(co.getId()).isEqualTo(contractOfferId1); + assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId); + }); + + } + + @Test + void save_shouldReturnBothContractOffers() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); + var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); + + getStore().save(entry1); + getStore().save(entry2); + + var result = getStore().query(QuerySpec.none()); + + assertThat(result) + .hasSize(2) + .anySatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId1)) + .anySatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId2)); + } + } + + @Nested + class Query { + + @Test + void queryByParticipantId() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalogBuilder(contractOfferId1, createAsset(assetId1)).participantId("participant1").build(); + var entry2 = createCatalogBuilder(contractOfferId2, createAsset(assetId2)).participantId("participant2").build(); + + getStore().save(entry1); + getStore().save(entry2); + + var query = QuerySpec.Builder.newInstance().filter(Criterion.criterion("participantId", "=", entry1.getParticipantId())).build(); + var result = getStore().query(query); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> assertThat(co.getParticipantId()).isEqualTo(entry1.getParticipantId())); + } + + @Test + void queryByDatasetId() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalogBuilder(contractOfferId1, createAsset(assetId1)).build(); + var entry2 = createCatalogBuilder(contractOfferId2, createAsset(assetId2)).build(); + + getStore().save(entry1); + getStore().save(entry2); + + var query = QuerySpec.Builder.newInstance().filter(Criterion.criterion("datasets.id", "=", assetId2)).build(); + var result = getStore().query(query); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> assertThat(co.getDatasets().get(0).getId()).isEqualTo(assetId2)); + } + + + @Test + void queryByCatalogProperty() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalogBuilder(contractOfferId1, createAsset(assetId1)).property("name", "value").build(); + var entry2 = createCatalogBuilder(contractOfferId2, createAsset(assetId2)).build(); + + getStore().save(entry1); + getStore().save(entry2); + + var query = QuerySpec.Builder.newInstance().filter(Criterion.criterion("properties.name", "=", "value")).build(); + var result = getStore().query(query); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> assertThat(co.getProperties().get("name")).isEqualTo("value")); + } + + @Test + void queryByDataServiceEndpoint() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var endpoint = "http://endpoint"; + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalogBuilder(contractOfferId1, createAsset(assetId1), endpoint).build(); + var entry2 = createCatalogBuilder(contractOfferId2, createAsset(assetId2)).build(); + + getStore().save(entry1); + getStore().save(entry2); + + var query = QuerySpec.Builder.newInstance().filter(Criterion.criterion("dataServices.endpointUrl", "=", endpoint)).build(); + var result = getStore().query(query); + + assertThat(result) + .hasSize(1) + .allSatisfy(co -> assertThat(co.getDataServices().get(0).getEndpointUrl()).isEqualTo(endpoint)); + } + + } + + @Nested + class Delete { + + @Test + void removedMarked_noneMarked() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); + var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); + + getStore().save(entry1); + getStore().save(entry2); + + assertThat(getStore().query(QuerySpec.none())).hasSize(2); + + getStore().deleteExpired(); // none of them is marked, d + assertThat(getStore().query(QuerySpec.none())).usingRecursiveFieldByFieldElementComparator().containsExactlyInAnyOrder(entry1, entry2); + + } + + @Test + void removedMarked_shouldDeleteMarked() { + var contractOfferId1 = UUID.randomUUID().toString(); + var contractOfferId2 = UUID.randomUUID().toString(); + var assetId1 = UUID.randomUUID().toString(); + var assetId2 = UUID.randomUUID().toString(); + var entry1 = createCatalog(contractOfferId1, createAsset(assetId1)); + var entry2 = createCatalog(contractOfferId2, createAsset(assetId2)); + + getStore().save(entry1); + getStore().save(entry2); + + assertThat(getStore().query(QuerySpec.none())).hasSize(2); + + getStore().expireAll(); // two items marked + getStore().save(createCatalog(UUID.randomUUID().toString(), createAsset(UUID.randomUUID().toString()))); + getStore().deleteExpired(); // should delete only marked items + assertThat(getStore().query(QuerySpec.none())).hasSize(1) + .doesNotContain(entry1, entry2); + + } + } +}