Skip to content

Commit

Permalink
feat: SQL implementation of the EDR index store (eclipse-edc#4025)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood authored Mar 21, 2024
1 parent 169eecc commit 7f9b059
Show file tree
Hide file tree
Showing 18 changed files with 786 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ static void createDatabase(String participantName) {
"control-plane/store/sql/policy-definition-store-sql",
"control-plane/store/sql/transfer-process-store-sql",
"data-plane/store/sql/data-plane-store-sql",
"policy-monitor/store/sql/policy-monitor-store-sql"
"policy-monitor/store/sql/policy-monitor-store-sql",
"common/store/sql/edr-index-sql"
)
.map(extensionsFolder::resolve)
.map(it -> it.resolve("docs"))
Expand Down
30 changes: 30 additions & 0 deletions extensions/common/store/sql/edr-index-sql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
}

dependencies {
api(project(":spi:common:core-spi"))
api(project(":spi:common:transaction-spi"))

implementation(project(":extensions:common:sql:sql-core"))
implementation(project(":spi:common:edr-store-spi"))
implementation(project(":spi:common:transaction-datasource-spi"))
testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":extensions:common:sql:sql-core")))
testImplementation(testFixtures(project(":spi:common:edr-store-spi")))

}
11 changes: 11 additions & 0 deletions extensions/common/store/sql/edr-index-sql/docs/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

CREATE TABLE IF NOT EXISTS edc_edr_entry
(
transfer_process_id VARCHAR NOT NULL PRIMARY KEY,
agreement_id VARCHAR NOT NULL,
asset_id VARCHAR NOT NULL,
provider_id VARCHAR NOT NULL,
contract_negotiation_id VARCHAR,
created_at BIGINT NOT NULL
);

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.store.sql.edr;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.store.AbstractSqlStore;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class SqlEndpointDataReferenceEntryIndex extends AbstractSqlStore implements EndpointDataReferenceEntryIndex {

private final EndpointDataReferenceEntryStatements statements;

public SqlEndpointDataReferenceEntryIndex(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext,
ObjectMapper objectMapper, EndpointDataReferenceEntryStatements statements, QueryExecutor queryExecutor) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor);
this.statements = statements;
}

@Override
public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) {
Objects.requireNonNull(transferProcessId);
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findById(connection, transferProcessId);
} catch (Exception exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec querySpec) {
return transactionContext.execute(() -> {
Objects.requireNonNull(querySpec);
try {
var queryStmt = statements.createQuery(querySpec);
try (var stream = queryExecutor.query(getConnection(), true, this::mapResultSet, queryStmt.getQueryAsString(), queryStmt.getParameters())) {
return StoreResult.success(stream.collect(Collectors.toList()));
}
} catch (SQLException exception) {
throw new EdcPersistenceException(exception);
}
});
}

@Override
public StoreResult<Void> save(EndpointDataReferenceEntry entry) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
if (existsById(connection, entry.getTransferProcessId())) {
updateInternal(connection, entry);
} else {
insertInternal(connection, entry);
}
return StoreResult.success();
} catch (Exception e) {
throw new EdcPersistenceException(e.getMessage(), e);
}
});
}

@Override
public StoreResult<EndpointDataReferenceEntry> delete(String transferProcessId) {
Objects.requireNonNull(transferProcessId);
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var entity = findById(connection, transferProcessId);
if (entity != null) {
queryExecutor.execute(connection, statements.getDeleteByIdTemplate(), transferProcessId);
return StoreResult.success(entity);
} else {
return StoreResult.notFound(format(ENDPOINT_DATA_REFERENCE_ENTRY_FOUND, transferProcessId));
}
} catch (Exception e) {
throw new EdcPersistenceException(e.getMessage(), e);
}
});
}

private EndpointDataReferenceEntry findById(Connection connection, String id) {
var sql = statements.getFindByTemplate();
return queryExecutor.single(connection, false, this::mapResultSet, sql, id);
}

private boolean existsById(Connection connection, String definitionId) {
var sql = statements.getCountTemplate();
try (var stream = queryExecutor.query(connection, false, this::mapCount, sql, definitionId)) {
return stream.findFirst().orElse(0L) > 0;
}
}

private long mapCount(ResultSet resultSet) throws SQLException {
return resultSet.getLong(1);
}

private void insertInternal(Connection connection, EndpointDataReferenceEntry entry) {
transactionContext.execute(() -> {
queryExecutor.execute(connection, statements.getInsertTemplate(),
entry.getTransferProcessId(),
entry.getAssetId(),
entry.getProviderId(),
entry.getAgreementId(),
entry.getContractNegotiationId(),
entry.getCreatedAt());
});
}

private void updateInternal(Connection connection, EndpointDataReferenceEntry entry) {
transactionContext.execute(() -> {
queryExecutor.execute(connection, statements.getUpdateTemplate(),
entry.getTransferProcessId(),
entry.getAssetId(),
entry.getProviderId(),
entry.getAgreementId(),
entry.getContractNegotiationId(),
entry.getCreatedAt(),
entry.getTransferProcessId());
});
}

private EndpointDataReferenceEntry mapResultSet(ResultSet resultSet) throws Exception {
return EndpointDataReferenceEntry.Builder.newInstance()
.createdAt(resultSet.getLong(statements.getCreatedAtColumn()))
.assetId(resultSet.getString(statements.getAssetIdColumn()))
.transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn()))
.agreementId(resultSet.getString(statements.getAgreementIdColumn()))
.providerId(resultSet.getString(statements.getProviderIdColumn()))
.contractNegotiationId(resultSet.getString(statements.getContractNegotiationIdColumn()))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.store.sql.edr;


import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements;
import org.eclipse.edc.connector.store.sql.edr.schema.postgres.PostgresDialectStatements;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Provides({ EndpointDataReferenceEntryIndex.class })
@Extension(value = "SQL edr entry store")
public class SqlEndpointDataReferenceEntryIndexExtension implements ServiceExtension {

/**
* Name of the datasource to use for accessing edr entries.
*/
@Setting(required = true)
public static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name";

@Inject
private DataSourceRegistry dataSourceRegistry;

@Inject
private TransactionContext transactionContext;

@Inject(required = false)
private EndpointDataReferenceEntryStatements statements;

@Inject
private QueryExecutor queryExecutor;

@Inject
private TypeManager typeManager;

@Override
public void initialize(ServiceExtensionContext context) {
var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);

var sqlStore = new SqlEndpointDataReferenceEntryIndex(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(),
getStatementImpl(), queryExecutor);

context.registerService(EndpointDataReferenceEntryIndex.class, sqlStore);
}

private EndpointDataReferenceEntryStatements getStatementImpl() {
return statements == null ? new PostgresDialectStatements() : statements;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.store.sql.edr.schema;

import org.eclipse.edc.connector.store.sql.edr.schema.postgres.EndpointDataReferenceEntryMapping;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.translation.SqlOperatorTranslator;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

import static java.lang.String.format;

public class BaseSqlDialectStatements implements EndpointDataReferenceEntryStatements {

protected final SqlOperatorTranslator operatorTranslator;

public BaseSqlDialectStatements(SqlOperatorTranslator operatorTranslator) {
this.operatorTranslator = operatorTranslator;
}

@Override
public String getDeleteByIdTemplate() {
return executeStatement().delete(getEdrEntryTable(), getTransferProcessIdColumn());
}

@Override
public String getFindByTemplate() {
return format("SELECT * FROM %s WHERE %s = ?", getEdrEntryTable(), getTransferProcessIdColumn());
}

@Override
public String getInsertTemplate() {
return executeStatement()
.column(getTransferProcessIdColumn())
.column(getAssetIdColumn())
.column(getProviderIdColumn())
.column(getAgreementIdColumn())
.column(getContractNegotiationIdColumn())
.column(getCreatedAtColumn())
.insertInto(getEdrEntryTable());
}

@Override
public String getCountTemplate() {
return format("SELECT COUNT (%s) FROM %s WHERE %s = ?",
getTransferProcessIdColumn(),
getEdrEntryTable(),
getTransferProcessIdColumn());
}

@Override
public String getUpdateTemplate() {
return executeStatement()
.column(getTransferProcessIdColumn())
.column(getAssetIdColumn())
.column(getProviderIdColumn())
.column(getAgreementIdColumn())
.column(getContractNegotiationIdColumn())
.column(getCreatedAtColumn())
.update(getEdrEntryTable(), getTransferProcessIdColumn());

}

@Override
public SqlQueryStatement createQuery(QuerySpec querySpec) {
var select = format("SELECT * FROM %s", getEdrEntryTable());
return new SqlQueryStatement(select, querySpec, new EndpointDataReferenceEntryMapping(this), operatorTranslator);
}

}
Loading

0 comments on commit 7f9b059

Please sign in to comment.