diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index 158349420d..b01136a176 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; @@ -305,7 +306,19 @@ private boolean shouldInitializeTable() { .orElse(true); } + private static int getCassandraMajorVersion(final CqlSession session) { + try { + ResultSet rs = session.execute("SELECT release_version FROM system.local"); + Row row = rs.one(); + String version = row.getString("release_version"); + return Integer.parseInt(version.split("\\.")[0]); + } catch (final Exception e) { + return 0; + } + } + private static void initializeTable(final CqlSession session, final String keyspaceName, final String tableName, final Configuration configuration) { + int cassandraMajorVersion = getCassandraMajorVersion(session); CreateTableWithOptions createTable = createTable(keyspaceName, tableName) .ifNotExists() .withPartitionKey(KEY_COLUMN_NAME, DataTypes.BLOB) @@ -313,7 +326,7 @@ private static void initializeTable(final CqlSession session, final String keysp .withColumn(VALUE_COLUMN_NAME, DataTypes.BLOB); createTable = compactionOptions(createTable, configuration); - createTable = compressionOptions(createTable, configuration); + createTable = compressionOptions(createTable, configuration, cassandraMajorVersion); createTable = gcGraceSeconds(createTable, configuration); createTable = speculativeRetryOptions(createTable, configuration); @@ -321,7 +334,8 @@ private static void initializeTable(final CqlSession session, final String keysp } private static CreateTableWithOptions compressionOptions(final CreateTableWithOptions createTable, - final Configuration configuration) { + final Configuration configuration, + final int cassandraMajorVersion) { if (!configuration.get(CF_COMPRESSION)) { // No compression return createTable.withNoCompression(); @@ -329,9 +343,14 @@ private static CreateTableWithOptions compressionOptions(final CreateTableWithOp String compressionType = configuration.get(CF_COMPRESSION_TYPE); int chunkLengthInKb = configuration.get(CF_COMPRESSION_BLOCK_SIZE); + Map options; + + if (cassandraMajorVersion >= 5) + options = ImmutableMap.of("class", compressionType, "chunk_length_in_kb", chunkLengthInKb); + else + options = ImmutableMap.of("sstable_compression", compressionType, "chunk_length_kb", chunkLengthInKb); - return createTable.withOption("compression", - ImmutableMap.of("class", compressionType, "chunk_length_in_kb", chunkLengthInKb)); + return createTable.withOption("compression", options); } static CreateTableWithOptions compactionOptions(final CreateTableWithOptions createTable,