Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KOGITO-5921 Support For OracleDB Peristence #1612

Merged
merged 20 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions addons/common/persistence/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</parent>
<artifactId>kogito-addons-persistence-jdbc</artifactId>
<name>Kogito :: Add-Ons :: Persistence :: JDBC</name>
<!-- <version>2.0.1-SNAPSHOT</version> -->
Scuilion marked this conversation as resolved.
Show resolved Hide resolved

<description>JDBC based persistence for Kogito</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

import java.io.InputStream;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,9 +44,30 @@

public class JDBCProcessInstances implements MutableProcessInstances {

private static final String VERSION = "version";
static final String PAYLOAD = "payload";
static final String VERSION = "version";

private static final String PAYLOAD = "payload";
enum DatabaseType {
POSTGRES("PostgreSQL", "process_instances"),
ORACLE("Oracle", "PROCESS_INSTANCES");

private final String dbIdentifier;
private final String tableNamePattern;

DatabaseType(final String dbIdentifier, final String tableNamePattern) {
this.dbIdentifier = dbIdentifier;
this.tableNamePattern = tableNamePattern;
}
Scuilion marked this conversation as resolved.
Show resolved Hide resolved

public static DatabaseType create(final String dbIdentifier) {
if ("Oracle".equals(dbIdentifier)) {
return ORACLE;
} else if ("PostgreSQL".equals(dbIdentifier)) {
return POSTGRES;
}
return null;
Scuilion marked this conversation as resolved.
Show resolved Hide resolved
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(JDBCProcessInstances.class);

Expand All @@ -57,13 +77,7 @@ public class JDBCProcessInstances implements MutableProcessInstances {
private final DataSource dataSource;
private final boolean lock;

private static final String FIND_ALL = "SELECT payload FROM process_instances WHERE process_id = ?";
private static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE id = ?";
private static final String INSERT = "INSERT INTO process_instances (id, payload, process_id, version) VALUES (?, ?, ?, ?)";
private static final String UPDATE = "UPDATE process_instances SET payload = ? WHERE id = ?";
private static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = ?, version = ? WHERE id = ? and version = ?";
private static final String DELETE = "DELETE FROM process_instances WHERE id = ?";
private static final String COUNT = "SELECT COUNT(id) FROM process_instances WHERE process_id = ?";
private Repository repository;

public JDBCProcessInstances(Process<?> process, DataSource dataSource, boolean autoDDL, boolean lock) {
this.dataSource = dataSource;
Expand All @@ -80,10 +94,34 @@ private void init() {
return;
}

try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(getQueryFromFile("exists_tables"))) {
try (Connection connection = dataSource.getConnection()) {
final DatabaseMetaData metaData = connection.getMetaData();
final String dbProductName = metaData.getDatabaseProductName();
DatabaseType databaseType = DatabaseType.create(dbProductName);
if (databaseType == null) {
throw new Exception("Database (" + dbProductName + ") not suported");
}
switch (databaseType) {
case ORACLE:
repository = new OracleRepository();
break;
case POSTGRES:
repository = new PostgresRepository();
break;
}
Scuilion marked this conversation as resolved.
Show resolved Hide resolved

createTable(connection, statement);
final String[] types = { "TABLE" };
ResultSet tables = metaData.getTables(null, null, databaseType.tableNamePattern, types);
boolean exist = false;
while (tables.next()) {
LOGGER.debug("Found process_instance table");
exist = true;
}

if (!exist) {
LOGGER.info("dynamically creating process_instances table");
createTable(connection, databaseType);
}

} catch (Exception e) {
//not break the execution flow in case of any missing permission for db application user, for instance.
Expand All @@ -92,46 +130,31 @@ private void init() {
}
}

private void createTable(Connection connection, PreparedStatement statement) {
boolean result = false;
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
result = Optional.ofNullable(resultSet.getBoolean("exists"))
.filter(Boolean.FALSE::equals)
.map(e -> {
try {
PreparedStatement prepareStatement = connection.prepareStatement(getQueryFromFile("create_tables"));
return prepareStatement.execute();
} catch (SQLException e1) {
LOGGER.error("Error creating process_instances table", e1);
}
return false;
})
.orElseGet(() -> {
LOGGER.info("Table process_instances already exists.");
return false;
});
private void createTable(final Connection connection, final DatabaseType dbType) {
try {
final List<String> statements = getQueryFromFile(dbType.dbIdentifier, "create_tables");
for (String s : statements) {
PreparedStatement prepareStatement = connection.prepareStatement(s.trim());
prepareStatement.execute();
}

} catch (Exception e) {
throw uncheckedException(e, "Error creating process_instances table");
}

if (result) {
LOGGER.info("DDL successfully done for ProcessInstance");
} else {
LOGGER.info("DDL executed with no changes for ProcessInstance");
} catch (SQLException e1) {
LOGGER.error("Error creating process_instances table", e1);
}
}

private String getQueryFromFile(String scriptName) {

try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(String.format("sql/%s.sql", scriptName))) {
private List<String> getQueryFromFile(final String dbType, final String scriptName) {
final String fileName = String.format("sql/%s_%s.sql", scriptName, dbType);
try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName)) {
if (stream == null) {
throw new Exception();
}
byte[] buffer = new byte[stream.available()];
stream.read(buffer);
return new String(buffer);
String[] statments = new String(buffer).split(";");
return List.of(statments);
} catch (Exception e) {
throw uncheckedException(e, "Error reading query script file %s", scriptName);
throw uncheckedException(e, "Error reading query script file %s", fileName);
}
}

Expand All @@ -144,89 +167,39 @@ public boolean exists(String id) {
@Override
public void create(String id, ProcessInstance instance) {
if (isActive(instance)) {
insertInternal(UUID.fromString(id), marshaller.marshallProcessInstance(instance));
repository.insertInternal(dataSource, process.id(), UUID.fromString(id), marshaller.marshallProcessInstance(instance));
}
disconnect(instance);
}

private void insertInternal(UUID id, byte[] payload) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(INSERT)) {
statement.setObject(1, id);
statement.setBytes(2, payload);
statement.setString(3, process.id());
statement.setLong(4, 1L);
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error inserting process instance %s", id);
}
}

@SuppressWarnings("unchecked")
@Override
public void update(String id, ProcessInstance instance) {
if (isActive(instance)) {
if (lock) {
boolean isUpdated = updateWithLock(UUID.fromString(id), marshaller.marshallProcessInstance(instance), instance.version());
boolean isUpdated = repository.updateWithLock(dataSource, UUID.fromString(id), marshaller.marshallProcessInstance(instance), instance.version());
if (!isUpdated) {
throw uncheckedException(null, "The document with ID: %s was updated or deleted by other request.", id);
}
} else {
updateInternal(UUID.fromString(id), marshaller.marshallProcessInstance(instance));
repository.updateInternal(dataSource, UUID.fromString(id), marshaller.marshallProcessInstance(instance));
}
}
disconnect(instance);
}

private void updateInternal(UUID id, byte[] payload) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(UPDATE)) {
statement.setBytes(1, payload);
statement.setObject(2, id);
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s", id);
}
}

private boolean updateWithLock(UUID id, byte[] payload, long version) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(UPDATE_WITH_LOCK)) {
statement.setBytes(1, payload);
statement.setLong(2, version + 1);
statement.setObject(3, id);
statement.setLong(4, version);
int count = statement.executeUpdate();
return count == 1;
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s", id);
}
}

@Override
public void remove(String id) {
boolean isDeleted = deleteInternal(UUID.fromString(id));
boolean isDeleted = repository.deleteInternal(dataSource, UUID.fromString(id));
if (lock && !isDeleted) {
throw uncheckedException(null, "The document with ID: %s was deleted by other request.", id);
}
}

private boolean deleteInternal(UUID id) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(DELETE)) {
statement.setObject(1, id);
int count = statement.executeUpdate();
return count == 1;
} catch (Exception e) {
throw uncheckedException(e, "Error deleting process instance %s", id);
}

}

@Override
public Optional<ProcessInstance> findById(String id, ProcessInstanceReadMode mode) {
ProcessInstance<?> instance = null;
Map<String, Object> map = findByIdInternal(UUID.fromString(id));
Map<String, Object> map = repository.findByIdInternal(dataSource, UUID.fromString(id));
if (map.containsKey(PAYLOAD)) {
byte[] b = (byte[]) map.get(PAYLOAD);
instance = mode == MUTABLE ? marshaller.unmarshallProcessInstance(b, process)
Expand All @@ -239,75 +212,24 @@ public Optional<ProcessInstance> findById(String id, ProcessInstanceReadMode mod

@Override
public Collection<ProcessInstance> values(ProcessInstanceReadMode mode) {
return findAllInternal().stream().map(b -> mode == MUTABLE ? marshaller.unmarshallProcessInstance(b, process) : marshaller.unmarshallReadOnlyProcessInstance(b, process))
return repository.findAllInternal(dataSource, process.id()).stream()
.map(b -> mode == MUTABLE ? marshaller.unmarshallProcessInstance(b, process) : marshaller.unmarshallReadOnlyProcessInstance(b, process))
.collect(Collectors.toList());
}

private Map<String, Object> findByIdInternal(UUID id) {
Map<String, Object> result = new HashMap<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(FIND_BY_ID)) {
statement.setObject(1, id);
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
Optional<byte[]> b = Optional.ofNullable(resultSet.getBytes(PAYLOAD));
if (b.isPresent()) {
result.put(PAYLOAD, b.get());
}
result.put(VERSION, resultSet.getLong(VERSION));
return result;
}
}
} catch (Exception e) {
throw uncheckedException(e, "Error finding process instance %s", id);
}
return result;
}

private List<byte[]> findAllInternal() {
List<byte[]> result = new ArrayList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(FIND_ALL)) {
statement.setString(1, process.id());
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
result.add(resultSet.getBytes(PAYLOAD));
}
}
return result;
} catch (Exception e) {
throw uncheckedException(e, "Error finding all process instances, for processId %s", process.id());
}
}

@Override
public Integer size() {
return countInternal().intValue();
return repository.countInternal(dataSource, process.id()).intValue();
}

@Override
public boolean lock() {
return this.lock;
}

private Long countInternal() {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(COUNT)) {
statement.setString(1, process.id());
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
return resultSet.getLong("count");
}
}
} catch (Exception e) {
throw uncheckedException(e, "Error counting process instances, for processId %s", process.id());
}
return 0l;
}

private void disconnect(ProcessInstance instance) {
Supplier<byte[]> supplier = () -> {
Map<String, Object> map = findByIdInternal(UUID.fromString(instance.id()));
Map<String, Object> map = repository.findByIdInternal(dataSource, UUID.fromString(instance.id()));
((AbstractProcessInstance<?>) instance).setVersion((Long) map.get(VERSION));
return (byte[]) map.get(PAYLOAD);
};
Expand All @@ -317,4 +239,4 @@ private void disconnect(ProcessInstance instance) {
private RuntimeException uncheckedException(Exception ex, String message, Object... param) {
return new RuntimeException(String.format(message, param), ex);
}
}
}
Loading