Skip to content

Commit

Permalink
feat: sql implementation of target node directory
Browse files Browse the repository at this point in the history
  • Loading branch information
bscholtes1A committed Aug 14, 2024
1 parent a1b7e8f commit ced8663
Show file tree
Hide file tree
Showing 18 changed files with 667 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/federated-catalog-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ dependencies {
testImplementation(libs.awaitility)

testImplementation(testFixtures(project(":spi:federated-catalog-spi")))
testImplementation(testFixtures(project(":spi:crawler-spi")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryNodeDirectory implements TargetNodeDirectory {
private final List<TargetNode> cache = new CopyOnWriteArrayList<>();
private final Map<String, TargetNode> cache = new ConcurrentHashMap<>();

@Override
public List<TargetNode> getAll() {
return List.copyOf(cache); //never return the internal copy
return List.copyOf(cache.values()); //never return the internal copy
}

@Override
public void insert(TargetNode node) {
cache.add(node);
cache.put(node.id(), node);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Amadeus IT Group
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus IT Group - initial API and implementation
*
*/

package org.eclipse.edc.catalog.directory;

import org.eclipse.edc.catalog.spi.testfixtures.TargetNodeDirectoryTestBase;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;

class InMemoryNodeDirectoryTest extends TargetNodeDirectoryTestBase {

private final InMemoryNodeDirectory store = new InMemoryNodeDirectory();

@Override
protected TargetNodeDirectory getStore() {
return store;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
#
#

org.eclipse.edc.catalog.store.sql.SqlFederatedCatalogCacheExtension
org.eclipse.edc.catalog.store.sql.SqlTargetNodeDirectoryExtension
29 changes: 29 additions & 0 deletions extensions/store/sql/target-node-directory-sql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
}

dependencies {
api(project(":spi:crawler-spi"))
implementation(libs.edc.sql.core) // for the SqlStatements
implementation(libs.edc.sql.bootstrapper)
implementation(libs.edc.spi.transaction.datasource)
implementation(libs.edc.lib.util)

testImplementation(libs.edc.junit)
testImplementation(testFixtures(libs.edc.sql.core))
testImplementation(testFixtures(project(":spi:crawler-spi")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2024 Amadeus IT Group
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus IT Group - initial API and implementation
*
*/

package org.eclipse.edc.catalog.store.sql;

import org.eclipse.edc.catalog.store.sql.schema.postgres.TargetNodeMapping;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.translation.PostgresqlOperatorTranslator;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

import static java.lang.String.format;

public abstract class BaseSqlDialectStatements implements TargetNodeStatements {

@Override
public String getFindByIdTemplate() {
return format("SELECT * FROM %s WHERE %s = ?", getTargetNodeDirectoryTable(), getIdColumn());
}

@Override
public String getUpdateTemplate() {
return executeStatement()
.column(getNameColumn())
.column(getTargetUrlColumn())
.jsonColumn(getSupportedProtocolsColumn())
.update(getTargetNodeDirectoryTable(), getIdColumn());
}

@Override
public String getInsertTemplate() {
return executeStatement()
.column(getIdColumn())
.column(getNameColumn())
.column(getTargetUrlColumn())
.jsonColumn(getSupportedProtocolsColumn())
.insertInto(getTargetNodeDirectoryTable());
}

@Override
public SqlQueryStatement createQuery(QuerySpec querySpec) {
var select = getSelectStatement();
return new SqlQueryStatement(select, querySpec, new TargetNodeMapping(this), new PostgresqlOperatorTranslator());
}

@Override
public String getSelectStatement() {
return format("SELECT * FROM %s", getTargetNodeDirectoryTable());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2024 Amadeus IT Group
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus IT Group - initial API and implementation
*
*/

package org.eclipse.edc.catalog.store.sql;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.crawler.spi.TargetNode;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.store.AbstractSqlStore;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public class SqlTargetNodeDirectory extends AbstractSqlStore implements TargetNodeDirectory {

private final TargetNodeStatements statements;

public SqlTargetNodeDirectory(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext,
ObjectMapper objectMapper, QueryExecutor queryExecutor, TargetNodeStatements statements) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor);
this.statements = statements;
}

@Override
public List<TargetNode> getAll() {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
var query = statements.createQuery(QuerySpec.max());
return queryExecutor.query(connection, true, this::mapResultSet, query.getQueryAsString(), query.getParameters()).toList();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public void insert(TargetNode node) {
transactionContext.execute(() -> {
try (var connection = getConnection()) {
var id = node.id();

if (findByIdInternal(connection, id) == null) {
insertInternal(connection, id, node);
} else {
updateInternal(connection, id, node);
}

} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private TargetNode findByIdInternal(Connection connection, String id) {
var stmt = statements.getFindByIdTemplate();
return queryExecutor.single(connection, false, this::mapResultSet, stmt, id);
}

private void insertInternal(Connection connection, String id, TargetNode targetNode) {
var stmt = statements.getInsertTemplate();
queryExecutor.execute(connection,
stmt,
id,
targetNode.name(),
targetNode.targetUrl(),
toJson(targetNode.supportedProtocols())
);
}

private void updateInternal(Connection connection, String id, TargetNode targetNode) {
var stmt = statements.getUpdateTemplate();
queryExecutor.execute(connection,
stmt,
targetNode.name(),
targetNode.targetUrl(),
toJson(targetNode.supportedProtocols()),
id
);
}

private TargetNode mapResultSet(ResultSet resultSet) throws Exception {
return new TargetNode(
resultSet.getString(statements.getNameColumn()),
resultSet.getString(statements.getIdColumn()),
resultSet.getString(statements.getTargetUrlColumn()),
fromJson(resultSet.getString(statements.getSupportedProtocolsColumn()), new TypeReference<>() {
})
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 Amadeus IT Group
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus IT Group - initial API and implementation
*
*/

package org.eclipse.edc.catalog.store.sql;

import org.eclipse.edc.catalog.store.sql.schema.postgres.PostgresDialectStatements;
import org.eclipse.edc.crawler.spi.TargetNode;
import org.eclipse.edc.crawler.spi.TargetNodeDirectory;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.bootstrapper.SqlSchemaBootstrapper;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Provides(TargetNodeDirectory.class)
@Extension(value = "SQL target node directory")
public class SqlTargetNodeDirectoryExtension implements ServiceExtension {

@Setting(value = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE)
public static final String DATASOURCE_NAME = "edc.sql.store.targetnodedirectory.datasource";

@Inject
private DataSourceRegistry dataSourceRegistry;
@Inject
private TransactionContext trxContext;
@Inject(required = false)
private TargetNodeStatements statements;
@Inject
private TypeManager typeManager;

@Inject
private QueryExecutor queryExecutor;

@Inject
private SqlSchemaBootstrapper sqlSchemaBootstrapper;

@Override
public void initialize(ServiceExtensionContext context) {
typeManager.registerTypes(TargetNode.class);
var dataSourceName = context.getSetting(DATASOURCE_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);
var targetNodeDirectory = new SqlTargetNodeDirectory(dataSourceRegistry, dataSourceName, trxContext,
typeManager.getMapper(), queryExecutor, getStatementImpl());
context.registerService(TargetNodeDirectory.class, targetNodeDirectory);
sqlSchemaBootstrapper.addStatementFromResource(dataSourceName, "target-node-directory-schema.sql");
}

/**
* returns an externally-provided sql statement dialect, or postgres as a default
*/
private TargetNodeStatements getStatementImpl() {
return statements != null ? statements : new PostgresDialectStatements();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2024 Amadeus IT Group
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus IT Group - initial API and implementation
*
*/

package org.eclipse.edc.catalog.store.sql;

import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.statement.SqlStatements;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

public interface TargetNodeStatements extends SqlStatements {

default String getTargetNodeDirectoryTable() {
return "edc_target_node_directory";
}

default String getIdColumn() {
return "id";
}

default String getNameColumn() {
return "name";
}

default String getTargetUrlColumn() {
return "target_url";
}

default String getSupportedProtocolsColumn() {
return "supported_protocols";
}

String getInsertTemplate();

String getFindByIdTemplate();

String getUpdateTemplate();

SqlQueryStatement createQuery(QuerySpec query);

String getSelectStatement();
}
Loading

0 comments on commit ced8663

Please sign in to comment.