Skip to content

Commit

Permalink
feat(EdrCache): add transactional test
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed May 24, 2023
1 parent 2df8cbd commit 6f19756
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 34 deletions.
5 changes: 4 additions & 1 deletion edc-extensions/edr-cache-sql/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ dependencies {
implementation(libs.edc.spi.transactionspi)
implementation(libs.edc.spi.transaction.datasource)

testImplementation(libs.edc.transaction.local)

testImplementation(testFixtures(project(":spi:edr-cache-spi")))
testImplementation(testFixtures(libs.edc.core.sql))

testImplementation(testFixtures(libs.edc.junit))

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@

public class SqlEndpointDataReferenceCache extends AbstractSqlStore implements EndpointDataReferenceCache {

public static final String VAULT_PREFIX = "edr:";
private final EdrStatements statements;

private final Clock clock;

private final Vault vault;


Expand Down Expand Up @@ -108,7 +107,7 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
var sql = statements.getInsertTemplate();
var createdAt = clock.millis();
executeQuery(connection, sql, entry.getTransferProcessId(), entry.getAssetId(), entry.getAgreementId(), edr.getId(), createdAt, createdAt);
vault.storeSecret(edr.getId(), toJson(edr)).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
vault.storeSecret(VAULT_PREFIX + edr.getId(), toJson(edr)).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
} catch (Exception exception) {
throw new EdcPersistenceException(exception);
}
Expand All @@ -119,11 +118,11 @@ public void save(EndpointDataReferenceEntry entry, EndpointDataReference edr) {
public StoreResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var entry = findById(connection, id, this::mapResultSet);
if (entry != null) {
var entryWrapper = findById(connection, id, this::mapToWrapper);
if (entryWrapper != null) {
executeQuery(connection, statements.getDeleteByIdTemplate(), id);
vault.deleteSecret(id).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
return StoreResult.success(entry);
vault.deleteSecret(VAULT_PREFIX + entryWrapper.getEdrId()).orElseThrow((failure) -> new EdcPersistenceException(failure.getFailureDetail()));
return StoreResult.success(entryWrapper.getEntry());
} else {
return StoreResult.notFound(format("EDR with id %s not found", id));
}
Expand All @@ -146,6 +145,10 @@ private String mapToEdrId(ResultSet resultSet) throws SQLException {
return resultSet.getString(statements.getEdrId());
}

private EndpointDataReferenceEntryWrapper mapToWrapper(ResultSet resultSet) throws SQLException {
return new EndpointDataReferenceEntryWrapper(mapResultSet(resultSet), mapToEdrId(resultSet));
}

private EndpointDataReference referenceFromEntry(String edrId) {
var edr = vault.resolveSecret(edrId);
if (edr != null) {
Expand All @@ -163,4 +166,22 @@ private QuerySpec queryFor(String field, String value) {

return QuerySpec.Builder.newInstance().filter(filter).build();
}

private static class EndpointDataReferenceEntryWrapper {
private final EndpointDataReferenceEntry entry;
private final String edrId;

private EndpointDataReferenceEntryWrapper(EndpointDataReferenceEntry entry, String edrId) {
this.entry = Objects.requireNonNull(entry);
this.edrId = Objects.requireNonNull(edrId);
}

public EndpointDataReferenceEntry getEntry() {
return entry;
}

public String getEdrId() {
return edrId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.store.sql;

import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.sql.testfixtures.PostgresqlLocalInstance;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.local.LocalDataSourceRegistry;
import org.eclipse.edc.transaction.local.LocalTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;

import java.sql.Connection;
import java.util.UUID;
import javax.sql.DataSource;

import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* Extension for running PG SQL store implementation. It automatically creates a test database and provided all the base data structure
* for a SQL store to run such as {@link DataSourceRegistry}, {@link TransactionContext} and data source name which is automatically generated
*/
public class PostgresqlTransactionalStoreSetupExtension implements BeforeEachCallback, AfterEachCallback, BeforeAllCallback, ParameterResolver {

private final String datasourceName;
private DataSourceRegistry dataSourceRegistry = null;
private DataSource dataSource = null;
private Connection connection = null;
private LocalTransactionContext transactionContext = null;
private Monitor monitor = mock(Monitor.class);

public PostgresqlTransactionalStoreSetupExtension(String datasourceName) {
this.datasourceName = datasourceName;
}

public PostgresqlTransactionalStoreSetupExtension() {
this(UUID.randomUUID().toString());
}


public DataSource getDataSource() {
return dataSource;
}

public String getDatasourceName() {
return datasourceName;
}

public Connection getConnection() {
return connection;
}

public int runQuery(String query) {
return transactionContext.execute(() -> executeQuery(connection, query));
}


public TransactionContext getTransactionContext() {
return transactionContext;
}

public DataSourceRegistry getDataSourceRegistry() {
return dataSourceRegistry;
}

@Override
public void beforeEach(ExtensionContext context) throws Exception {
transactionContext = new LocalTransactionContext(monitor);
dataSourceRegistry = new LocalDataSourceRegistry(transactionContext);
dataSource = mock(DataSource.class);
dataSourceRegistry.register(datasourceName, dataSource);
connection = spy(PostgresqlLocalInstance.getTestConnection());
when(dataSource.getConnection()).thenReturn(connection);
doNothing().when(connection).close();
}

@Override
public void afterEach(ExtensionContext context) throws Exception {
doCallRealMethod().when(connection).close();
connection.close();
}

@Override
public void beforeAll(ExtensionContext context) throws Exception {
PostgresqlLocalInstance.createTestDatabase();
}

@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
return type.equals(PostgresqlTransactionalStoreSetupExtension.class);
}

@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws
ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
if (type.equals(PostgresqlTransactionalStoreSetupExtension.class)) {
return this;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void setUp(PostgresqlStoreSetupExtension extension) throws IOException {
}

@Override
protected void prepareEdr(EndpointDataReference edr) {
protected void onBeforeEdrSave(EndpointDataReference edr) {
when(vault.resolveSecret(edr.getId())).thenReturn(typeManager.writeValueAsString(edr));
}

Expand All @@ -78,4 +78,5 @@ void tearDown(PostgresqlStoreSetupExtension extension) throws SQLException {
protected EndpointDataReferenceCache getStore() {
return cache;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.store.sql;

import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.store.sql.schema.EdrStatements;
import org.eclipse.tractusx.edc.edr.store.sql.schema.postgres.PostgresEdrStatements;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Clock;

import static java.util.UUID.randomUUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.tractusx.edc.edr.spi.TestFunctions.edr;
import static org.eclipse.tractusx.edc.edr.spi.TestFunctions.edrEntry;
import static org.eclipse.tractusx.edc.edr.store.sql.SqlEndpointDataReferenceCache.VAULT_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@PostgresqlDbIntegrationTest
@ExtendWith(PostgresqlTransactionalStoreSetupExtension.class)
public class SqlEndpointDataReferenceCacheTransactionalTest {

EdrStatements statements = new PostgresEdrStatements();
SqlEndpointDataReferenceCache cache;

Clock clock = Clock.systemUTC();

Vault vault = mock(Vault.class);

TypeManager typeManager = new TypeManager();

@BeforeEach
void setUp(PostgresqlTransactionalStoreSetupExtension extension) throws IOException {

when(vault.deleteSecret(any())).thenReturn(Result.success());
when(vault.storeSecret(any(), any())).thenReturn(Result.success());

cache = new SqlEndpointDataReferenceCache(extension.getDataSourceRegistry(), extension.getDatasourceName(), extension.getTransactionContext(), statements, typeManager.getMapper(), vault, clock);
var schema = Files.readString(Paths.get("./docs/schema.sql"));
extension.runQuery(schema);

}

@Test
void save_shouldFail_whenVaultError() {

var tpId = "tp1";
var assetId = "asset1";
var edrId = "edr1";

var edr = edr(edrId);
var entry = edrEntry(assetId, randomUUID().toString(), tpId);

when(vault.storeSecret(any(), any())).thenReturn(Result.failure("fail"));
when(vault.resolveSecret(edr.getId())).thenReturn(typeManager.writeValueAsString(edr));

assertThatThrownBy(() -> cache.save(entry, edr)).isInstanceOf(EdcPersistenceException.class);

assertThat(cache.resolveReference(tpId))
.isNull();

}

@Test
void save() {

var tpId = "tp1";
var assetId = "asset1";
var edrId = "edr1";

var edr = edr(edrId);
var entry = edrEntry(assetId, randomUUID().toString(), tpId);

when(vault.storeSecret(any(), any())).thenReturn(Result.success());
when(vault.resolveSecret(edr.getId())).thenReturn(typeManager.writeValueAsString(edr));

cache.save(entry, edr);

assertThat(cache.resolveReference(tpId))
.isNotNull()
.extracting(EndpointDataReference::getId)
.isEqualTo(edrId);

var edrs = cache.referencesForAsset(assetId);
assertThat(edrs.size()).isEqualTo(1);
assertThat(edrs.get((0)).getId()).isEqualTo(edrId);

verify(vault).storeSecret(eq(VAULT_PREFIX + edr.getId()), any());

}

@Test
void deleteByTransferProcessId_shouldDelete_WhenFound() {

var entry = edrEntry("assetId", "agreementId", "tpId");
var edr = edr("edrId");
cache.save(entry, edr);

assertThat(cache.deleteByTransferProcessId(entry.getTransferProcessId()))
.extracting(StoreResult::getContent)
.isEqualTo(entry);

assertThat(cache.resolveReference(entry.getTransferProcessId())).isNull();
assertThat(cache.referencesForAsset(entry.getAssetId())).hasSize(0);
assertThat(cache.queryForEntries(QuerySpec.max())).hasSize(0);

verify(vault).storeSecret(eq(VAULT_PREFIX + edr.getId()), any());
verify(vault).deleteSecret(eq(VAULT_PREFIX + edr.getId()));
}

@AfterEach
void tearDown(PostgresqlTransactionalStoreSetupExtension extension) throws SQLException {
extension.runQuery("DROP TABLE " + statements.getEdrTable() + " CASCADE");
}

}
Loading

0 comments on commit 6f19756

Please sign in to comment.