Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KOGITO-5921 Support For OracleDB Peristence #1612

Merged
merged 20 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion addons/common/persistence/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down Expand Up @@ -80,4 +86,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.persistence.jdbc;

import java.io.InputStream;
import java.util.List;

public class FileLoader {

static List<String> getQueryFromFile(final String dbType, final String scriptName) {
final String fileName = String.format("sql/%s_%s.sql", scriptName, dbType);
try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName)) {
if (stream == null) {
throw new IllegalStateException(String.format("Impossible to find %s", fileName));
}
byte[] buffer = new byte[stream.available()];
stream.read(buffer);
String[] statements = new String(buffer).split(";");
Scuilion marked this conversation as resolved.
Show resolved Hide resolved
return List.of(statements);
} catch (Exception e) {
throw new RuntimeException(String.format("Error reading query script file %s", fileName), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.persistence.jdbc;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.kie.kogito.persistence.jdbc.JDBCProcessInstances.PAYLOAD;
import static org.kie.kogito.persistence.jdbc.JDBCProcessInstances.VERSION;

public class GenericRepository extends Repository {

private static final Logger LOGGER = LoggerFactory.getLogger(GenericRepository.class);

private enum DatabaseType {
ANSI("ansi", "process_instances"),
ORACLE("Oracle", "PROCESS_INSTANCES"),
POSTGRES("PostgreSQL", "process_instances");

private final String dbIdentifier;
private final String tableNamePattern;

DatabaseType(final String dbIdentifier, final String tableNamePattern) {
this.dbIdentifier = dbIdentifier;
this.tableNamePattern = tableNamePattern;
}

String getDbIdentifier() {
return this.dbIdentifier;
}

public static DatabaseType create(final String dbIdentifier) {
if (ORACLE.getDbIdentifier().equals(dbIdentifier)) {
return ORACLE;
} else if (POSTGRES.getDbIdentifier().equals(dbIdentifier)) {
return POSTGRES;
} else {
var msg = String.format("Unrecognized DB (%s), defaulting to ansi", dbIdentifier);
LOGGER.warn(msg);
return ANSI;
}
}
}

private DatabaseType getDataBaseType(Connection connection) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
final String dbProductName = metaData.getDatabaseProductName();
return DatabaseType.create(dbProductName);
}

@Override
boolean tableExists(DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
DatabaseType databaseType = getDataBaseType(connection);
final DatabaseMetaData metaData = connection.getMetaData();
final String[] types = { "TABLE" };
ResultSet tables = metaData.getTables(null, null, databaseType.tableNamePattern, types);
while (tables.next()) {
LOGGER.debug("Found process_instance table");
return true;
}
return false;
} catch (SQLException e) {
var msg = "Failed to read table metadata";
throw new RuntimeException(msg);
}
}

@Override
void createTable(DataSource dataSource) {
try (Connection connection = dataSource.getConnection()) {
DatabaseType databaseType = getDataBaseType(connection);
final List<String> statements = FileLoader.getQueryFromFile(databaseType.dbIdentifier, "create_tables");
for (String s : statements) {
try (PreparedStatement prepareStatement = connection.prepareStatement(s.trim())) {
prepareStatement.execute();
}
}
LOGGER.info("DDL successfully done for ProcessInstance");
} catch (SQLException e) {
var msg = "Error creating process_instances table, the database should be configured properly before starting the application";
throw new RuntimeException(msg);
}
}

@Override
void insertInternal(DataSource dataSource, String processId, UUID id, byte[] payload) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(INSERT)) {
statement.setString(1, id.toString());
statement.setBytes(2, payload);
statement.setString(3, processId);
statement.setLong(4, 1L);
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error inserting process instance %s", id);
}
}

@Override
void updateInternal(DataSource dataSource, UUID id, byte[] payload) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(UPDATE)) {
statement.setBytes(1, payload);
statement.setString(2, id.toString());
statement.executeUpdate();
} catch (Exception e) {
throw uncheckedException(e, "Error updating process instance %s", id);
}
}

@Override
boolean updateWithLock(DataSource dataSource, UUID id, byte[] payload, long version) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(UPDATE_WITH_LOCK)) {
statement.setBytes(1, payload);
statement.setLong(2, version + 1);
statement.setString(3, id.toString());
statement.setLong(4, version);
int count = statement.executeUpdate();
return count == 1;
} catch (Exception e) {
throw uncheckedException(e, "Error updating with lock process instance %s", id);
}
}

@Override
boolean deleteInternal(DataSource dataSource, UUID id) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(DELETE)) {
statement.setString(1, id.toString());
int count = statement.executeUpdate();
return count == 1;
} catch (Exception e) {
throw uncheckedException(e, "Error deleting process instance %s", id);
}
}

@Override
Map<String, Object> findByIdInternal(DataSource dataSource, UUID id) {
Map<String, Object> result = new HashMap<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(FIND_BY_ID)) {
statement.setString(1, id.toString());
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
Optional<byte[]> b = Optional.ofNullable(resultSet.getBytes(PAYLOAD));
if (b.isPresent()) {
result.put(PAYLOAD, b.get());
}
result.put(VERSION, resultSet.getLong(VERSION));
return result;
}
}
} catch (Exception e) {
throw uncheckedException(e, "Error finding process instance %s", id);
}
return result;
}

@Override
List<byte[]> findAllInternal(DataSource dataSource, String processId) {
List<byte[]> result = new ArrayList<>();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(FIND_ALL)) {
statement.setString(1, processId);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
result.add(resultSet.getBytes(PAYLOAD));
}
}
return result;
} catch (Exception e) {
throw uncheckedException(e, "Error finding all process instances, for processId %s", processId);
}
}

@Override
Long countInternal(DataSource dataSource, String processId) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(COUNT)) {
statement.setString(1, processId);
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
return resultSet.getLong("count");
}
}
} catch (Exception e) {
throw uncheckedException(e, "Error counting process instances, for processId %s", processId);
}
return 0l;
}

}
Loading