diff --git a/core/edr-core/build.gradle.kts b/core/edr-core/build.gradle.kts index 8335ef370..cb3428696 100644 --- a/core/edr-core/build.gradle.kts +++ b/core/edr-core/build.gradle.kts @@ -26,11 +26,15 @@ dependencies { implementation(libs.edc.spi.edrstore) implementation(libs.edc.spi.transactionspi) + implementation(libs.edc.spi.transaction.datasource) + implementation(project(":spi:tokenrefresh-spi")) implementation(project(":spi:edr-spi")) implementation(project(":spi:core-spi")) testImplementation(libs.edc.junit) + testImplementation(libs.edc.core.edrstore) + testImplementation(libs.edc.lib.query) testImplementation(libs.awaitility) testImplementation(testFixtures(project(":spi:edr-spi"))) diff --git a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/EdrCoreServiceExtension.java b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/EdrCoreServiceExtension.java index 8c579a4cc..41254ed98 100644 --- a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/EdrCoreServiceExtension.java +++ b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/EdrCoreServiceExtension.java @@ -27,6 +27,7 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.transaction.spi.TransactionContext; import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; import org.eclipse.tractusx.edc.edr.spi.service.EdrService; import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler; @@ -49,6 +50,9 @@ public class EdrCoreServiceExtension implements ServiceExtension { @Inject private TransactionContext transactionContext; + @Inject + private EndpointDataReferenceLock edrLock; + @Override public String name() { return NAME; @@ -56,6 +60,6 @@ public String name() { @Provider public EdrService edrService() { - return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor); + return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor, edrLock); } } diff --git a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/DefaultEdrLockProviderExtension.java b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/DefaultEdrLockProviderExtension.java new file mode 100644 index 000000000..90c47b735 --- /dev/null +++ b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/DefaultEdrLockProviderExtension.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.core.lock; + +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; + +@Extension("Provides A Default EdrLock Provider") +public class DefaultEdrLockProviderExtension implements ServiceExtension { + + @Inject + EndpointDataReferenceEntryIndex entryIndex; + + @Inject + TransactionContext transactionContext; + + @Provider(isDefault = true) + public EndpointDataReferenceLock createInMemoryEdrLock() { + return new InMemoryEdrLock(entryIndex, transactionContext); + } +} diff --git a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java new file mode 100644 index 000000000..f8627be43 --- /dev/null +++ b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLock.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.core.lock; + +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +public class InMemoryEdrLock implements EndpointDataReferenceLock { + + private final EndpointDataReferenceEntryIndex entryIndex; + private final TransactionContext transactionContext; + private final Map lockedEdrs = new ConcurrentHashMap<>(); + + public InMemoryEdrLock(EndpointDataReferenceEntryIndex entryIndex, TransactionContext transactionContext) { + this.entryIndex = entryIndex; + this.transactionContext = transactionContext; + } + + @Override + public StoreResult acquireLock(String edrId, DataAddress edr) { + + var rowLock = lockedEdrs.computeIfAbsent(edrId, k -> new ReentrantReadWriteLock()); + + rowLock.writeLock().lock(); // this lock synchronizes row-level access + + var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId)); + + return StoreResult.success(isExpired(edr, edrEntry)); + + } + + + @Override + public StoreResult releaseLock(String edrId) { + + lockedEdrs.computeIfPresent(edrId, (k, rowLock) -> { + if (rowLock.writeLock().isHeldByCurrentThread()) { + rowLock.writeLock().unlock(); + if (!rowLock.hasQueuedThreads()) { + return null; + } + } + return rowLock; + }); + + return StoreResult.success(); + } + +} diff --git a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImpl.java b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImpl.java index 17eb5adf6..d9a175de5 100644 --- a/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImpl.java +++ b/core/edr-core/src/main/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImpl.java @@ -26,27 +26,27 @@ import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; import org.eclipse.tractusx.edc.edr.spi.service.EdrService; import org.eclipse.tractusx.edc.edr.spi.types.RefreshMode; import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler; -import java.time.Instant; import java.util.List; -import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.EDR_PROPERTY_EXPIRES_IN; - public class EdrServiceImpl implements EdrService { private final EndpointDataReferenceStore edrStore; private final TokenRefreshHandler tokenRefreshHandler; private final TransactionContext transactionContext; private final Monitor monitor; + private final EndpointDataReferenceLock edrLock; - public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor) { + public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor, EndpointDataReferenceLock edrLock) { this.edrStore = edrStore; this.tokenRefreshHandler = tokenRefreshHandler; this.transactionContext = transactionContext; this.monitor = monitor; + this.edrLock = edrLock; } @Override @@ -74,12 +74,26 @@ private ServiceResult autoRefresh(String id, DataAddress edr, Refre if (edrEntry == null) { return ServiceResult.notFound("An EndpointDataReferenceEntry with ID '%s' does not exist".formatted(id)); } - if (isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) { - monitor.debug("Token expired, need to refresh."); - return tokenRefreshHandler.refreshToken(id, edr) - .compose(updated -> updateEdr(edrEntry, updated)); + if (edrLock.isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) { + var result = ServiceResult.from(edrLock.acquireLock(id, edr)) + .compose(shouldRefresh -> { + if (!shouldRefresh && !mode.equals(RefreshMode.FORCE_REFRESH)) { + monitor.debug("Dont need to refresh. Will resolve existing."); + var refreshedEdr = edrStore.resolveByTransferProcess(id); + return ServiceResult.from(refreshedEdr); + } else { + monitor.debug("Token '%s' expired, need to refresh.".formatted(id)); + return tokenRefreshHandler.refreshToken(id, edr) + .compose(updated -> updateEdr(edrEntry, updated)); + } + }); + edrLock.releaseLock(id) + .onFailure(error -> monitor.warning("Error releasing lock: %s".formatted(error))); + return result; + } - return ServiceResult.success(edr); + var refreshedEdr = edrStore.resolveByTransferProcess(id); + return ServiceResult.from(refreshedEdr); } private ServiceResult updateEdr(EndpointDataReferenceEntry entry, DataAddress dataAddress) { @@ -94,24 +108,12 @@ private ServiceResult updateEdr(EndpointDataReferenceEntry entry, D var updateResult = edrStore.save(newEntry, dataAddress); + if (updateResult.failed()) { return ServiceResult.fromFailure(updateResult); } return ServiceResult.success(dataAddress); } - private boolean isExpired(DataAddress edr, EndpointDataReferenceEntry metadata) { - var expiresInString = edr.getStringProperty(EDR_PROPERTY_EXPIRES_IN); - if (expiresInString == null) { - return false; - } - - var expiresIn = Long.parseLong(expiresInString); - // createdAt is in millis, expires-in is in seconds - var expiresAt = metadata.getCreatedAt() / 1000L + expiresIn; - var expiresAtInstant = Instant.ofEpochSecond(expiresAt); - - return expiresAtInstant.isBefore(Instant.now()); - } } diff --git a/core/edr-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/core/edr-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index bddcb77ff..6a0f65a77 100644 --- a/core/edr-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/core/edr-core/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -18,3 +18,4 @@ ################################################################################# org.eclipse.tractusx.edc.edr.core.EdrCoreServiceExtension +org.eclipse.tractusx.edc.edr.core.lock.DefaultEdrLockProviderExtension diff --git a/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLockTest.java b/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLockTest.java new file mode 100644 index 000000000..03b3e3758 --- /dev/null +++ b/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/lock/InMemoryEdrLockTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.core.lock; + +import org.eclipse.edc.edr.store.defaults.InMemoryEndpointDataReferenceEntryIndex; +import org.eclipse.edc.junit.annotations.ComponentTest; +import org.eclipse.edc.query.CriterionOperatorRegistryImpl; +import org.eclipse.edc.transaction.spi.NoopTransactionContext; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; +import org.eclipse.tractusx.edc.edr.spi.testfixtures.index.lock.EndpointDataReferenceLockBaseTest; +import org.junit.jupiter.api.BeforeEach; + + +@ComponentTest +class InMemoryEdrLockTest extends EndpointDataReferenceLockBaseTest { + + private InMemoryEdrLock edrLock; + private final TransactionContext transactionContext = new NoopTransactionContext(); + + @BeforeEach + void setUp() { + var entryIndex = new InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistryImpl.ofDefaults()); + edrLock = new InMemoryEdrLock(entryIndex, transactionContext); + entryIndex.save(edrEntry("mock", ACQUIRE_LOCK_TP)); + } + + @Override + protected EndpointDataReferenceLock getStore() { + return edrLock; + } +} \ No newline at end of file diff --git a/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImplTest.java b/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImplTest.java index a99877af6..1368ea4f8 100644 --- a/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImplTest.java +++ b/core/edr-core/src/test/java/org/eclipse/tractusx/edc/edr/core/service/EdrServiceImplTest.java @@ -27,8 +27,10 @@ import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.transaction.spi.NoopTransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -43,20 +45,22 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class EdrServiceImplTest { +class EdrServiceImplTest { private final TokenRefreshHandler tokenRefreshHandler = mock(); private final EndpointDataReferenceStore edrStore = mock(); + private final EndpointDataReferenceLock edrLock = mock(); private EdrServiceImpl edrService; @BeforeEach void setup() { - edrService = new EdrServiceImpl(edrStore, tokenRefreshHandler, new NoopTransactionContext(), mock()); + edrService = new EdrServiceImpl(edrStore, tokenRefreshHandler, new NoopTransactionContext(), mock(), edrLock); } @Test @@ -82,7 +86,7 @@ void resolveByTransferProcess_whenNoRefresh() { verify(edrStore).resolveByTransferProcess(transferProcess); verifyNoMoreInteractions(edrStore); - verifyNoInteractions(tokenRefreshHandler); + verifyNoInteractions(tokenRefreshHandler, edrLock); } @Test @@ -90,17 +94,21 @@ void resolveByTransferProcess_whenRefreshNotExpired() { var transferProcess = "tp"; var assetId = "assetId"; - when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(edr("1000"))); - when(edrStore.findById(transferProcess)).thenReturn(edrEntry(assetId, transferProcess)); + var edrEntry = edrEntry(assetId, transferProcess); + var edr = edr("1000"); + when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(edr)); + when(edrStore.findById(transferProcess)).thenReturn(edrEntry); + when(edrLock.isExpired(edr, edrEntry)).thenReturn(false); var result = edrService.resolveByTransferProcess(transferProcess, AUTO_REFRESH); assertThat(result).isSucceeded(); - verify(edrStore).resolveByTransferProcess(transferProcess); + verify(edrStore, times(2)).resolveByTransferProcess(transferProcess); verify(edrStore).findById(transferProcess); + verify(edrLock).isExpired(edr, edrEntry); - verifyNoMoreInteractions(edrStore); + verifyNoMoreInteractions(edrStore, edrLock); verifyNoInteractions(tokenRefreshHandler); } @@ -110,12 +118,16 @@ void resolveByTransferProcess_whenRefreshExpired() { var transferProcess = "tp"; var assetId = "assetId"; var entry = edrEntry(assetId, transferProcess); + var expiredEdr = edr("-1000"); var refreshedEdr = edr(); - when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(edr("-1000"))); + when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(expiredEdr)); when(edrStore.findById(transferProcess)).thenReturn(entry); + when(edrLock.isExpired(any(), any())).thenReturn(true); + when(edrLock.acquireLock(any(), any())).thenReturn(StoreResult.success(true)); when(tokenRefreshHandler.refreshToken(eq(transferProcess), any())).thenReturn(ServiceResult.success(refreshedEdr)); when(edrStore.save(any(), eq(refreshedEdr))).thenReturn(StoreResult.success()); + when(edrLock.releaseLock(transferProcess)).thenReturn(StoreResult.success()); var result = edrService.resolveByTransferProcess(transferProcess, AUTO_REFRESH); @@ -126,23 +138,63 @@ void resolveByTransferProcess_whenRefreshExpired() { verify(edrStore).findById(transferProcess); verify(edrStore).save(captor.capture(), eq(refreshedEdr)); verify(tokenRefreshHandler).refreshToken(eq(transferProcess), any()); + verify(edrLock).isExpired(any(), any()); + verify(edrLock).acquireLock(any(), any()); + verify(edrLock).releaseLock(transferProcess); - verifyNoMoreInteractions(edrStore, tokenRefreshHandler); + verifyNoMoreInteractions(edrStore, tokenRefreshHandler, edrLock); Assertions.assertThat(captor.getValue()).usingRecursiveComparison().ignoringFields("createdAt").isEqualTo(entry); } + @Test + @DisplayName("Resolve an EDR which initially was expired but was refreshed in the meantime") + void resolveByTransferProcess_whenRefreshExpiredButWasAlreadyRefreshed() { + + var transferProcess = "tp"; + var assetId = "assetId"; + var entry = edrEntry(assetId, transferProcess); + var expiredEdr = edr("-1000"); + var refreshedEdr = edr(); + + when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(expiredEdr)); + when(edrStore.findById(transferProcess)).thenReturn(entry); + when(edrLock.isExpired(any(), any())).thenReturn(true); + when(edrLock.acquireLock(any(), any())).thenReturn(StoreResult.success(false)); + when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(refreshedEdr)); + when(edrLock.releaseLock(transferProcess)).thenReturn(StoreResult.success()); + + var result = edrService.resolveByTransferProcess(transferProcess, AUTO_REFRESH); + + assertThat(result).isSucceeded(); + + verify(edrStore, times(2)).resolveByTransferProcess(transferProcess); + verify(edrStore).findById(transferProcess); + verify(edrLock).isExpired(any(), any()); + verify(edrLock).acquireLock(any(), any()); + verify(edrLock).releaseLock(transferProcess); + + Assertions.assertThat(result.getContent()).isEqualTo(refreshedEdr); + + verifyNoMoreInteractions(edrStore, tokenRefreshHandler, edrLock); + } + @Test void resolveByTransferProcess_forceRefresh() { var transferProcess = "tp"; var assetId = "assetId"; var entry = edrEntry(assetId, transferProcess); + var edr = edr("1000"); var refreshedEdr = edr(); - when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(edr("1000"))); + + when(edrStore.resolveByTransferProcess(transferProcess)).thenReturn(StoreResult.success(edr)); when(edrStore.findById(transferProcess)).thenReturn(entry); + when(edrLock.isExpired(any(), any())).thenReturn(false); + when(edrLock.acquireLock(any(), any())).thenReturn(StoreResult.success(false)); when(tokenRefreshHandler.refreshToken(eq(transferProcess), any())).thenReturn(ServiceResult.success(refreshedEdr)); when(edrStore.save(any(), eq(refreshedEdr))).thenReturn(StoreResult.success()); + when(edrLock.releaseLock(transferProcess)).thenReturn(StoreResult.success()); var result = edrService.resolveByTransferProcess(transferProcess, FORCE_REFRESH); @@ -153,8 +205,11 @@ void resolveByTransferProcess_forceRefresh() { verify(edrStore).findById(transferProcess); verify(edrStore).save(captor.capture(), eq(refreshedEdr)); verify(tokenRefreshHandler).refreshToken(eq(transferProcess), any()); + verify(edrLock).isExpired(any(), any()); + verify(edrLock).acquireLock(any(), any()); + verify(edrLock).releaseLock(transferProcess); - verifyNoMoreInteractions(edrStore, tokenRefreshHandler); + verifyNoMoreInteractions(edrStore, tokenRefreshHandler, edrLock); Assertions.assertThat(captor.getValue()).usingRecursiveComparison().ignoringFields("createdAt").isEqualTo(entry); diff --git a/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts b/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts index 288eff9fc..0d9da8e75 100644 --- a/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts +++ b/edc-controlplane/edc-controlplane-postgresql-hashicorp-vault/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { runtimeOnly(project(":edc-extensions:migrations::control-plane-migration")) runtimeOnly(project(":edc-extensions:bpn-validation:business-partner-store-sql")) runtimeOnly(project(":edc-extensions:agreements:retirement-evaluation-store-sql")) + runtimeOnly(project(":edc-extensions:edr:edr-index-lock-sql")) runtimeOnly(libs.edc.vault.hashicorp) runtimeOnly(libs.bundles.edc.sqlstores) runtimeOnly(libs.edc.transaction.local) diff --git a/edc-extensions/edr/edr-index-lock-sql/build.gradle.kts b/edc-extensions/edr/edr-index-lock-sql/build.gradle.kts new file mode 100644 index 000000000..bc757ac3c --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + `java-library` +} + + +dependencies { + + implementation(libs.edc.lib.util) + implementation(libs.edc.sql.core) + implementation(libs.edc.sql.edrindex) + implementation(libs.edc.spi.transactionspi) + implementation(libs.edc.spi.transaction.datasource) + implementation(libs.edc.spi.edrstore) + api(libs.edc.spi.edrstore) + + + implementation(project(":spi:core-spi")) + implementation(project(":spi:edr-spi")) + + testImplementation(testFixtures(project(":spi:edr-spi"))) + testImplementation(testFixtures(libs.edc.core.sql)) + testImplementation(testFixtures(libs.edc.junit)) +} diff --git a/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/EdrLockStatements.java b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/EdrLockStatements.java new file mode 100644 index 000000000..dda76574d --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/EdrLockStatements.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.index.sql.lock; + + +public interface EdrLockStatements { + + String getSelectForUpdateTemplate(); + + String getEdrEntryTableName(); + + String getCreatedAtColumn(); + + String getAssetIdColumn(); + + String getTransferProcessIdColumn(); + + String getAgreementIdColumn(); + + String getProviderIdColumn(); + + String getContractNegotiationIdColumn(); +} diff --git a/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/PostgresEdrLockStatements.java b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/PostgresEdrLockStatements.java new file mode 100644 index 000000000..d05ab8201 --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/PostgresEdrLockStatements.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.index.sql.lock; + + +import org.eclipse.edc.edr.store.index.sql.schema.EndpointDataReferenceEntryStatements; +import org.eclipse.edc.edr.store.index.sql.schema.postgres.PostgresDialectStatements; + +import static java.lang.String.format; + + +public class PostgresEdrLockStatements implements EdrLockStatements { + + private final EndpointDataReferenceEntryStatements edrEntryStatements; + + PostgresEdrLockStatements() { + edrEntryStatements = new PostgresDialectStatements(); + } + + @Override + public String getSelectForUpdateTemplate() { + return format("SELECT * FROM %s WHERE %s = ? FOR UPDATE;", edrEntryStatements.getEdrEntryTable(), edrEntryStatements.getTransferProcessIdColumn()); + } + + @Override + public String getCreatedAtColumn() { + return edrEntryStatements.getCreatedAtColumn(); + } + + @Override + public String getAssetIdColumn() { + return edrEntryStatements.getAssetIdColumn(); + } + + @Override + public String getTransferProcessIdColumn() { + return edrEntryStatements.getTransferProcessIdColumn(); + } + + @Override + public String getAgreementIdColumn() { + return edrEntryStatements.getAgreementIdColumn(); + } + + @Override + public String getProviderIdColumn() { + return edrEntryStatements.getProviderIdColumn(); + } + + @Override + public String getContractNegotiationIdColumn() { + return edrEntryStatements.getContractNegotiationIdColumn(); + } + + @Override + public String getEdrEntryTableName() { + return edrEntryStatements.getEdrEntryTable(); + } + +} diff --git a/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLock.java b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLock.java new file mode 100644 index 000000000..584db75d4 --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLock.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.index.sql.lock; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class SqlEdrLock extends AbstractSqlStore implements EndpointDataReferenceLock { + private final EdrLockStatements statements; + + public SqlEdrLock(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, + ObjectMapper objectMapper, QueryExecutor queryExecutor, EdrLockStatements statements) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public StoreResult acquireLock(String edrId, DataAddress edr) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + var sql = statements.getSelectForUpdateTemplate(); + // this blocks until Postgres can acquire the row-level lock + var edrEntry = queryExecutor.single(connection, false, this::mapEdr, sql, edrId); + + return StoreResult.success(isExpired(edr, edrEntry)); + + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public StoreResult releaseLock(String edrId) { + return StoreResult.success(); + } + + private EndpointDataReferenceEntry mapEdr(ResultSet resultSet) throws SQLException { + return EndpointDataReferenceEntry.Builder.newInstance() + .createdAt(resultSet.getLong(statements.getCreatedAtColumn())) + .assetId(resultSet.getString(statements.getAssetIdColumn())) + .transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn())) + .agreementId(resultSet.getString(statements.getAgreementIdColumn())) + .providerId(resultSet.getString(statements.getProviderIdColumn())) + .contractNegotiationId(resultSet.getString(statements.getContractNegotiationIdColumn())) + .build(); + } + +} diff --git a/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockExtension.java b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockExtension.java new file mode 100644 index 000000000..6780cd228 --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/main/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockExtension.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.index.sql.lock; + + +import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provides; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.configuration.DataSourceName; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; + +@Provides({ EndpointDataReferenceEntryIndex.class, EndpointDataReferenceLock.class }) +@Extension(value = "Database-level EDR Lock extension (PostgreSQL)") +public class SqlEdrLockExtension implements ServiceExtension { + + + @Deprecated(since = "0.8.1") + public static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name"; + + @Setting(value = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE) + public static final String DATASOURCE_NAME = "edc.sql.store.edr.datasource"; + + @Inject + private DataSourceRegistry dataSourceRegistry; + + @Inject + private TransactionContext transactionContext; + + + @Inject + private QueryExecutor queryExecutor; + + @Inject + private TypeManager typeManager; + + + @Override + public void initialize(ServiceExtensionContext context) { + var dataSourceName = DataSourceName.getDataSourceName(DATASOURCE_NAME, DATASOURCE_SETTING_NAME, context.getConfig(), context.getMonitor()); + + var statements = new PostgresEdrLockStatements(); + var sqlStore = new SqlEdrLock(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(), queryExecutor, statements); + + context.registerService(EndpointDataReferenceLock.class, sqlStore); + } + + +} diff --git a/edc-extensions/edr/edr-index-lock-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/edr/edr-index-lock-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 000000000..1349fbb9e --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,20 @@ +################################################################################# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################# + +org.eclipse.tractusx.edc.edr.index.sql.lock.SqlEdrLockExtension diff --git a/edc-extensions/edr/edr-index-lock-sql/src/test/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockTest.java b/edc-extensions/edr/edr-index-lock-sql/src/test/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockTest.java new file mode 100644 index 000000000..5293fae0b --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/test/java/org/eclipse/tractusx/edc/edr/index/sql/lock/SqlEdrLockTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.index.sql.lock; + +import org.eclipse.edc.edr.store.index.SqlEndpointDataReferenceEntryIndex; +import org.eclipse.edc.edr.store.index.sql.schema.EndpointDataReferenceEntryStatements; +import org.eclipse.edc.edr.store.index.sql.schema.postgres.PostgresDialectStatements; +import org.eclipse.edc.json.JacksonTypeManager; +import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; +import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; +import org.eclipse.tractusx.edc.edr.spi.testfixtures.index.lock.EndpointDataReferenceLockBaseTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; + +@PostgresqlIntegrationTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +class SqlEdrLockTest extends EndpointDataReferenceLockBaseTest { + + private SqlEdrLock edrLock; + private final TypeManager typeManager = new JacksonTypeManager(); + private final PostgresEdrLockStatements statements = new PostgresEdrLockStatements(); + private final EndpointDataReferenceEntryStatements edrEntryStatements = new PostgresDialectStatements(); + + @BeforeEach + void setUp(PostgresqlStoreSetupExtension extension, QueryExecutor queryExecutor) throws IOException { + edrLock = new SqlEdrLock(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), typeManager.getMapper(), queryExecutor, statements); + var entryIndexStore = new SqlEndpointDataReferenceEntryIndex(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), typeManager.getMapper(), edrEntryStatements, queryExecutor); + var schema = TestUtils.getResourceFileContentAsString("schema.sql"); + extension.runQuery(schema); + + entryIndexStore.save(edrEntry("mock", ACQUIRE_LOCK_TP)); + + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension extension) { + extension.runQuery("DROP TABLE " + statements.getEdrEntryTableName() + " CASCADE"); + } + + @Override + protected EndpointDataReferenceLock getStore() { + return edrLock; + } +} \ No newline at end of file diff --git a/edc-extensions/edr/edr-index-lock-sql/src/test/resources/schema.sql b/edc-extensions/edr/edr-index-lock-sql/src/test/resources/schema.sql new file mode 100644 index 000000000..a400c505d --- /dev/null +++ b/edc-extensions/edr/edr-index-lock-sql/src/test/resources/schema.sql @@ -0,0 +1,28 @@ +-- +-- Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft +-- +-- See the NOTICE file(s) distributed with this work for additional +-- information regarding copyright ownership. +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0. +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +-- License for the specific language governing permissions and limitations +-- under the License. +-- +-- SPDX-License-Identifier: Apache-2.0 +-- + +CREATE TABLE IF NOT EXISTS edc_edr_entry +( + transfer_process_id VARCHAR NOT NULL PRIMARY KEY, + agreement_id VARCHAR NOT NULL, + asset_id VARCHAR NOT NULL, + provider_id VARCHAR NOT NULL, + contract_negotiation_id VARCHAR, + created_at BIGINT NOT NULL +); diff --git a/edc-tests/edc-controlplane/edr-api-tests/src/test/java/org/eclipse/tractusx/edc/tests/edrv2/EdrCacheApiEndToEndTest.java b/edc-tests/edc-controlplane/edr-api-tests/src/test/java/org/eclipse/tractusx/edc/tests/edrv2/EdrCacheApiEndToEndTest.java index 41097357e..bb7c661e2 100644 --- a/edc-tests/edc-controlplane/edr-api-tests/src/test/java/org/eclipse/tractusx/edc/tests/edrv2/EdrCacheApiEndToEndTest.java +++ b/edc-tests/edc-controlplane/edr-api-tests/src/test/java/org/eclipse/tractusx/edc/tests/edrv2/EdrCacheApiEndToEndTest.java @@ -35,17 +35,22 @@ import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.junit.extensions.RuntimeExtension; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.tractusx.edc.spi.tokenrefresh.dataplane.model.TokenResponse; import org.eclipse.tractusx.edc.tests.participant.TransferParticipant; +import org.eclipse.tractusx.edc.tests.runtimes.PostgresExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockserver.client.MockServerClient; import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.Delay; import org.mockserver.verify.VerificationTimes; import java.time.Clock; @@ -53,6 +58,10 @@ import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; @@ -64,6 +73,9 @@ import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.CONSUMER_BPN; import static org.eclipse.tractusx.edc.tests.TestRuntimeConfiguration.CONSUMER_NAME; import static org.eclipse.tractusx.edc.tests.runtimes.Runtimes.memoryRuntime; +import static org.eclipse.tractusx.edc.tests.runtimes.Runtimes.pgRuntime; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.matchers.Times.exactly; import static org.mockserver.model.HttpRequest.request; @@ -81,13 +93,7 @@ public class EdrCacheApiEndToEndTest { .name(CONSUMER_NAME) .id(CONSUMER_BPN) .build(); - @RegisterExtension - protected static final RuntimeExtension CONSUMER_RUNTIME = memoryRuntime(CONSUMER.getName(), CONSUMER.getId(), with(CONSUMER.getConfiguration(), Map.of("edc.iam.issuer.id", "did:web:consumer"))); - private final ObjectMapper mapper = new ObjectMapper(); - private String refreshEndpoint; - private String refreshAudience; - private ClientAndServer mockedRefreshApi; - private ECKey providerSigningKey; + protected static Map with(Map original, Map additional) { var newMap = new HashMap<>(original); @@ -95,243 +101,349 @@ protected static Map with(Map original, Map { + var wait = random.nextInt(1, jitter); + try { + Thread.sleep(wait); + new Thread(() -> { + var edrNumber = random.nextInt(1, 3); + try { + var tr = CONSUMER.edrs().getEdrWithRefresh("test-id-%s".formatted(edrNumber), true) + .assertThat() + .log().ifValidationFails() + .statusCode(anyOf(equalTo(200), equalTo(409))) + .extract().asString(); + + assertThat(tr).contains(accessToken); + } catch (AssertionError e) { + failed.set(true); + } finally { + latch.countDown(); + } + + + }).start(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + latch.await(); + assertThat(failed.get()).isFalse(); + + client.verify(request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), VerificationTimes.exactly(2)); + + } } - } - @DisplayName("Verify the refresh endpoint is not called when auto_refresh=false") - @Test - void getEdrWithRefresh_whenNotAutorefresh_shouldNotCallEndpoint() { - - try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { - // mock the provider dataplane's refresh endpoint - - storeEdr("test-id", true); - var edr = CONSUMER.edrs() - .getEdrWithRefresh("test-id", false) - .statusCode(200) - .extract().body().as(JsonObject.class); - assertThat(edr).isNotNull(); - - // assert the correct endpoint was called - client.verify( - request() - .withQueryStringParameter("grant_type", "refresh_token") - .withMethod("POST") - .withPath("/refresh/token"), - VerificationTimes.never()); + @DisplayName("Verify the refresh endpoint is not called when token not yet expired") + @Test + void getEdrWithRefresh_notExpired_shouldNotCallEndpoint() { + + try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { + // mock the provider dataplane's refresh endpoint + + storeEdr("test-id", false); + var edr = CONSUMER.edrs().getEdrWithRefresh("test-id", true) + .statusCode(200) + .extract().body().as(JsonObject.class); + assertThat(edr).isNotNull(); + + // assert the correct endpoint was called + client.verify( + request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), + VerificationTimes.never()); + } } - } - @DisplayName("Verify HTTP 403 response when refreshing the token is not allowed") - @Test - void getEdrWithRefresh_unauthorized() { - - try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { - // mock the provider dataplane's refresh endpoint - client.when(request() - .withMethod("POST") - .withPath("/refresh/token") - .withBody(exact("")), - exactly(1)) - .respond(response() - .withStatusCode(401) - .withBody("unauthorized") - ); - - storeEdr("test-id", true); - CONSUMER.edrs().getEdrWithRefresh("test-id", true) - .statusCode(403); - - // assert the correct endpoint was called - client.verify( - request() - .withQueryStringParameter("grant_type", "refresh_token") - .withMethod("POST") - .withPath("/refresh/token"), - VerificationTimes.exactly(1)); + @DisplayName("Verify the refresh endpoint is not called when auto_refresh=false") + @Test + void getEdrWithRefresh_whenNotAutorefresh_shouldNotCallEndpoint() { + + try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { + // mock the provider dataplane's refresh endpoint + + storeEdr("test-id", true); + var edr = CONSUMER.edrs() + .getEdrWithRefresh("test-id", false) + .statusCode(200) + .extract().body().as(JsonObject.class); + assertThat(edr).isNotNull(); + + // assert the correct endpoint was called + client.verify( + request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), + VerificationTimes.never()); + } } - } - @Test - void refreshEdr() { - try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { - // mock the provider dataplane's refresh endpoint - client.when(request() - .withMethod("POST") - .withPath("/refresh/token") - .withBody(exact("")), - exactly(1)) - .respond(response() - .withStatusCode(200) - .withBody(tokenResponseBody()) - ); - - storeEdr("test-id", true); - var edr = CONSUMER.edrs().refreshEdr("test-id") - .statusCode(200) - .extract().body().as(JsonObject.class); - assertThat(edr).isNotNull(); - - // assert the correct endpoint was called - client.verify( - request() - .withQueryStringParameter("grant_type", "refresh_token") - .withMethod("POST") - .withPath("/refresh/token"), - VerificationTimes.exactly(1)); + @DisplayName("Verify HTTP 403 response when refreshing the token is not allowed") + @Test + void getEdrWithRefresh_unauthorized() { + + try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { + // mock the provider dataplane's refresh endpoint + client.when(request() + .withMethod("POST") + .withPath("/refresh/token") + .withBody(exact("")), + exactly(1)) + .respond(response() + .withStatusCode(401) + .withBody("unauthorized") + ); + + storeEdr("test-id", true); + CONSUMER.edrs().getEdrWithRefresh("test-id", true) + .statusCode(403); + + // assert the correct endpoint was called + client.verify( + request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), + VerificationTimes.exactly(1)); + } + } + @Test + void refreshEdr() { + try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { + // mock the provider dataplane's refresh endpoint + client.when(request() + .withMethod("POST") + .withPath("/refresh/token") + .withBody(exact("")), + exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody(tokenResponseBody()) + ); + + storeEdr("test-id", true); + var edr = CONSUMER.edrs().refreshEdr("test-id") + .statusCode(200) + .extract().body().as(JsonObject.class); + assertThat(edr).isNotNull(); + + // assert the correct endpoint was called + client.verify( + request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), + VerificationTimes.exactly(1)); + + } } - } - @Test - void refreshEdr_whenNotFound() { - var edr = CONSUMER.edrs().refreshEdr("does-not-exist") - .statusCode(404); - } + @Test + void refreshEdr_whenNotFound() { + CONSUMER.edrs().refreshEdr("does-not-exist") + .statusCode(404); + } - @Test - void refreshEdr_whenNotAuthorized() { - try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { - // mock the provider dataplane's refresh endpoint - client.when(request() - .withMethod("POST") - .withPath("/refresh/token") - .withBody(exact("")), - exactly(1)) - .respond(response() - .withStatusCode(401) - .withBody("unauthorized") - ); - - storeEdr("test-id", true); - CONSUMER.edrs().refreshEdr("test-id") - .statusCode(403); - - // assert the correct endpoint was called - client.verify( - request() - .withQueryStringParameter("grant_type", "refresh_token") - .withMethod("POST") - .withPath("/refresh/token"), - VerificationTimes.exactly(1)); + @Test + void refreshEdr_whenNotAuthorized() { + try (var client = new MockServerClient("localhost", mockedRefreshApi.getPort())) { + // mock the provider dataplane's refresh endpoint + client.when(request() + .withMethod("POST") + .withPath("/refresh/token") + .withBody(exact("")), + exactly(1)) + .respond(response() + .withStatusCode(401) + .withBody("unauthorized") + ); + + storeEdr("test-id", true); + CONSUMER.edrs().refreshEdr("test-id") + .statusCode(403); + + // assert the correct endpoint was called + client.verify( + request() + .withQueryStringParameter("grant_type", "refresh_token") + .withMethod("POST") + .withPath("/refresh/token"), + VerificationTimes.exactly(1)); + } } - } - private String tokenResponseBody() { - var claims = new JWTClaimsSet.Builder().claim("iss", "did:web:provider").build(); + protected abstract EndpointDataReferenceStore getStore(); - var accessToken = createJwt(providerSigningKey, claims); - var refreshToken = createJwt(providerSigningKey, new JWTClaimsSet.Builder().build()); - var response = new TokenResponse(accessToken, refreshToken, 300L, "bearer"); - try { - return mapper.writeValueAsString(response); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + private String tokenResponseBody() { + var claims = new JWTClaimsSet.Builder().claim("iss", "did:web:provider").build(); + var accessToken = createJwt(providerSigningKey, claims); + var refreshToken = createJwt(providerSigningKey, new JWTClaimsSet.Builder().build()); + return tokenResponseBody(accessToken, refreshToken); + } + + private String tokenResponseBody(String accessToken, String refreshToken) { + var response = new TokenResponse(accessToken, refreshToken, 300L, "bearer"); + try { + return mapper.writeValueAsString(response); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private void storeEdr(String transferProcessId, boolean isExpired) { + var claims = new JWTClaimsSet.Builder().claim("iss", "did:web:provider").build(); + var store = getStore(); + var edr = DataAddress.Builder.newInstance() + .type("test-type") + .property(EDC_NAMESPACE + "authorization", createJwt(providerSigningKey, claims)) + .property(EDC_NAMESPACE + "authType", "bearer") + .property(EDR_PROPERTY_REFRESH_TOKEN, createJwt(providerSigningKey, new JWTClaimsSet.Builder().build())) + .property(EDR_PROPERTY_EXPIRES_IN, "300") + .property(EDR_PROPERTY_REFRESH_ENDPOINT, refreshEndpoint) + .property(EDR_PROPERTY_REFRESH_AUDIENCE, refreshAudience) + .build(); + var entry = EndpointDataReferenceEntry.Builder.newInstance() + .clock(isExpired ? // defaults to an expired token + Clock.fixed(Instant.now().minusSeconds(3600), ZoneId.systemDefault()) : + Clock.systemUTC()) + .agreementId("test-agreement") + .assetId("test-asset") + .transferProcessId(transferProcessId) + .providerId("test-provider") + .contractNegotiationId("test-negotiation") + .build(); + store.save(entry, edr).orElseThrow(f -> new AssertionError(f.getFailureDetail())); + } + + private String createJwt(ECKey signerKey, JWTClaimsSet claims) { + var header = new JWSHeader.Builder(JWSAlgorithm.ES256).keyID(signerKey.getKeyID()).build(); + var jwt = new SignedJWT(header, claims); + try { + jwt.sign(new ECDSASigner(signerKey)); + return jwt.serialize(); + } catch (JOSEException e) { + throw new RuntimeException(e); + } } - } - private void storeEdr(String transferProcessId, boolean isExpired) { - var claims = new JWTClaimsSet.Builder().claim("iss", "did:web:provider").build(); - var store = CONSUMER_RUNTIME.getService(EndpointDataReferenceStore.class); - var edr = DataAddress.Builder.newInstance() - .type("test-type") - .property(EDC_NAMESPACE + "authorization", createJwt(providerSigningKey, claims)) - .property(EDC_NAMESPACE + "authType", "bearer") - .property(EDR_PROPERTY_REFRESH_TOKEN, createJwt(providerSigningKey, new JWTClaimsSet.Builder().build())) - .property(EDR_PROPERTY_EXPIRES_IN, "300") - .property(EDR_PROPERTY_REFRESH_ENDPOINT, refreshEndpoint) - .property(EDR_PROPERTY_REFRESH_AUDIENCE, refreshAudience) - .build(); - var entry = EndpointDataReferenceEntry.Builder.newInstance() - .clock(isExpired ? // defaults to an expired token - Clock.fixed(Instant.now().minusSeconds(3600), ZoneId.systemDefault()) : - Clock.systemUTC()) - .agreementId("test-agreement") - .assetId("test-asset") - .transferProcessId(transferProcessId) - .providerId("test-provider") - .contractNegotiationId("test-negotiation") - .build(); - store.save(entry, edr).orElseThrow(f -> new AssertionError(f.getFailureDetail())); } + @Nested + @EndToEndTest + class InMemory extends Tests { - private String createJwt(ECKey signerKey, JWTClaimsSet claims) { - var header = new JWSHeader.Builder(JWSAlgorithm.ES256).keyID(signerKey.getKeyID()).build(); - var jwt = new SignedJWT(header, claims); - try { - jwt.sign(new ECDSASigner(signerKey)); - return jwt.serialize(); - } catch (JOSEException e) { - throw new RuntimeException(e); + @RegisterExtension + protected static final RuntimeExtension CONSUMER_RUNTIME = memoryRuntime(CONSUMER.getName(), CONSUMER.getId(), with(CONSUMER.getConfiguration(), Map.of("edc.iam.issuer.id", "did:web:consumer"))); + + @Override + protected EndpointDataReferenceStore getStore() { + return CONSUMER_RUNTIME.getService(EndpointDataReferenceStore.class); } } + @Nested + @PostgresqlIntegrationTest + class Postgres extends Tests { + + @RegisterExtension + @Order(0) + private static final PostgresExtension POSTGRES = new PostgresExtension(CONSUMER.getName()); + + @RegisterExtension + protected static final RuntimeExtension CONSUMER_RUNTIME = pgRuntime(CONSUMER, POSTGRES); + + + @Override + protected EndpointDataReferenceStore getStore() { + return CONSUMER_RUNTIME.getService(EndpointDataReferenceStore.class); + } + } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 8f746a301..9408bbb4b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -47,6 +47,7 @@ include(":edc-extensions:provision-additional-headers") include(":edc-extensions:federated-catalog") include(":edc-extensions:edr:edr-api-v2") include(":edc-extensions:edr:edr-callback") +include("edc-extensions:edr:edr-index-lock-sql") include(":edc-extensions:cx-policy") include(":edc-extensions:dcp:tx-dcp") include(":edc-extensions:dcp:tx-dcp-sts-dim") diff --git a/spi/edr-spi/build.gradle.kts b/spi/edr-spi/build.gradle.kts index 0096d50ca..9a5030cdd 100644 --- a/spi/edr-spi/build.gradle.kts +++ b/spi/edr-spi/build.gradle.kts @@ -28,10 +28,12 @@ dependencies { implementation(libs.edc.spi.contract) implementation(libs.edc.spi.edrstore) + testFixturesImplementation(project(":spi:core-spi")) testFixturesImplementation(libs.edc.junit) testFixturesImplementation(libs.junit.jupiter.api) testFixturesImplementation(libs.assertj) testFixturesImplementation(libs.awaitility) + testFixturesImplementation(libs.edc.spi.edrstore) } diff --git a/spi/edr-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/index/lock/EndpointDataReferenceLock.java b/spi/edr-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/index/lock/EndpointDataReferenceLock.java new file mode 100644 index 000000000..53e22d2a7 --- /dev/null +++ b/spi/edr-spi/src/main/java/org/eclipse/tractusx/edc/edr/spi/index/lock/EndpointDataReferenceLock.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.spi.index.lock; + +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.DataAddress; + +import java.time.Instant; + +import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.EDR_PROPERTY_EXPIRES_IN; + +public interface EndpointDataReferenceLock { + StoreResult acquireLock(String edrId, DataAddress edr); + + StoreResult releaseLock(String edrId); + + default boolean isExpired(DataAddress edr, EndpointDataReferenceEntry edrEntry) { + var expiresInString = edr.getStringProperty(EDR_PROPERTY_EXPIRES_IN); + if (expiresInString == null) { + return false; + } + var expiresIn = Long.parseLong(expiresInString); + var expiresAt = edrEntry.getCreatedAt() / 1000L + expiresIn; + var expiresAtInstant = Instant.ofEpochSecond(expiresAt); + + return expiresAtInstant.isBefore(Instant.now()); + } +} diff --git a/spi/edr-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/testfixtures/index/lock/EndpointDataReferenceLockBaseTest.java b/spi/edr-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/testfixtures/index/lock/EndpointDataReferenceLockBaseTest.java new file mode 100644 index 000000000..162820d0f --- /dev/null +++ b/spi/edr-spi/src/testFixtures/java/org/eclipse/tractusx/edc/edr/spi/testfixtures/index/lock/EndpointDataReferenceLockBaseTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.eclipse.tractusx.edc.edr.spi.testfixtures.index.lock; + +import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.tractusx.edc.edr.spi.CoreConstants; +import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; + + +public abstract class EndpointDataReferenceLockBaseTest { + + protected static final String ACQUIRE_LOCK_TP = "acquire-lock-tp"; + + protected abstract EndpointDataReferenceLock getStore(); + + @Test + @DisplayName("Verify acquireLock returns true when expired and leaves with lock") + void verify_acquireLockReturnsTrueWhenExpired() { + + var result = getStore().acquireLock(ACQUIRE_LOCK_TP, edr("-1000")); + + assertThat(result).isSucceeded(); + assertThat(result.getContent()).isTrue(); + assertThat(getStore().releaseLock(ACQUIRE_LOCK_TP)).isSucceeded(); + } + + @Test + @DisplayName("Verify acquireLock returns false when not expired and leaves with lock") + void verify_acquireLockReturnsFalseWhenNotExpired() { + + var edr = edr("2000"); + + var result = getStore().acquireLock(ACQUIRE_LOCK_TP, edr); + + assertThat(result).isSucceeded(); + assertThat(result.getContent()).isFalse(); + assertThat(getStore().releaseLock(ACQUIRE_LOCK_TP)).isSucceeded(); + + } + + @Test + @DisplayName("Verify release lock returns true when release") + void verify_releaseLockReturnsSuccessWhenReleased() { + + getStore().acquireLock(ACQUIRE_LOCK_TP, edr("2000")); + + var result = getStore().releaseLock(ACQUIRE_LOCK_TP); + assertThat(result).isSucceeded(); + + } + + @Test + @DisplayName("Verify isExpired Returns true when expired") + void verify_isExpiredReturnTrueWhenExpired() { + var result = getStore().isExpired(edr("-1000"), edrEntry("mock", "mock")); + assertThat(result).isTrue(); + } + + @Test + @DisplayName("Verify isExpired Returns false when not expired") + void verify_isExpiredReturnFalseWhenNotExpired() { + var result = getStore().isExpired(edr("1000"), edrEntry("mock", "mock")); + assertThat(result).isFalse(); + } + + protected DataAddress edr(String expireIn) { + return DataAddress.Builder.newInstance().type("test").property(CoreConstants.TX_AUTH_NS + "expiresIn", expireIn).build(); + } + + protected EndpointDataReferenceEntry edrEntry(String assetId, String transferProcessId) { + return EndpointDataReferenceEntry.Builder.newInstance() + .assetId(assetId) + .transferProcessId(transferProcessId) + .contractNegotiationId(UUID.randomUUID().toString()) + .agreementId(UUID.randomUUID().toString()) + .providerId(UUID.randomUUID().toString()) + .build(); + } + +} \ No newline at end of file