Skip to content

Commit

Permalink
Add compatibility mode for AWS Keyspaces (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximevw authored Jun 26, 2024
1 parent 7c41a2c commit 713d6fb
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 59 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
liquibase-cassandra[![Build and Test Extention](https://github.com/liquibase/liquibase-cassandra/actions/workflows/build.yml/badge.svg)](https://github.com/liquibase/liquibase-cassandra/actions/workflows/build.yml)
liquibase-cassandra [![Build and Test Extension](https://github.com/liquibase/liquibase-cassandra/actions/workflows/test.yml/badge.svg)](https://github.com/liquibase/liquibase-cassandra/actions/workflows/test.yml)
===================

Liquibase extension for Cassandra Support.
Expand Down Expand Up @@ -88,19 +88,19 @@ INSERT INTO posts(id, author_id, title, description, content, inserted_date) VAL
INSERT INTO posts(id, author_id, title, description, content, inserted_date) VALUES
(4,4,'itaque','deleniti','Magni nam optio id recusandae.','2010-07-28');
INSERT INTO posts(id, author_id, title, description, content, inserted_date) VALUES
(5,5,'ad','similique','Rerum tempore quis ut nesciunt qui excepturi est.','2006-10-09');;
(5,5,'ad','similique','Rerum tempore quis ut nesciunt qui excepturi est.','2006-10-09');
```

#### Executing the tests
First you need to build project - `mvn package` will do the job.

##### from IDE
From your IDE, right click on the `liquibase.ext.cassandra.LiquibaseHarnessSuiteIT` test class present in `src/test/groovy` directory.
From your IDE, right-click on the `liquibase.ext.cassandra.LiquibaseHarnessSuiteIT` test class present in `src/test/groovy` directory.
Doing so, will allow you to execute all the standard change object tests in the liquibase-test-harness as well as the
Cassandra specific change objects tests created exclusively to test this extension (You can find this in the
`src/test/resources/liquibase/harness/change/changelogs/cassandra` directory).

To run single test case, let's say `addColumn`, create JUit configuration for `liquibase.harness.change.ChangeObjectTests` with arg `-DchangeObjects=addColumn`
To run single test case, let's say `addColumn`, create JUnit configuration for `liquibase.harness.change.ChangeObjectTests` with arg `-DchangeObjects=addColumn`
More details about different options can be found in [liquibase-test-harness readme](https://github.com/liquibase/liquibase-test-harness)

##### from command line
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@

import com.ing.data.cassandra.jdbc.CassandraConnection;
import liquibase.Scope;
import liquibase.configuration.ConfigurationDefinition;
import liquibase.configuration.LiquibaseConfiguration;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.DatabaseConnection;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
import liquibase.structure.core.Index;

import java.sql.PreparedStatement;
import java.sql.Statement;

/**
* Cassandra 1.2.0 NoSQL database support.
* Javadocs for ING Cassandra JDBC Wrapper: https://javadoc.io/doc/com.ing.data/cassandra-jdbc-wrapper/latest/index.html
*
* Javadocs for DataStax OSS Driver: https://javadoc.io/doc/com.datastax.oss/java-driver-core/latest/index.html
* Jar file for DataStax OSS Driver: https://search.maven.org/search?q=com.DataStax.oss
* Cassandra NoSQL database support.
* <a href="https://javadoc.io/doc/com.ing.data/cassandra-jdbc-wrapper/latest/index.html">Javadocs for ING Cassandra
* JDBC Wrapper</a><br>
* <a href="https://javadoc.io/doc/com.datastax.oss/java-driver-core/latest/index.html">Javadocs for Apache Cassandra
* OSS Java driver</a><br>
* <a href="https://central.sonatype.com/artifact/org.apache.cassandra/java-driver-core">Apache Cassandra OSS Java
* driver</a>
*/
public class CassandraDatabase extends AbstractJdbcDatabase {
public static final String PRODUCT_NAME = "Cassandra";
Expand All @@ -24,6 +29,31 @@ public class CassandraDatabase extends AbstractJdbcDatabase {
public static final String DEFAULT_DRIVER = "com.ing.data.cassandra.jdbc.CassandraDriver";
private String keyspace;

/**
* When running on AWS Keyspaces, a specific compatibility mode has to be activated for Liquibase because some
* behaviors need to be modified since AWS Keyspaces does not fully support CQL syntax.
* See: <a href="https://github.com/liquibase/liquibase-cassandra/issues/297">Issue #297</a>
* and: <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html">Support Cassandra APIs
* in AWS Keyspaces</a>
*/
public static final ConfigurationDefinition<Boolean> AWS_KEYSPACES_COMPATIBILITY_MODE;
static {
final ConfigurationDefinition.Builder builder = new ConfigurationDefinition.Builder("liquibase.cassandra");

AWS_KEYSPACES_COMPATIBILITY_MODE = builder.define("awsKeyspacesCompatibilityModeEnabled", Boolean.class)
.setDescription("Whether the compatibility mode for AWS Keyspaces must be enabled")
.addAliasKey("liquibase.cassandra.awsKeyspacesCompatibilityModeEnabled")
.setDefaultValue(false)
.build();

Scope.getCurrentScope().getSingleton(LiquibaseConfiguration.class)
.registerDefinition(AWS_KEYSPACES_COMPATIBILITY_MODE);
}

public static boolean isAwsKeyspacesCompatibilityModeEnabled() {
return AWS_KEYSPACES_COMPATIBILITY_MODE.getCurrentValue();
}

@Override
public String getShortName() {
return SHORT_PRODUCT_NAME;
Expand Down Expand Up @@ -143,6 +173,10 @@ public Statement getStatement() throws DatabaseException {
return ((JdbcConnection) super.getConnection()).createStatement();
}

public PreparedStatement prepareStatement(String query) throws DatabaseException {
return ((JdbcConnection) super.getConnection()).prepareStatement(query);
}

@Override
public boolean jdbcCallsCatalogsSchemas() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class CassandraDatabaseConnection extends JdbcConnection {

@Override
public int getPriority() {
return 201;
return PRIORITY_DATABASE + 200;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;

import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled;

public class LockServiceCassandra extends StandardLockService {

private boolean isDatabaseChangeLogLockTableInitialized;
Expand Down Expand Up @@ -149,7 +151,7 @@ public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCr
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);

try {
isDatabaseChangeLogLockTableInitialized = executeCountQueryWithAlternative(executor,
isDatabaseChangeLogLockTableInitialized = executeCountQuery(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName()) > 0;
} catch (LiquibaseException e) {
if (executor.updatesDatabase()) {
Expand All @@ -170,7 +172,7 @@ private boolean isLocked(Executor executor) throws DatabaseException {

private boolean isLockedByCurrentInstance(Executor executor) throws DatabaseException {
final String lockedBy = NetUtil.getLocalHostName() + " (" + NetUtil.getLocalHostAddress() + ")";
return executeCountQueryWithAlternative(executor,
return executeCountQuery(executor,
"SELECT COUNT(*) FROM " + getChangeLogLockTableName() + " WHERE " +
"LOCKED = TRUE AND LOCKEDBY = '" + lockedBy + "' ALLOW FILTERING") > 0;
}
Expand All @@ -183,20 +185,32 @@ private String getChangeLogLockTableName() {
}
}

private int executeCountQueryWithAlternative(final Executor executor, final String query) throws DatabaseException {
/**
* Execute a count query using an alternative if the AWS Keyspaces compatibility mode is enabled.
*
* @implNote Since aggregate functions like COUNT are not supported by AWS Keyspaces (see
* <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-functions">
* Cassandra functions in AWS Keyspaces</a>), this method tries to execute the same query without the COUNT
* function then programmatically count returned rows, when the AWS Keyspaces compatibility mode is enabled.
*
* @param executor The query executor.
* @param query The query to execute.
* @return The result of the count query.
* @throws DatabaseException in case something goes wrong during the query execution or if the provided query is
* not a count query.
*/
private int executeCountQuery(final Executor executor, final String query) throws DatabaseException {
if (!query.contains("SELECT COUNT(*)")) {
throw new UnexpectedLiquibaseException("Invalid count query: " + query);
}
try {
return executor.queryForInt(new RawSqlStatement(query));
} catch (DatabaseException e) {
// If the count query failed (for example, because counting rows is not implemented - see issue #289 with
// AWS Keyspaces where aggregate functions like COUNT are not supported:
// https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-functions), try to
// execute the same query without the COUNT function then programmatically count returned rows.
final String altQuery = query.replace("SELECT COUNT(*)", "SELECT *");
if (isAwsKeyspacesCompatibilityModeEnabled()) {
Scope.getCurrentScope().getLog(LockServiceCassandra.class)
.fine("AWS Keyspaces compatibility mode enabled: using alternative count query");
final String altQuery = query.replaceAll("(?i)SELECT COUNT\\(\\*\\)", "SELECT *");
final List<Map<String, ?>> rows = executor.queryForList(new RawSqlStatement(altQuery));
return rows.size();
} else {
return executor.queryForInt(new RawSqlStatement(query));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ public boolean supports(CreateDatabaseChangeLogLockTableStatement statement, Dat
@Override
public Sql[] generateSql(CreateDatabaseChangeLogLockTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

RawSqlStatement createTableStatement = new RawSqlStatement("CREATE TABLE IF NOT EXISTS " +
database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName()) +
" (ID INT, LOCKED BOOLEAN, LOCKGRANTED timestamp, LOCKEDBY TEXT, PRIMARY KEY (ID))");
RawSqlStatement createTableStatement = buildCreateTableStatement(
database.escapeTableName(
database.getLiquibaseCatalogName(),
database.getLiquibaseSchemaName(),
database.getDatabaseChangeLogLockTableName())
);

return SqlGeneratorFactory.getInstance().generateSql(createTableStatement, database);

}

protected static RawSqlStatement buildCreateTableStatement(String tableName) {
return new RawSqlStatement("CREATE TABLE IF NOT EXISTS " + tableName
+ " (ID INT, LOCKED BOOLEAN, LOCKGRANTED timestamp, LOCKEDBY TEXT, PRIMARY KEY (ID))");
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
package liquibase.ext.cassandra.sqlgenerator;

import liquibase.Scope;
import liquibase.database.Database;
import liquibase.exception.DatabaseException;
import liquibase.exception.UnexpectedLiquibaseException;
import liquibase.ext.cassandra.database.CassandraDatabase;
import liquibase.ext.cassandra.lockservice.LockServiceCassandra;
import liquibase.sql.Sql;
import liquibase.sql.UnparsedSql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.DeleteGenerator;
import liquibase.statement.core.DeleteStatement;
import liquibase.structure.core.PrimaryKey;
import liquibase.structure.core.Table;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled;

public class DeleteGeneratorCassandra extends DeleteGenerator {

Expand All @@ -24,15 +38,70 @@ public boolean supports(DeleteStatement statement, Database database) {
public Sql[] generateSql(DeleteStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

if (statement.getWhere() == null) {

String sql = "TRUNCATE " + database
.escapeTableName(statement.getCatalogName(), statement.getSchemaName(), statement.getTableName());
return new Sql[] { new UnparsedSql(sql, getAffectedTable(statement)) };
if (isAwsKeyspacesCompatibilityModeEnabled()) {
Scope.getCurrentScope().getLog(DeleteGeneratorCassandra.class)
.fine("AWS Keyspaces compatibility mode enabled: using alternative queries to truncate " + statement.getTableName());
return buildDeleteStatements(statement, database, sqlGeneratorChain).toArray(new Sql[]{});
} else {
String sql = "TRUNCATE " + database.escapeTableName(statement.getCatalogName(),
statement.getSchemaName(), statement.getTableName());
return new Sql[] { new UnparsedSql(sql, getAffectedTable(statement)) };
}
} else {

return super.generateSql(statement, database, sqlGeneratorChain);
}

}

/**
* Builds a list of DELETE statements to remove each row of a given table.
*
* @implNote Since AWS Keyspaces does not support TRUNCATE TABLE statements (see
* <a href="https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-api-support">
* Cassandra APIs in AWS Keyspaces</a>), this method tries to build a DELETE statement for each row in the table
* to truncate, when the AWS Keyspaces compatibility mode is enabled.
* Here, we cannot simply drop then re-create the table because the delete generator is too generic, so we can't
* easily know the structure of the table to re-create.
*
* @param statement The original DELETE statement not having any WHERE clause.
* @param database The database where the queries will be executed.
* @param sqlGeneratorChain The SQL generator chain.
* @return The list of DELETE statements to execute to truncate the given table.
* @throws UnexpectedLiquibaseException in case something goes wrong during the preparation of the statements.
*/
private List<Sql> buildDeleteStatements(final DeleteStatement statement, final Database database,
final SqlGeneratorChain sqlGeneratorChain) {
List<Sql> sqlStatements = new ArrayList<>();

String tableName = database.escapeTableName(statement.getCatalogName(),
statement.getSchemaName(), statement.getTableName());

// Get the primary key values of each row in the table.
Table table = (Table) getAffectedTable(statement);
PrimaryKey pk = table.getPrimaryKey();
String pkColumnNames = pk.getColumnNames();
String selectTableContent = "SELECT " + pkColumnNames + " FROM " + tableName;

// For each row in the table, build a DELETE statement for which the WHERE clause is based on the primary key
// of the row.
try {
Statement stmt = ((CassandraDatabase) database).getStatement();
ResultSet rs = stmt.executeQuery(selectTableContent);
while (rs.next()) {
DeleteStatement deleteRowStatement = new DeleteStatement(statement.getCatalogName(),
statement.getSchemaName(), statement.getTableName());
statement.addWhereColumnName(pkColumnNames);
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
statement.addWhereParameter(rs.getObject(i));
}
sqlStatements.add(super.generateSql(deleteRowStatement, database, sqlGeneratorChain)[0]);
}
rs.close();
stmt.close();
} catch (DatabaseException | SQLException e) {
throw new UnexpectedLiquibaseException(
"Failed to build DELETE statements to truncate table " + tableName, e);
}
return sqlStatements;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package liquibase.ext.cassandra.sqlgenerator;

import liquibase.Scope;
import liquibase.database.Database;
import liquibase.ext.cassandra.database.CassandraDatabase;
import liquibase.ext.cassandra.lockservice.LockServiceCassandra;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.SqlGeneratorFactory;
Expand All @@ -11,6 +13,9 @@
import liquibase.statement.core.InsertStatement;
import liquibase.statement.core.RawSqlStatement;

import static liquibase.ext.cassandra.database.CassandraDatabase.isAwsKeyspacesCompatibilityModeEnabled;
import static liquibase.ext.cassandra.sqlgenerator.CreateDatabaseChangeLogLockTableGeneratorCassandra.buildCreateTableStatement;

public class InitializeDatabaseChangeLogLockTableGeneratorCassandra extends InitializeDatabaseChangeLogLockTableGenerator {

@Override
Expand All @@ -26,18 +31,36 @@ public boolean supports(InitializeDatabaseChangeLogLockTableStatement statement,
@Override
public Sql[] generateSql(InitializeDatabaseChangeLogLockTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

RawSqlStatement deleteStatement = new RawSqlStatement("TRUNCATE TABLE " + database.escapeTableName(
SqlStatement[] statements;

String databaseChangelogLockTableName = database.escapeTableName(
database.getLiquibaseCatalogName(),
database.getLiquibaseSchemaName(),
database.getDatabaseChangeLogLockTableName().toUpperCase()));
database.getDatabaseChangeLogLockTableName().toUpperCase());

InsertStatement insertStatement = new InsertStatement(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName().toUpperCase())
InsertStatement insertStatement = new InsertStatement(database.getLiquibaseCatalogName(),
database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName().toUpperCase())
.addColumnValue("ID", 1)
.addColumnValue("LOCKED", Boolean.FALSE);

return SqlGeneratorFactory.getInstance().generateSql(new SqlStatement[]{deleteStatement, insertStatement}, database);
if (isAwsKeyspacesCompatibilityModeEnabled()) {
// Since AWS Keyspaces does not support TRUNCATE TABLE statements
// (https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html#cassandra-api-support), drop
// then re-create the changelog lock table when the AWS Keyspaces compatibility mode is enabled.
Scope.getCurrentScope().getLog(InitializeDatabaseChangeLogLockTableGeneratorCassandra.class)
.fine("AWS Keyspaces compatibility mode enabled: using alternative queries to truncate changelog lock table");
RawSqlStatement dropStatement = new RawSqlStatement("DROP TABLE " + databaseChangelogLockTableName);
RawSqlStatement createStatement = buildCreateTableStatement(databaseChangelogLockTableName);

}
statements = new SqlStatement[]{dropStatement, createStatement, insertStatement};
} else {
RawSqlStatement deleteStatement = new RawSqlStatement("TRUNCATE TABLE " + databaseChangelogLockTableName);

statements = new SqlStatement[]{deleteStatement, insertStatement};
}

return SqlGeneratorFactory.getInstance().generateSql(statements, database);

}

}
Loading

0 comments on commit 713d6fb

Please sign in to comment.