From 713d6fb8f9d6913b1fd11326c390fb8ed164b3de Mon Sep 17 00:00:00 2001 From: Maxime Wiewiora <48218208+maximevw@users.noreply.github.com> Date: Wed, 26 Jun 2024 16:35:10 +0200 Subject: [PATCH] Add compatibility mode for AWS Keyspaces (#303) --- README.md | 8 +- .../cassandra/database/CassandraDatabase.java | 44 +++++++- .../database/CassandraDatabaseConnection.java | 2 +- .../lockservice/LockServiceCassandra.java | 36 +++++-- ...eChangeLogLockTableGeneratorCassandra.java | 14 ++- .../DeleteGeneratorCassandra.java | 79 +++++++++++++- ...eChangeLogLockTableGeneratorCassandra.java | 33 +++++- .../TagDatabaseGeneratorCassandra.java | 101 +++++++++++++----- .../database/CassandraDatabaseTest.groovy | 7 ++ 9 files changed, 265 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index fab6bb96..7a975153 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -liquibase-cassandra[![Build and Test Extention](https://github.com/liquibase/liquibase-cassandra/actions/workflows/build.yml/badge.svg)](https://github.com/liquibase/liquibase-cassandra/actions/workflows/build.yml) +liquibase-cassandra [![Build and Test Extension](https://github.com/liquibase/liquibase-cassandra/actions/workflows/test.yml/badge.svg)](https://github.com/liquibase/liquibase-cassandra/actions/workflows/test.yml) =================== Liquibase extension for Cassandra Support. @@ -88,19 +88,19 @@ INSERT INTO posts(id, author_id, title, description, content, inserted_date) VAL INSERT INTO posts(id, author_id, title, description, content, inserted_date) VALUES (4,4,'itaque','deleniti','Magni nam optio id recusandae.','2010-07-28'); INSERT INTO posts(id, author_id, title, description, content, inserted_date) VALUES -(5,5,'ad','similique','Rerum tempore quis ut nesciunt qui excepturi est.','2006-10-09');; +(5,5,'ad','similique','Rerum tempore quis ut nesciunt qui excepturi est.','2006-10-09'); ``` #### Executing the tests First you need to build project - `mvn package` will do the job. ##### from IDE -From your IDE, right click on the `liquibase.ext.cassandra.LiquibaseHarnessSuiteIT` test class present in `src/test/groovy` directory. +From your IDE, right-click on the `liquibase.ext.cassandra.LiquibaseHarnessSuiteIT` test class present in `src/test/groovy` directory. Doing so, will allow you to execute all the standard change object tests in the liquibase-test-harness as well as the Cassandra specific change objects tests created exclusively to test this extension (You can find this in the `src/test/resources/liquibase/harness/change/changelogs/cassandra` directory). -To run single test case, let's say `addColumn`, create JUit configuration for `liquibase.harness.change.ChangeObjectTests` with arg `-DchangeObjects=addColumn` +To run single test case, let's say `addColumn`, create JUnit configuration for `liquibase.harness.change.ChangeObjectTests` with arg `-DchangeObjects=addColumn` More details about different options can be found in [liquibase-test-harness readme](https://github.com/liquibase/liquibase-test-harness) ##### from command line diff --git a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java index 1f3c50f2..0a5cffa8 100644 --- a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java +++ b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java @@ -2,20 +2,25 @@ import com.ing.data.cassandra.jdbc.CassandraConnection; import liquibase.Scope; +import liquibase.configuration.ConfigurationDefinition; +import liquibase.configuration.LiquibaseConfiguration; import liquibase.database.AbstractJdbcDatabase; import liquibase.database.DatabaseConnection; import liquibase.database.jvm.JdbcConnection; import liquibase.exception.DatabaseException; import liquibase.structure.core.Index; +import java.sql.PreparedStatement; import java.sql.Statement; /** - * Cassandra 1.2.0 NoSQL database support. - * Javadocs for ING Cassandra JDBC Wrapper: https://javadoc.io/doc/com.ing.data/cassandra-jdbc-wrapper/latest/index.html - * - * Javadocs for DataStax OSS Driver: https://javadoc.io/doc/com.datastax.oss/java-driver-core/latest/index.html - * Jar file for DataStax OSS Driver: https://search.maven.org/search?q=com.DataStax.oss + * Cassandra NoSQL database support. + * Javadocs for ING Cassandra + * JDBC Wrapper
+ * Javadocs for Apache Cassandra + * OSS Java driver
+ * Apache Cassandra OSS Java + * driver */ public class CassandraDatabase extends AbstractJdbcDatabase { public static final String PRODUCT_NAME = "Cassandra"; @@ -24,6 +29,31 @@ public class CassandraDatabase extends AbstractJdbcDatabase { public static final String DEFAULT_DRIVER = "com.ing.data.cassandra.jdbc.CassandraDriver"; private String keyspace; + /** + * When running on AWS Keyspaces, a specific compatibility mode has to be activated for Liquibase because some + * behaviors need to be modified since AWS Keyspaces does not fully support CQL syntax. + * See: Issue #297 + * and: Support Cassandra APIs + * in AWS Keyspaces + */ + public static final ConfigurationDefinition AWS_KEYSPACES_COMPATIBILITY_MODE; + static { + final ConfigurationDefinition.Builder builder = new ConfigurationDefinition.Builder("liquibase.cassandra"); + + AWS_KEYSPACES_COMPATIBILITY_MODE = builder.define("awsKeyspacesCompatibilityModeEnabled", Boolean.class) + .setDescription("Whether the compatibility mode for AWS Keyspaces must be enabled") + .addAliasKey("liquibase.cassandra.awsKeyspacesCompatibilityModeEnabled") + .setDefaultValue(false) + .build(); + + Scope.getCurrentScope().getSingleton(LiquibaseConfiguration.class) + .registerDefinition(AWS_KEYSPACES_COMPATIBILITY_MODE); + } + + public static boolean isAwsKeyspacesCompatibilityModeEnabled() { + return AWS_KEYSPACES_COMPATIBILITY_MODE.getCurrentValue(); + } + @Override public String getShortName() { return SHORT_PRODUCT_NAME; @@ -143,6 +173,10 @@ public Statement getStatement() throws DatabaseException { return ((JdbcConnection) super.getConnection()).createStatement(); } + public PreparedStatement prepareStatement(String query) throws DatabaseException { + return ((JdbcConnection) super.getConnection()).prepareStatement(query); + } + @Override public boolean jdbcCallsCatalogsSchemas() { return true; diff --git a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabaseConnection.java b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabaseConnection.java index 4432f971..3a6c7463 100644 --- a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabaseConnection.java +++ b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabaseConnection.java @@ -13,7 +13,7 @@ public class CassandraDatabaseConnection extends JdbcConnection { @Override public int getPriority() { - return 201; + return PRIORITY_DATABASE + 200; } @Override diff --git a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java index 8910eb43..1e385f9c 100644 --- a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; +import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled; + public class LockServiceCassandra extends StandardLockService { private boolean isDatabaseChangeLogLockTableInitialized; @@ -149,7 +151,7 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database); try { - isDatabaseChangeLogLockTableInitialized = executeCountQueryWithAlternative(executor, + isDatabaseChangeLogLockTableInitialized = executeCountQuery(executor, "SELECT COUNT(*) FROM " + getChangeLogLockTableName()) > 0; } catch (LiquibaseException e) { if (executor.updatesDatabase()) { @@ -170,7 +172,7 @@ private boolean isLocked(Executor executor) throws DatabaseException { private boolean isLockedByCurrentInstance(Executor executor) throws DatabaseException { final String lockedBy = NetUtil.getLocalHostName() + " (" + NetUtil.getLocalHostAddress() + ")"; - return executeCountQueryWithAlternative(executor, + return executeCountQuery(executor, "SELECT COUNT(*) FROM " + getChangeLogLockTableName() + " WHERE " + "LOCKED = TRUE AND LOCKEDBY = '" + lockedBy + "' ALLOW FILTERING") > 0; } @@ -183,20 +185,32 @@ private String getChangeLogLockTableName() { } } - private int executeCountQueryWithAlternative(final Executor executor, final String query) throws DatabaseException { + /** + * Execute a count query using an alternative if the AWS Keyspaces compatibility mode is enabled. + * + * @implNote Since aggregate functions like COUNT are not supported by AWS Keyspaces (see + * + * Cassandra functions in AWS Keyspaces), this method tries to execute the same query without the COUNT + * function then programmatically count returned rows, when the AWS Keyspaces compatibility mode is enabled. + * + * @param executor The query executor. + * @param query The query to execute. + * @return The result of the count query. + * @throws DatabaseException in case something goes wrong during the query execution or if the provided query is + * not a count query. + */ + private int executeCountQuery(final Executor executor, final String query) throws DatabaseException { if (!query.contains("SELECT COUNT(*)")) { throw new UnexpectedLiquibaseException("Invalid count query: " + query); } - try { - return executor.queryForInt(new RawSqlStatement(query)); - } catch (DatabaseException e) { - // If the count query failed (for example, because counting rows is not implemented - see issue #289 with - // AWS Keyspaces where aggregate functions like COUNT are not supported: - // https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-functions), try to - // execute the same query without the COUNT function then programmatically count returned rows. - final String altQuery = query.replace("SELECT COUNT(*)", "SELECT *"); + if (isAwsKeyspacesCompatibilityModeEnabled()) { + Scope.getCurrentScope().getLog(LockServiceCassandra.class) + .fine("AWS Keyspaces compatibility mode enabled: using alternative count query"); + final String altQuery = query.replaceAll("(?i)SELECT COUNT\\(\\*\\)", "SELECT *"); final List> rows = executor.queryForList(new RawSqlStatement(altQuery)); return rows.size(); + } else { + return executor.queryForInt(new RawSqlStatement(query)); } } } diff --git a/src/main/java/liquibase/ext/cassandra/sqlgenerator/CreateDatabaseChangeLogLockTableGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/sqlgenerator/CreateDatabaseChangeLogLockTableGeneratorCassandra.java index 13ce333c..e7e7a27d 100644 --- a/src/main/java/liquibase/ext/cassandra/sqlgenerator/CreateDatabaseChangeLogLockTableGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/sqlgenerator/CreateDatabaseChangeLogLockTableGeneratorCassandra.java @@ -24,12 +24,20 @@ public boolean supports(CreateDatabaseChangeLogLockTableStatement statement, Dat @Override public Sql[] generateSql(CreateDatabaseChangeLogLockTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) { - RawSqlStatement createTableStatement = new RawSqlStatement("CREATE TABLE IF NOT EXISTS " + - database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName()) + - " (ID INT, LOCKED BOOLEAN, LOCKGRANTED timestamp, LOCKEDBY TEXT, PRIMARY KEY (ID))"); + RawSqlStatement createTableStatement = buildCreateTableStatement( + database.escapeTableName( + database.getLiquibaseCatalogName(), + database.getLiquibaseSchemaName(), + database.getDatabaseChangeLogLockTableName()) + ); return SqlGeneratorFactory.getInstance().generateSql(createTableStatement, database); } + protected static RawSqlStatement buildCreateTableStatement(String tableName) { + return new RawSqlStatement("CREATE TABLE IF NOT EXISTS " + tableName + + " (ID INT, LOCKED BOOLEAN, LOCKGRANTED timestamp, LOCKEDBY TEXT, PRIMARY KEY (ID))"); + } + } diff --git a/src/main/java/liquibase/ext/cassandra/sqlgenerator/DeleteGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/sqlgenerator/DeleteGeneratorCassandra.java index 01e9edd3..467de7b4 100644 --- a/src/main/java/liquibase/ext/cassandra/sqlgenerator/DeleteGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/sqlgenerator/DeleteGeneratorCassandra.java @@ -1,12 +1,26 @@ package liquibase.ext.cassandra.sqlgenerator; +import liquibase.Scope; import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.exception.UnexpectedLiquibaseException; import liquibase.ext.cassandra.database.CassandraDatabase; +import liquibase.ext.cassandra.lockservice.LockServiceCassandra; import liquibase.sql.Sql; import liquibase.sql.UnparsedSql; import liquibase.sqlgenerator.SqlGeneratorChain; import liquibase.sqlgenerator.core.DeleteGenerator; import liquibase.statement.core.DeleteStatement; +import liquibase.structure.core.PrimaryKey; +import liquibase.structure.core.Table; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled; public class DeleteGeneratorCassandra extends DeleteGenerator { @@ -24,15 +38,70 @@ public boolean supports(DeleteStatement statement, Database database) { public Sql[] generateSql(DeleteStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) { if (statement.getWhere() == null) { - - String sql = "TRUNCATE " + database - .escapeTableName(statement.getCatalogName(), statement.getSchemaName(), statement.getTableName()); - return new Sql[] { new UnparsedSql(sql, getAffectedTable(statement)) }; + if (isAwsKeyspacesCompatibilityModeEnabled()) { + Scope.getCurrentScope().getLog(DeleteGeneratorCassandra.class) + .fine("AWS Keyspaces compatibility mode enabled: using alternative queries to truncate " + statement.getTableName()); + return buildDeleteStatements(statement, database, sqlGeneratorChain).toArray(new Sql[]{}); + } else { + String sql = "TRUNCATE " + database.escapeTableName(statement.getCatalogName(), + statement.getSchemaName(), statement.getTableName()); + return new Sql[] { new UnparsedSql(sql, getAffectedTable(statement)) }; + } } else { - return super.generateSql(statement, database, sqlGeneratorChain); } } + /** + * Builds a list of DELETE statements to remove each row of a given table. + * + * @implNote Since AWS Keyspaces does not support TRUNCATE TABLE statements (see + * + * Cassandra APIs in AWS Keyspaces), this method tries to build a DELETE statement for each row in the table + * to truncate, when the AWS Keyspaces compatibility mode is enabled. + * Here, we cannot simply drop then re-create the table because the delete generator is too generic, so we can't + * easily know the structure of the table to re-create. + * + * @param statement The original DELETE statement not having any WHERE clause. + * @param database The database where the queries will be executed. + * @param sqlGeneratorChain The SQL generator chain. + * @return The list of DELETE statements to execute to truncate the given table. + * @throws UnexpectedLiquibaseException in case something goes wrong during the preparation of the statements. + */ + private List buildDeleteStatements(final DeleteStatement statement, final Database database, + final SqlGeneratorChain sqlGeneratorChain) { + List sqlStatements = new ArrayList<>(); + + String tableName = database.escapeTableName(statement.getCatalogName(), + statement.getSchemaName(), statement.getTableName()); + + // Get the primary key values of each row in the table. + Table table = (Table) getAffectedTable(statement); + PrimaryKey pk = table.getPrimaryKey(); + String pkColumnNames = pk.getColumnNames(); + String selectTableContent = "SELECT " + pkColumnNames + " FROM " + tableName; + + // For each row in the table, build a DELETE statement for which the WHERE clause is based on the primary key + // of the row. + try { + Statement stmt = ((CassandraDatabase) database).getStatement(); + ResultSet rs = stmt.executeQuery(selectTableContent); + while (rs.next()) { + DeleteStatement deleteRowStatement = new DeleteStatement(statement.getCatalogName(), + statement.getSchemaName(), statement.getTableName()); + statement.addWhereColumnName(pkColumnNames); + for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) { + statement.addWhereParameter(rs.getObject(i)); + } + sqlStatements.add(super.generateSql(deleteRowStatement, database, sqlGeneratorChain)[0]); + } + rs.close(); + stmt.close(); + } catch (DatabaseException | SQLException e) { + throw new UnexpectedLiquibaseException( + "Failed to build DELETE statements to truncate table " + tableName, e); + } + return sqlStatements; + } } diff --git a/src/main/java/liquibase/ext/cassandra/sqlgenerator/InitializeDatabaseChangeLogLockTableGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/sqlgenerator/InitializeDatabaseChangeLogLockTableGeneratorCassandra.java index 0dfdf921..5fc898b4 100644 --- a/src/main/java/liquibase/ext/cassandra/sqlgenerator/InitializeDatabaseChangeLogLockTableGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/sqlgenerator/InitializeDatabaseChangeLogLockTableGeneratorCassandra.java @@ -1,7 +1,9 @@ package liquibase.ext.cassandra.sqlgenerator; +import liquibase.Scope; import liquibase.database.Database; import liquibase.ext.cassandra.database.CassandraDatabase; +import liquibase.ext.cassandra.lockservice.LockServiceCassandra; import liquibase.sql.Sql; import liquibase.sqlgenerator.SqlGeneratorChain; import liquibase.sqlgenerator.SqlGeneratorFactory; @@ -11,6 +13,9 @@ import liquibase.statement.core.InsertStatement; import liquibase.statement.core.RawSqlStatement; +import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled; +import static liquibase.ext.cassandra.sqlgenerator.CreateDatabaseChangeLogLockTableGeneratorCassandra.buildCreateTableStatement; + public class InitializeDatabaseChangeLogLockTableGeneratorCassandra extends InitializeDatabaseChangeLogLockTableGenerator { @Override @@ -26,18 +31,36 @@ public boolean supports(InitializeDatabaseChangeLogLockTableStatement statement, @Override public Sql[] generateSql(InitializeDatabaseChangeLogLockTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) { - RawSqlStatement deleteStatement = new RawSqlStatement("TRUNCATE TABLE " + database.escapeTableName( + SqlStatement[] statements; + + String databaseChangelogLockTableName = database.escapeTableName( database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), - database.getDatabaseChangeLogLockTableName().toUpperCase())); + database.getDatabaseChangeLogLockTableName().toUpperCase()); - InsertStatement insertStatement = new InsertStatement(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName().toUpperCase()) + InsertStatement insertStatement = new InsertStatement(database.getLiquibaseCatalogName(), + database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName().toUpperCase()) .addColumnValue("ID", 1) .addColumnValue("LOCKED", Boolean.FALSE); - return SqlGeneratorFactory.getInstance().generateSql(new SqlStatement[]{deleteStatement, insertStatement}, database); + if (isAwsKeyspacesCompatibilityModeEnabled()) { + // Since AWS Keyspaces does not support TRUNCATE TABLE statements + // (https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-api-support), drop + // then re-create the changelog lock table when the AWS Keyspaces compatibility mode is enabled. + Scope.getCurrentScope().getLog(InitializeDatabaseChangeLogLockTableGeneratorCassandra.class) + .fine("AWS Keyspaces compatibility mode enabled: using alternative queries to truncate changelog lock table"); + RawSqlStatement dropStatement = new RawSqlStatement("DROP TABLE " + databaseChangelogLockTableName); + RawSqlStatement createStatement = buildCreateTableStatement(databaseChangelogLockTableName); - } + statements = new SqlStatement[]{dropStatement, createStatement, insertStatement}; + } else { + RawSqlStatement deleteStatement = new RawSqlStatement("TRUNCATE TABLE " + databaseChangelogLockTableName); + statements = new SqlStatement[]{deleteStatement, insertStatement}; + } + + return SqlGeneratorFactory.getInstance().generateSql(statements, database); + + } } diff --git a/src/main/java/liquibase/ext/cassandra/sqlgenerator/TagDatabaseGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/sqlgenerator/TagDatabaseGeneratorCassandra.java index da89bc14..59adf9d2 100644 --- a/src/main/java/liquibase/ext/cassandra/sqlgenerator/TagDatabaseGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/sqlgenerator/TagDatabaseGeneratorCassandra.java @@ -1,28 +1,26 @@ package liquibase.ext.cassandra.sqlgenerator; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; -import java.util.Calendar; -import java.util.TimeZone; - import liquibase.Scope; import liquibase.database.Database; import liquibase.database.ObjectQuotingStrategy; import liquibase.datatype.DataTypeFactory; import liquibase.exception.DatabaseException; +import liquibase.exception.UnexpectedLiquibaseException; import liquibase.ext.cassandra.database.CassandraDatabase; +import liquibase.ext.cassandra.lockservice.LockServiceCassandra; import liquibase.sql.Sql; import liquibase.sql.UnparsedSql; import liquibase.sqlgenerator.SqlGeneratorChain; import liquibase.sqlgenerator.core.TagDatabaseGenerator; import liquibase.statement.core.TagDatabaseStatement; -import liquibase.structure.core.Column; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; + +import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled; public class TagDatabaseGeneratorCassandra extends TagDatabaseGenerator { @@ -43,22 +41,45 @@ public Sql[] generateSql(TagDatabaseStatement statement, Database database, SqlG try { String tagEscaped = DataTypeFactory.getInstance().fromObject(statement.getTag(), database).objectToSql(statement.getTag(), database); - Statement statement1 = ((CassandraDatabase) database).getStatement(); + String databaseChangelogTableName = database.escapeTableName(database.getLiquibaseCatalogName(), + database.getLiquibaseSchemaName(), "databasechangelog"); + //Query to get last executed changeset date - String query1 = "SELECT TOUNIXTIMESTAMP(MAX(DATEEXECUTED)) as DATEEXECUTED FROM " + - database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), "databasechangelog"); - ResultSet rs1 = statement1.executeQuery(query1); + ResultSet rs1; + PreparedStatement ps1 = null; + // When AWS Keyspaces compatibility mode is enabled, the max date must be calculated programmatically. + if (isAwsKeyspacesCompatibilityModeEnabled()) { + Scope.getCurrentScope().getLog(LockServiceCassandra.class) + .fine("AWS Keyspaces compatibility mode enabled: using alternative to get last executed changeset"); + Timestamp maxDateExecuted = selectLastExecutedChangesetTimestamp(database, databaseChangelogTableName); + String query1 = "SELECT TOUNIXTIMESTAMP(DATEEXECUTED) as LAST_DATEEXECUTED FROM " + + databaseChangelogTableName + "WHERE DATEEXECUTED = ? ALLOW FILTERING;"; + ps1 = ((CassandraDatabase) database).prepareStatement(query1); + ps1.setTimestamp(1, maxDateExecuted); + rs1 = ps1.executeQuery(); + } else { + String query1 = "SELECT TOUNIXTIMESTAMP(MAX(DATEEXECUTED)) as LAST_DATEEXECUTED FROM " + + databaseChangelogTableName; + rs1 = statement1.executeQuery(query1); + } String date = ""; + if (rs1 == null) { + throw new UnexpectedLiquibaseException( + "Unexpected null result set when getting last executed changeset date"); + } while (rs1.next()) { - date = rs1.getString("DATEEXECUTED"); + date = rs1.getString("LAST_DATEEXECUTED"); } rs1.close(); + if (ps1 != null) { + ps1.close(); + } + //Query to get composite key details of last executed change set - String query2 = "select id,author, filename from " + - database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), "databasechangelog") - + " where dateexecuted = '"+date+"' ALLOW FILTERING"; + String query2 = "SELECT id, author, filename FROM " + databaseChangelogTableName + + " WHERE dateexecuted = '" + date + "' ALLOW FILTERING"; ResultSet rs2 = statement1.executeQuery(query2); String id = "", author = "", filename = ""; while (rs2.next()) { @@ -68,22 +89,52 @@ public Sql[] generateSql(TagDatabaseStatement statement, Database database, SqlG } rs2.close(); statement1.close(); + //Query to update tag String updateQuery = "UPDATE " - + database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), "databasechangelog") - + " SET TAG = "+tagEscaped - + " WHERE id = '"+ id +"' and author = '"+ author +"' and filename = '"+ filename+ "'"; + + databaseChangelogTableName + + " SET TAG = " + tagEscaped + + " WHERE id = '" + id + "' AND author = '" + author + "' AND filename = '" + filename + "'"; return new Sql[]{ new UnparsedSql(updateQuery) }; - - } catch (SQLException | DatabaseException e) { return super.generateSql(statement, database, sqlGeneratorChain); } finally { database.setObjectQuotingStrategy(currentStrategy); } } + + /** + * Gets the timestamp of the last executed changeset without using {@code MAX} aggregate function. + * + * @implNote Since aggregate functions like MAX are not supported by AWS Keyspaces (see + * + * Cassandra functions in AWS Keyspaces), this method tries to get all the values of the column DATEEXECUTED in + * the table of the executed changesets to calculate and return the maximal value of this column. + * + * @param database The database where the query is executed. + * @param changelogTableName The name of the changelog table. + * @return The timestamp of the last executed changeset. + * @throws SQLException in case something goes wrong during the query execution. + * @throws DatabaseException in case something goes wrong during the query execution. + */ + private Timestamp selectLastExecutedChangesetTimestamp(final Database database, final String changelogTableName) + throws SQLException, DatabaseException { + Statement statement = ((CassandraDatabase) database).getStatement(); + ResultSet rs = statement.executeQuery("SELECT DATEEXECUTED FROM " + changelogTableName); + Timestamp maxValue = null; + while (rs.next()) { + Timestamp result = rs.getTimestamp("DATEEXECUTED"); + if (maxValue == null || maxValue.compareTo(result) < 0) { + maxValue = result; + } + } + rs.close(); + statement.close(); + return maxValue; + } + } diff --git a/src/test/groovy/liquibase/ext/cassandra/database/CassandraDatabaseTest.groovy b/src/test/groovy/liquibase/ext/cassandra/database/CassandraDatabaseTest.groovy index 3a0f3dc2..9ee29d81 100644 --- a/src/test/groovy/liquibase/ext/cassandra/database/CassandraDatabaseTest.groovy +++ b/src/test/groovy/liquibase/ext/cassandra/database/CassandraDatabaseTest.groovy @@ -1,5 +1,7 @@ package liquibase.ext.cassandra.database +import liquibase.Scope +import liquibase.configuration.LiquibaseConfiguration import spock.lang.Specification class CassandraDatabaseTest extends Specification { @@ -16,4 +18,9 @@ class CassandraDatabaseTest extends Specification { new CassandraDatabase().getDefaultDriver("jdbc:cassandra://localhost") != null } + def isAwsKeyspacesCompatibilityModeDisabledByDefault() { + expect: + !CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled() + } + }