From c1130c9faa68e0088274c7bb9fb2332459db7150 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 11 Jan 2022 17:34:48 +0000 Subject: [PATCH] ABR9: CleanTransactionsTablesTask (#5842) --- .../atlasdb/backup/AtlasRestoreService.java | 14 +- .../backup/AtlasRestoreServiceTest.java | 8 +- atlasdb-cassandra/build.gradle | 1 + .../backup/CassandraRepairHelper.java | 15 +- .../atlasdb/cassandra/backup/CqlCluster.java | 6 + .../atlasdb/cassandra/backup/CqlSession.java | 10 + .../cassandra/backup/TransactionAborter.java | 248 +++++++++++++ .../Transactions1TableInteraction.java | 10 +- .../Transactions2TableInteraction.java | 10 +- .../Transactions3TableInteraction.java | 10 +- .../TransactionsTableInteraction.java | 6 +- .../backup/TransactionAborterTest.java | 340 ++++++++++++++++++ .../TransactionsTableInteractionTest.java | 121 +++++++ .../atlasdb/ete/CassandraRepairEteTest.java | 51 ++- 14 files changed, 800 insertions(+), 50 deletions(-) create mode 100644 atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TransactionAborter.java create mode 100644 atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/TransactionAborterTest.java create mode 100644 atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteractionTest.java diff --git a/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasRestoreService.java b/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasRestoreService.java index baa2cc44755..323540b9a40 100644 --- a/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasRestoreService.java +++ b/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasRestoreService.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.backup; +import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking; @@ -25,6 +26,7 @@ import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; import com.palantir.atlasdb.cassandra.backup.CassandraRepairHelper; import com.palantir.atlasdb.cassandra.backup.RangesForRepair; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; import com.palantir.atlasdb.internalschema.InternalSchemaMetadataState; import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.timelock.api.Namespace; @@ -37,6 +39,7 @@ import com.palantir.refreshable.Refreshable; import com.palantir.timestamp.FullyBoundedTimestampRange; import com.palantir.tokens.auth.AuthHeader; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -102,15 +105,20 @@ public Set repairInternalTables( // RepairTransactionsTablesTask KeyedStream.stream(completedBackups) .forEach((namespace, completedBackup) -> - repairTransactionsTables(namespace, completedBackup, repairTable)); + restoreTransactionsTables(namespace, completedBackup, repairTable)); return namespacesToRepair; } - private void repairTransactionsTables( + private void restoreTransactionsTables( Namespace namespace, CompletedBackup completedBackup, BiConsumer repairTable) { Map coordinationMap = getCoordinationMap(namespace, completedBackup); - cassandraRepairHelper.repairTransactionsTables(namespace, coordinationMap, repairTable); + List transactionsTableInteractions = + TransactionsTableInteraction.getTransactionTableInteractions( + coordinationMap, DefaultRetryPolicy.INSTANCE); + cassandraRepairHelper.repairTransactionsTables(namespace, transactionsTableInteractions, repairTable); + cassandraRepairHelper.cleanTransactionsTables( + namespace, completedBackup.getBackupStartTimestamp(), transactionsTableInteractions); } private Map getCoordinationMap( diff --git a/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasRestoreServiceTest.java b/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasRestoreServiceTest.java index dc91f719afa..f353ca6327c 100644 --- a/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasRestoreServiceTest.java +++ b/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasRestoreServiceTest.java @@ -17,7 +17,7 @@ package com.palantir.atlasdb.backup; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -48,6 +48,7 @@ public class AtlasRestoreServiceTest { private static final Namespace WITH_BACKUP = Namespace.of("with-backup"); private static final Namespace NO_BACKUP = Namespace.of("no-backup"); private static final Namespace FAILING_NAMESPACE = Namespace.of("failing"); + private static final long BACKUP_START_TIMESTAMP = 2L; @Mock private AuthHeader authHeader; @@ -75,7 +76,7 @@ private void storeCompletedBackup(Namespace namespace) { CompletedBackup completedBackup = CompletedBackup.builder() .namespace(namespace) .immutableTimestamp(1L) - .backupStartTimestamp(2L) + .backupStartTimestamp(BACKUP_START_TIMESTAMP) .backupEndTimestamp(3L) .build(); backupPersister.storeCompletedBackup(completedBackup); @@ -87,7 +88,8 @@ public void repairsOnlyWhenBackupPresent() { atlasRestoreService.repairInternalTables(ImmutableSet.of(WITH_BACKUP, NO_BACKUP), doNothingConsumer); verify(cassandraRepairHelper).repairInternalTables(WITH_BACKUP, doNothingConsumer); - verify(cassandraRepairHelper).repairTransactionsTables(eq(WITH_BACKUP), anyMap(), eq(doNothingConsumer)); + verify(cassandraRepairHelper).repairTransactionsTables(eq(WITH_BACKUP), anyList(), eq(doNothingConsumer)); + verify(cassandraRepairHelper).cleanTransactionsTables(eq(WITH_BACKUP), eq(BACKUP_START_TIMESTAMP), anyList()); verifyNoMoreInteractions(cassandraRepairHelper); } diff --git a/atlasdb-cassandra/build.gradle b/atlasdb-cassandra/build.gradle index e259e749153..1a15ff6ca22 100644 --- a/atlasdb-cassandra/build.gradle +++ b/atlasdb-cassandra/build.gradle @@ -53,6 +53,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-annotations' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.github.ben-manes.caffeine:caffeine' + implementation 'com.github.rholder:guava-retrying' implementation 'com.google.auto.service:auto-service-annotations' implementation 'com.google.errorprone:error_prone_annotations' implementation 'com.google.protobuf:protobuf-java' diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CassandraRepairHelper.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CassandraRepairHelper.java index f3543d9577e..c2fcecd4622 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CassandraRepairHelper.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CassandraRepairHelper.java @@ -16,7 +16,6 @@ package com.palantir.atlasdb.cassandra.backup; -import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalCause; @@ -36,7 +35,6 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; -import com.palantir.timestamp.FullyBoundedTimestampRange; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; @@ -102,12 +100,8 @@ private static Stream getTableNamesToRepair(KeyValueService kvs) { public void repairTransactionsTables( Namespace namespace, - Map coordinationMap, + List transactionsTableInteractions, BiConsumer repairTable) { - List transactionsTableInteractions = - TransactionsTableInteraction.getTransactionTableInteractions( - coordinationMap, DefaultRetryPolicy.INSTANCE); - Map tokenRangesForRepair = getRangesForRepairByTable(namespace, transactionsTableInteractions); @@ -117,6 +111,13 @@ public void repairTransactionsTables( }); } + public void cleanTransactionsTables( + Namespace namespace, + long startTimestamp, + List transactionsTableInteractions) { + cqlClusters.get(namespace).abortTransactions(startTimestamp, transactionsTableInteractions); + } + private Map getRangesForRepairByTable( Namespace namespace, List transactionsTableInteractions) { return KeyedStream.stream(getRawRangesForRepairByTable(namespace, transactionsTableInteractions)) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java index 9d73322e869..6b4e788f3d2 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlCluster.java @@ -61,4 +61,10 @@ public Map>> getTra .getTransactionTableRangesForRepair(transactionsTableInteractions); } } + + public void abortTransactions(long timestamp, List transactionsTableInteractions) { + try (CqlSession session = new CqlSession(cluster.connect())) { + new TransactionAborter(session, config).abortTransactions(timestamp, transactionsTableInteractions); + } + } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlSession.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlSession.java index a9199e460a8..7b00c8742fc 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlSession.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/CqlSession.java @@ -17,6 +17,8 @@ package com.palantir.atlasdb.cassandra.backup; import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; @@ -51,4 +53,12 @@ public Set retrieveRowKeysAtConsistencyAll(List .map(LightweightOppToken::serialize) .collect(Collectors.toSet()); } + + public PreparedStatement prepare(Statement statement) { + return session.prepare(statement.toString()); + } + + public ResultSet execute(Statement statement) { + return session.execute(statement); + } } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TransactionAborter.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TransactionAborter.java new file mode 100644 index 00000000000..11a4ee1ad10 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/TransactionAborter.java @@ -0,0 +1,248 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.cassandra.backup; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.TableMetadata; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; +import com.google.common.collect.Streams; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionTableEntries; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionTableEntry; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; +import com.palantir.atlasdb.pue.PutUnlessExistsValue; +import com.palantir.common.streams.KeyedStream; +import com.palantir.logsafe.Preconditions; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +final class TransactionAborter { + private static final SafeLogger log = SafeLoggerFactory.get(TransactionAborter.class); + + private static final int RETRY_COUNT = 3; + + private final CqlSession cqlSession; + private final CassandraKeyValueServiceConfig config; + private final Retryer abortRetryer; + + public TransactionAborter(CqlSession cqlSession, CassandraKeyValueServiceConfig config) { + this.cqlSession = cqlSession; + this.config = config; + + this.abortRetryer = new Retryer<>( + StopStrategies.stopAfterAttempt(RETRY_COUNT), + WaitStrategies.fixedWait(1L, TimeUnit.SECONDS), + attempt -> !attempt.hasResult() || !attempt.getResult()); + } + + public void abortTransactions(long timestamp, List transactionsTableInteractions) { + CqlMetadata clusterMetadata = cqlSession.getMetadata(); + String keyspaceName = config.getKeyspaceOrThrow(); + transactionsTableInteractions.forEach( + txnInteraction -> abortTransactions(clusterMetadata, keyspaceName, timestamp, txnInteraction)); + } + + private void abortTransactions( + CqlMetadata clusterMetadata, + String keyspaceName, + long timestamp, + TransactionsTableInteraction txnInteraction) { + log.info( + "Aborting transactions after backup timestamp", + SafeArg.of("backupTimestamp", timestamp), + SafeArg.of("keyspace", keyspaceName), + SafeArg.of("table", txnInteraction.getTransactionsTableName())); + + TableMetadata transactionsTable = ClusterMetadataUtils.getTableMetadata( + clusterMetadata, keyspaceName, txnInteraction.getTransactionsTableName()); + + PreparedStatement preparedAbortStatement = txnInteraction.prepareAbortStatement(transactionsTable, cqlSession); + PreparedStatement preparedCheckStatement = txnInteraction.prepareCheckStatement(transactionsTable, cqlSession); + Stream keysToAbort = + getTransactionsToAbort(keyspaceName, txnInteraction, transactionsTable, timestamp); + executeTransactionAborts( + keyspaceName, txnInteraction, preparedAbortStatement, preparedCheckStatement, keysToAbort); + } + + @VisibleForTesting + Stream getTransactionsToAbort( + String keyspaceName, + TransactionsTableInteraction txnInteraction, + TableMetadata transactionsTable, + long timestamp) { + Stream rowResults = + txnInteraction.createSelectStatementsForScanningFullTimestampRange(transactionsTable).stream() + .map(select -> cqlSession.execute(select).iterator()) + .flatMap(Streams::stream); + + return KeyedStream.of(rowResults) + .map(txnInteraction::extractTimestamps) + .filter(entry -> isInRange(keyspaceName, txnInteraction, entry, timestamp)) + .values(); + } + + private static boolean isInRange( + String keyspaceName, + TransactionsTableInteraction txnInteraction, + TransactionTableEntry entry, + long timestamp) { + Optional maybeCommitTimestamp = getCommitTimestamp(entry); + if (maybeCommitTimestamp.isEmpty()) { + return false; + } + + long startTimestamp = TransactionTableEntries.getStartTimestamp(entry); + long commitTimestamp = maybeCommitTimestamp.get(); + boolean isInRange = txnInteraction.getTimestampRange().contains(startTimestamp); + if (commitTimestamp <= timestamp || !isInRange) { + return false; + } + + log.debug( + "Found transaction to abort", + SafeArg.of("startTimestamp", startTimestamp), + SafeArg.of("commitTimestamp", commitTimestamp), + SafeArg.of("keyspace", keyspaceName), + SafeArg.of("table", txnInteraction.getTransactionsTableName())); + return true; + } + + @VisibleForTesting + void executeTransactionAborts( + String keyspace, + TransactionsTableInteraction txnInteraction, + PreparedStatement preparedAbortStatement, + PreparedStatement preparedCheckStatement, + Stream entries) { + entries.forEach(entry -> { + Statement abortStatement = txnInteraction.bindAbortStatement(preparedAbortStatement, entry); + Statement checkStatement = txnInteraction.bindCheckStatement(preparedCheckStatement, entry); + executeWithRetry(keyspace, txnInteraction, abortStatement, checkStatement, entry); + }); + } + + private void executeWithRetry( + String keyspace, + TransactionsTableInteraction txnInteraction, + Statement abortStatement, + Statement checkStatement, + TransactionTableEntry entry) { + long startTs = TransactionTableEntries.getStartTimestamp(entry); + long commitTs = getCommitTimestamp(entry).orElseThrow(); + + Preconditions.checkArgument( + abortStatement.getSerialConsistencyLevel() == ConsistencyLevel.SERIAL, + "Abort statement was not at expected consistency level", + SafeArg.of("consistencyLevel", abortStatement.getSerialConsistencyLevel()), + SafeArg.of("expectedConsistencyLevel", ConsistencyLevel.SERIAL)); + Preconditions.checkArgument( + checkStatement.getSerialConsistencyLevel() == ConsistencyLevel.SERIAL, + "Check statement was not at expected consistency level", + SafeArg.of("consistencyLevel", checkStatement.getSerialConsistencyLevel()), + SafeArg.of("expectedConsistencyLevel", ConsistencyLevel.SERIAL)); + + try { + abortRetryer.call(() -> + tryAbortTransactions(keyspace, txnInteraction, abortStatement, checkStatement, startTs, commitTs)); + } catch (ExecutionException e) { + throw new SafeIllegalStateException( + "Failed to execute transaction abort", + e, + SafeArg.of("startTs", startTs), + SafeArg.of("commitTs", commitTs), + SafeArg.of("retryCount", RETRY_COUNT), + SafeArg.of("keyspace", keyspace)); + + } catch (RetryException e) { + throw new SafeIllegalStateException( + "Unable to abort transactions even with retry", + e, + SafeArg.of("startTs", startTs), + SafeArg.of("commitTs", commitTs), + SafeArg.of("retryCount", RETRY_COUNT), + SafeArg.of("keyspace", keyspace)); + } + } + + private boolean tryAbortTransactions( + String keyspace, + TransactionsTableInteraction txnInteraction, + Statement abortStatement, + Statement checkStatement, + long startTs, + long commitTs) { + log.info( + "Aborting transaction", + SafeArg.of("startTs", startTs), + SafeArg.of("commitTs", commitTs), + SafeArg.of("keyspace", keyspace)); + ResultSet abortResultSet = cqlSession.execute(abortStatement); + if (abortResultSet.wasApplied()) { + return true; + } + + log.debug( + "Executing check statement", + SafeArg.of("startTs", startTs), + SafeArg.of("commitTs", commitTs), + SafeArg.of("keyspace", keyspace)); + ResultSet checkResultSet = cqlSession.execute(checkStatement); + Row result = Iterators.getOnlyElement(checkResultSet.all().iterator()); + + TransactionTableEntry transactionTableEntry = txnInteraction.extractTimestamps(result); + if (isAborted(transactionTableEntry)) { + return true; + } + + log.warn( + "Retrying abort statement", + SafeArg.of("startTs", startTs), + SafeArg.of("commitTs", commitTs), + SafeArg.of("keyspace", keyspace)); + return false; + } + + private static boolean isAborted(TransactionTableEntry transactionTableEntry) { + return TransactionTableEntries.caseOf(transactionTableEntry) + .explicitlyAborted(_startTs -> true) + .otherwise(() -> false); + } + + private static Optional getCommitTimestamp(TransactionTableEntry entry) { + return TransactionTableEntries.getCommitTimestamp(entry).or(() -> getCommitValue(entry)); + } + + private static Optional getCommitValue(TransactionTableEntry entry) { + return TransactionTableEntries.getCommitValue(entry).map(PutUnlessExistsValue::value); + } +} diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions1TableInteraction.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions1TableInteraction.java index ed08611066a..29db3eb2bcc 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions1TableInteraction.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions1TableInteraction.java @@ -20,7 +20,6 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.policies.DefaultRetryPolicy; @@ -28,6 +27,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.utils.Bytes; import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.cassandra.backup.CqlSession; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; import com.palantir.atlasdb.transaction.encoding.V1EncodingStrategy; @@ -61,7 +61,7 @@ public String getTransactionsTableName() { } @Override - public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, CqlSession session) { Statement abortStatement = QueryBuilder.update(transactionsTable) .with(QueryBuilder.set(CassandraConstants.VALUE, ByteBuffer.wrap(ABORT_COMMIT_TS_ENCODED))) .where(QueryBuilder.eq(CassandraConstants.ROW, QueryBuilder.bindMarker())) @@ -69,17 +69,17 @@ public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, .and(QueryBuilder.eq(CassandraConstants.TIMESTAMP, CassandraConstants.ENCODED_CAS_TABLE_TIMESTAMP)) .onlyIf(QueryBuilder.eq(CassandraConstants.VALUE, QueryBuilder.bindMarker())); // if you change this from CAS then you must update RetryPolicy - return session.prepare(abortStatement.toString()); + return session.prepare(abortStatement); } @Override - public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, CqlSession session) { Statement checkStatement = QueryBuilder.select() .from(transactionsTable) .where(QueryBuilder.eq(CassandraConstants.ROW, QueryBuilder.bindMarker())) .and(QueryBuilder.eq(CassandraConstants.COLUMN, COLUMN_NAME_BB)) .and(QueryBuilder.eq(CassandraConstants.TIMESTAMP, CassandraConstants.ENCODED_CAS_TABLE_TIMESTAMP)); - return session.prepare(checkStatement.toString()); + return session.prepare(checkStatement); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions2TableInteraction.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions2TableInteraction.java index 56867ae2654..f8303a41c09 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions2TableInteraction.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions2TableInteraction.java @@ -20,13 +20,13 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.utils.Bytes; +import com.palantir.atlasdb.cassandra.backup.CqlSession; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; import com.palantir.atlasdb.transaction.encoding.TicketsEncodingStrategy; @@ -57,7 +57,7 @@ public String getTransactionsTableName() { } @Override - public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, CqlSession session) { ByteBuffer abortCommitTsBb = ByteBuffer.wrap(TicketsEncodingStrategy.ABORTED_TRANSACTION_VALUE); Statement abortStatement = QueryBuilder.update(transactionsTable) @@ -67,17 +67,17 @@ public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, .and(QueryBuilder.eq(CassandraConstants.TIMESTAMP, CassandraConstants.ENCODED_CAS_TABLE_TIMESTAMP)) .onlyIf(QueryBuilder.eq(CassandraConstants.VALUE, QueryBuilder.bindMarker())); // if you change this from CAS then you must update RetryPolicy - return session.prepare(abortStatement.toString()); + return session.prepare(abortStatement); } @Override - public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, CqlSession session) { Statement checkStatement = QueryBuilder.select() .from(transactionsTable) .where(QueryBuilder.eq(CassandraConstants.ROW, QueryBuilder.bindMarker())) .and(QueryBuilder.eq(CassandraConstants.COLUMN, QueryBuilder.bindMarker())) .and(QueryBuilder.eq(CassandraConstants.TIMESTAMP, CassandraConstants.ENCODED_CAS_TABLE_TIMESTAMP)); - return session.prepare(checkStatement.toString()); + return session.prepare(checkStatement); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions3TableInteraction.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions3TableInteraction.java index 1b440d41172..f9cff9f17f5 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions3TableInteraction.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/Transactions3TableInteraction.java @@ -20,13 +20,13 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.utils.Bytes; +import com.palantir.atlasdb.cassandra.backup.CqlSession; import com.palantir.atlasdb.keyvalue.api.Cell; import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants; import com.palantir.atlasdb.keyvalue.cassandra.CellValuePutter; @@ -59,7 +59,7 @@ public String getTransactionsTableName() { } @Override - public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, CqlSession session) { // we are declaring bankruptcy if this fails anyway ByteBuffer abortCommitTsBb = ByteBuffer.wrap(TwoPhaseEncodingStrategy.ABORTED_TRANSACTION_COMMITTED_VALUE); @@ -71,17 +71,17 @@ public PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, .using(QueryBuilder.timestamp(CellValuePutter.SET_TIMESTAMP + 1)) .onlyIf(QueryBuilder.eq(CassandraConstants.VALUE, QueryBuilder.bindMarker())); // if you change this from CAS then you must update RetryPolicy - return session.prepare(abortStatement.toString()); + return session.prepare(abortStatement); } @Override - public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, Session session) { + public PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, CqlSession session) { Statement checkStatement = QueryBuilder.select() .from(transactionsTable) .where(QueryBuilder.eq(CassandraConstants.ROW, QueryBuilder.bindMarker())) .and(QueryBuilder.eq(CassandraConstants.COLUMN, QueryBuilder.bindMarker())) .and(QueryBuilder.eq(CassandraConstants.TIMESTAMP, CassandraConstants.ENCODED_CAS_TABLE_TIMESTAMP)); - return session.prepare(checkStatement.toString()); + return session.prepare(checkStatement); } @Override diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteraction.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteraction.java index 7d8ebe6e9d5..42f7cac9a41 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteraction.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteraction.java @@ -18,10 +18,10 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.policies.RetryPolicy; +import com.palantir.atlasdb.cassandra.backup.CqlSession; import com.palantir.atlasdb.transaction.impl.TransactionConstants; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; @@ -42,9 +42,9 @@ public interface TransactionsTableInteraction { List createSelectStatementsForScanningFullTimestampRange(TableMetadata transactionsTable); - PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, Session session); + PreparedStatement prepareAbortStatement(TableMetadata transactionsTable, CqlSession session); - PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, Session session); + PreparedStatement prepareCheckStatement(TableMetadata transactionsTable, CqlSession session); Statement bindCheckStatement(PreparedStatement preparedCheckStatement, TransactionTableEntry entry); diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/TransactionAborterTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/TransactionAborterTest.java new file mode 100644 index 00000000000..68533a7fe8f --- /dev/null +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/TransactionAborterTest.java @@ -0,0 +1,340 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.cassandra.backup; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.TableMetadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; +import com.palantir.atlasdb.AtlasDbConstants; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionTableEntries; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionTableEntry; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; +import com.palantir.atlasdb.pue.PutUnlessExistsValue; +import com.palantir.timestamp.FullyBoundedTimestampRange; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TransactionAborterTest { + private static final long BACKUP_TIMESTAMP = 123; + + private static final FullyBoundedTimestampRange TIMESTAMP_RANGE = + FullyBoundedTimestampRange.of(Range.closed(AtlasDbConstants.STARTING_TS, 200L)); + + private static final String TXN_TABLE_NAME = "txn_table"; + private static final String KEYSPACE = "keyspace"; + + @Mock + private CqlSession cqlSession; + + @Mock + private CassandraKeyValueServiceConfig config; + + @Mock + private TransactionsTableInteraction transactionInteraction; + + @Mock + private Statement selectStatement; + + @Mock + private PreparedStatement preparedAbortStatement; + + @Mock + private Statement abortStatement; + + @Mock + private PreparedStatement preparedCheckStatement; + + @Mock + private Statement checkStatement; + + @Mock + private TableMetadata tableMetadata; + + private TransactionAborter transactionAborter; + + @Before + public void before() { + when(transactionInteraction.getTransactionsTableName()).thenReturn(TXN_TABLE_NAME); + + when(abortStatement.getSerialConsistencyLevel()).thenReturn(ConsistencyLevel.SERIAL); + when(transactionInteraction.bindAbortStatement(eq(preparedAbortStatement), any())) + .thenReturn(abortStatement); + + when(checkStatement.getSerialConsistencyLevel()).thenReturn(ConsistencyLevel.SERIAL); + when(transactionInteraction.bindCheckStatement(eq(preparedCheckStatement), any())) + .thenReturn(checkStatement); + + when(config.getKeyspaceOrThrow()).thenReturn(KEYSPACE); + + when(tableMetadata.getName()).thenReturn(TXN_TABLE_NAME); + KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class); + when(keyspaceMetadata.getTables()).thenReturn(ImmutableList.of(tableMetadata)); + CqlMetadata cqlMetadata = mock(CqlMetadata.class); + when(cqlMetadata.getKeyspaceMetadata(KEYSPACE)).thenReturn(keyspaceMetadata); + when(cqlSession.getMetadata()).thenReturn(cqlMetadata); + + doReturn(ImmutableList.of(selectStatement)) + .when(transactionInteraction) + .createSelectStatementsForScanningFullTimestampRange(any()); + ResultSet selectResponse = createSelectResponse(ImmutableList.of()); + when(cqlSession.execute(selectStatement)).thenReturn(selectResponse); + + doReturn(mock(PreparedStatement.class)).when(transactionInteraction).prepareAbortStatement(any(), any()); + doReturn(mock(PreparedStatement.class)).when(transactionInteraction).prepareCheckStatement(any(), any()); + + transactionAborter = new TransactionAborter(cqlSession, config); + } + + @Test + public void willNotRunAbortWhenNothingToAbort() { + transactionAborter.abortTransactions(BACKUP_TIMESTAMP, List.of(transactionInteraction)); + + verify(cqlSession, times(1)).execute(selectStatement); + verify(cqlSession, times(1)).getMetadata(); + verifyNoMoreInteractions(cqlSession); + } + + @Test + public void willNotAbortTimestampsLowerThanBackupTimestamp() { + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(10L, 20L), + TransactionTableEntries.committedTwoPhase(20L, PutUnlessExistsValue.committed(30L)), + TransactionTableEntries.committedLegacy(30L, BACKUP_TIMESTAMP - 1)); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isZero(); + } + + @Test + public void willNotAbortTimestampsOutsideRange() { + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP + 10, BACKUP_TIMESTAMP + 12), + TransactionTableEntries.committedTwoPhase( + BACKUP_TIMESTAMP + 12, PutUnlessExistsValue.committed(BACKUP_TIMESTAMP + 13)), + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP + 14, BACKUP_TIMESTAMP + 15)); + setupAbortTimestampTask(rows, FullyBoundedTimestampRange.of(Range.closed(BACKUP_TIMESTAMP + 16, 1000L))); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isZero(); + } + + @Test + public void willNotAbortTimestampsAlreadyAborted() { + ImmutableList rows = + ImmutableList.of(TransactionTableEntries.explicitlyAborted(BACKUP_TIMESTAMP + 1)); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isZero(); + } + + @Test + public void willAbortTimestampInRange() { + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP + 1, BACKUP_TIMESTAMP + 2), + TransactionTableEntries.committedTwoPhase( + BACKUP_TIMESTAMP + 3, PutUnlessExistsValue.committed(BACKUP_TIMESTAMP + 4))); + setupAbortTimestampTask(rows, FullyBoundedTimestampRange.of(Range.closed(25L, 1000L))); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isEqualTo(2L); + } + + @Test + public void willAbortTimestampsHigherThanBackupTimestamp() { + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP + 1, BACKUP_TIMESTAMP + 2), + TransactionTableEntries.committedTwoPhase( + BACKUP_TIMESTAMP + 3, PutUnlessExistsValue.committed(BACKUP_TIMESTAMP + 4))); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isEqualTo(2L); + } + + @Test + public void willAbortTimestampsThatStartBeforeButEndAfterBackupTimestamp() { + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP - 2, BACKUP_TIMESTAMP + 1), + TransactionTableEntries.committedTwoPhase( + BACKUP_TIMESTAMP - 1, PutUnlessExistsValue.committed(BACKUP_TIMESTAMP + 2))); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isEqualTo(2L); + } + + @Test + public void willAbortTimestampsThatStartAfterBackupTimestampAndEndOutsideRange() { + long endOfRange = TIMESTAMP_RANGE.inclusiveUpperBound(); + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(BACKUP_TIMESTAMP + 1, endOfRange + 1), + TransactionTableEntries.committedTwoPhase( + BACKUP_TIMESTAMP + 2, PutUnlessExistsValue.committed(endOfRange + 2))); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isEqualTo(2L); + } + + @Test + public void willNotAbortTimestampsThatStartAfterRange() { + long endOfRange = TIMESTAMP_RANGE.inclusiveUpperBound(); + ImmutableList rows = ImmutableList.of( + TransactionTableEntries.committedLegacy(endOfRange + 1, endOfRange + 2), + TransactionTableEntries.committedTwoPhase( + endOfRange + 3, PutUnlessExistsValue.committed(endOfRange + 4))); + setupAbortTimestampTask(rows, TIMESTAMP_RANGE); + + Stream transactionsToAbort = transactionAborter.getTransactionsToAbort( + KEYSPACE, transactionInteraction, tableMetadata, BACKUP_TIMESTAMP); + assertThat(transactionsToAbort.count()).isEqualTo(0L); + } + + @Test + public void executeWithRetryTriesSingleTimeIfAbortSucceeds() { + ResultSet abortResponse = createAbortResponse(true); + when(cqlSession.execute(abortStatement)).thenReturn(abortResponse); + + Stream entries = Stream.of(TransactionTableEntries.committedLegacy(100L, 101L)); + transactionAborter.executeTransactionAborts( + KEYSPACE, transactionInteraction, preparedAbortStatement, preparedCheckStatement, entries); + + verify(cqlSession).execute(abortStatement); + } + + @Test + public void executeWithRetryChecksIfAbortWasNotAppliedAndRetriesIfNoMatch() { + ResultSet abortResponse1 = createAbortResponse(false); + ResultSet abortResponse2 = createAbortResponse(true); + + when(cqlSession.execute(abortStatement)).thenReturn(abortResponse1).thenReturn(abortResponse2); + + Row row = mock(Row.class); + ResultSet checkResponse = createSelectResponse(ImmutableList.of(row)); + when(cqlSession.execute(checkStatement)).thenReturn(checkResponse); + + Stream entries = Stream.of(TransactionTableEntries.committedLegacy(100L, 101L)); + transactionAborter.executeTransactionAborts( + KEYSPACE, transactionInteraction, preparedAbortStatement, preparedCheckStatement, entries); + + verify(cqlSession, times(2)).execute(abortStatement); + verify(cqlSession).execute(checkStatement); + } + + @Test + public void executeWithRetryChecksIfAbortWasNotAppliedAndDoesNotRetryIfEqualsAbortTimestamp() { + ResultSet abortResponse = createAbortResponse(false); + when(cqlSession.execute(abortStatement)).thenReturn(abortResponse); + + Row row = mock(Row.class); + ResultSet checkResponse = createSelectResponse(ImmutableList.of(row)); + + TransactionTableEntry entry = TransactionTableEntries.explicitlyAborted(100L); + when(cqlSession.execute(checkStatement)).thenReturn(checkResponse); + when(transactionInteraction.extractTimestamps(row)).thenReturn(entry); + + TransactionTableEntry notYetAborted = TransactionTableEntries.committedLegacy(100L, 101L); + Stream entries = Stream.of(notYetAborted); + transactionAborter.executeTransactionAborts( + KEYSPACE, transactionInteraction, preparedAbortStatement, preparedCheckStatement, entries); + + verify(cqlSession, times(1)).execute(abortStatement); + verify(cqlSession, times(1)).execute(checkStatement); + } + + @Test + public void executeWithRetryChecksEventuallyFails() { + ResultSet abortResponse = createAbortResponse(false); + when(cqlSession.execute(abortStatement)).thenReturn(abortResponse); + + Row row = mock(Row.class); + ResultSet checkResponse = createSelectResponse(ImmutableList.of(row)); + when(cqlSession.execute(checkStatement)).thenReturn(checkResponse); + + Stream entries = Stream.of(TransactionTableEntries.committedLegacy(100L, 101L)); + + assertThatThrownBy(() -> transactionAborter.executeTransactionAborts( + KEYSPACE, transactionInteraction, preparedAbortStatement, preparedCheckStatement, entries)) + .isInstanceOf(IllegalStateException.class); + + verify(cqlSession, times(3)).execute(abortStatement); + verify(cqlSession, times(3)).execute(checkStatement); + } + + private void setupAbortTimestampTask( + ImmutableList entries, FullyBoundedTimestampRange range) { + when(transactionInteraction.getTimestampRange()).thenReturn(range); + List rows = entries.stream() + .map(entry -> { + Row row = mock(Row.class); + when(transactionInteraction.extractTimestamps(row)).thenReturn(entry); + return row; + }) + .collect(Collectors.toList()); + ResultSet selectResponse = createSelectResponse(rows); + when(transactionInteraction.createSelectStatementsForScanningFullTimestampRange(any())) + .thenReturn(ImmutableList.of(mock(Statement.class))); + when(cqlSession.execute(any(Statement.class))).thenReturn(selectResponse); + } + + private static ResultSet createAbortResponse(boolean wasApplied) { + ResultSet response = mock(ResultSet.class); + when(response.wasApplied()).thenReturn(wasApplied); + return response; + } + + private static ResultSet createSelectResponse(List transactions) { + ResultSet response = mock(ResultSet.class); + when(response.iterator()).thenReturn(transactions.iterator()); + when(response.all()).thenReturn(transactions); + return response; + } +} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteractionTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteractionTest.java new file mode 100644 index 00000000000..2933e9ba78f --- /dev/null +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/cassandra/backup/transaction/TransactionsTableInteractionTest.java @@ -0,0 +1,121 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.cassandra.backup.transaction; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Range; +import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; +import com.palantir.timestamp.FullyBoundedTimestampRange; +import java.util.List; +import java.util.Map; +import org.junit.Test; + +public class TransactionsTableInteractionTest { + private static final DefaultRetryPolicy POLICY = DefaultRetryPolicy.INSTANCE; + private static final FullyBoundedTimestampRange FULL_RANGE = + FullyBoundedTimestampRange.of(Range.closed(1L, 10_000_000L)); + + @Test + public void testGetOnlyTxn1() { + TransactionsTableInteraction onlyInteraction = getSingleInteraction(1); + + assertThat(onlyInteraction).isExactlyInstanceOf(Transactions1TableInteraction.class); + assertThat(onlyInteraction.getTimestampRange()).isEqualTo(FULL_RANGE); + } + + @Test + public void testGetOnlyTxn2() { + TransactionsTableInteraction onlyInteraction = getSingleInteraction(2); + + assertThat(onlyInteraction).isExactlyInstanceOf(Transactions2TableInteraction.class); + assertThat(onlyInteraction.getTimestampRange()).isEqualTo(FULL_RANGE); + } + + @Test + public void testGetOnlyTxn3() { + TransactionsTableInteraction onlyInteraction = getSingleInteraction(3); + + assertThat(onlyInteraction).isExactlyInstanceOf(Transactions3TableInteraction.class); + assertThat(onlyInteraction.getTimestampRange()).isEqualTo(FULL_RANGE); + } + + @Test + public void testGetAllTxnTables() { + Map ranges = ImmutableMap.of( + range(1L, 5L), 1, + range(6L, 10L), 2, + range(11L, 15L), 3); + + List transactionsTableInteractions = + TransactionsTableInteraction.getTransactionTableInteractions(ranges, POLICY); + assertThat(transactionsTableInteractions).hasSize(3); + + assertThat(transactionsTableInteractions) + .hasExactlyElementsOfTypes( + Transactions1TableInteraction.class, + Transactions2TableInteraction.class, + Transactions3TableInteraction.class); + } + + @Test + public void testGetsEachTableMultipleTimes() { + Map ranges = ImmutableMap.of( + range(1L, 5L), 1, + range(6L, 10L), 2, + range(11L, 15L), 1, + range(16L, 20L), 2); + + List transactionsTableInteractions = + TransactionsTableInteraction.getTransactionTableInteractions(ranges, POLICY); + assertThat(transactionsTableInteractions).hasSize(4); + + assertThat(transactionsTableInteractions) + .hasExactlyElementsOfTypes( + Transactions1TableInteraction.class, + Transactions2TableInteraction.class, + Transactions1TableInteraction.class, + Transactions2TableInteraction.class); + } + + @Test + public void testUnsupportedSchemaVersion() { + Map outOfRange = ImmutableMap.of(FULL_RANGE, 9001); + + assertThatThrownBy(() -> TransactionsTableInteraction.getTransactionTableInteractions(outOfRange, POLICY)) + .isExactlyInstanceOf(SafeIllegalArgumentException.class) + .hasMessageContaining("Found unsupported transactions schema version"); + } + + private TransactionsTableInteraction getSingleInteraction(int version) { + Map ranges = ImmutableMap.of(FULL_RANGE, version); + + List transactionTableInteractions = + TransactionsTableInteraction.getTransactionTableInteractions(ranges, POLICY); + + assertThat(transactionTableInteractions).hasSize(1); + return Iterables.getOnlyElement(transactionTableInteractions); + } + + private static FullyBoundedTimestampRange range(long lower, long upper) { + return FullyBoundedTimestampRange.of(Range.closed(lower, upper)); + } +} diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java index 37dd0568d33..55f46eba257 100644 --- a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/CassandraRepairEteTest.java @@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; @@ -30,6 +32,10 @@ import com.palantir.atlasdb.cassandra.backup.CqlCluster; import com.palantir.atlasdb.cassandra.backup.CqlMetadata; import com.palantir.atlasdb.cassandra.backup.RangesForRepair; +import com.palantir.atlasdb.cassandra.backup.transaction.Transactions1TableInteraction; +import com.palantir.atlasdb.cassandra.backup.transaction.Transactions2TableInteraction; +import com.palantir.atlasdb.cassandra.backup.transaction.Transactions3TableInteraction; +import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; import com.palantir.atlasdb.containers.ThreeNodeCassandraCluster; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.Cell; @@ -71,6 +77,7 @@ public final class CassandraRepairEteTest { private static final String TABLE_1 = "table1"; private static final TableReference TABLE_REF = TableReference.create(com.palantir.atlasdb.keyvalue.api.Namespace.create(NAMESPACE_NAME), TABLE_1); + private static final DefaultRetryPolicy POLICY = DefaultRetryPolicy.INSTANCE; private CassandraRepairHelper cassandraRepairHelper; private CassandraKeyValueService kvs; @@ -112,9 +119,9 @@ public void testRepairOnlyTxn1() { List tablesRepaired = new ArrayList<>(); BiConsumer repairer = (table, _unused) -> tablesRepaired.add(table); - Map ranges = - ImmutableMap.of(FullyBoundedTimestampRange.of(Range.closed(1L, 10_000_000L)), 1); - cassandraRepairHelper.repairTransactionsTables(NAMESPACE, ranges, repairer); + List interactions = + ImmutableList.of(new Transactions1TableInteraction(range(1L, 10_000_000L), POLICY)); + cassandraRepairHelper.repairTransactionsTables(NAMESPACE, interactions, repairer); assertThat(tablesRepaired).containsExactly(TransactionConstants.TRANSACTION_TABLE.getTableName()); } @@ -123,9 +130,9 @@ public void testRepairOnlyTxn2() { List tablesRepaired = new ArrayList<>(); BiConsumer repairer = (table, _unused) -> tablesRepaired.add(table); - Map ranges = - ImmutableMap.of(FullyBoundedTimestampRange.of(Range.closed(1L, 10_000_000L)), 2); - cassandraRepairHelper.repairTransactionsTables(NAMESPACE, ranges, repairer); + List interactions = + ImmutableList.of(new Transactions2TableInteraction(range(1L, 10_000_000L), POLICY)); + cassandraRepairHelper.repairTransactionsTables(NAMESPACE, interactions, repairer); assertThat(tablesRepaired).containsExactly(TransactionConstants.TRANSACTIONS2_TABLE.getTableName()); } @@ -134,9 +141,9 @@ public void testRepairTxn3() { List tablesRepaired = new ArrayList<>(); BiConsumer repairer = (table, _unused) -> tablesRepaired.add(table); - Map ranges = - ImmutableMap.of(FullyBoundedTimestampRange.of(Range.closed(1L, 10_000_000L)), 3); - cassandraRepairHelper.repairTransactionsTables(NAMESPACE, ranges, repairer); + List interactions = + ImmutableList.of(new Transactions3TableInteraction(range(1L, 10_000_000L), POLICY)); + cassandraRepairHelper.repairTransactionsTables(NAMESPACE, interactions, repairer); // Transactions3 is backed by Transactions2 under the hood, so this is the table that will be repaired. assertThat(tablesRepaired).containsExactly(TransactionConstants.TRANSACTIONS2_TABLE.getTableName()); @@ -147,10 +154,11 @@ public void testRepairBothTxnTables() { List tablesRepaired = new ArrayList<>(); BiConsumer repairer = (table, _unused) -> tablesRepaired.add(table); - Map ranges = ImmutableMap.of( - FullyBoundedTimestampRange.of(Range.closed(1L, 5L)), 1, - FullyBoundedTimestampRange.of(Range.closed(6L, 10L)), 2); - cassandraRepairHelper.repairTransactionsTables(NAMESPACE, ranges, repairer); + List interactions = ImmutableList.of( + new Transactions1TableInteraction(range(1L, 5L), POLICY), + new Transactions2TableInteraction(range(6L, 10L), POLICY)); + + cassandraRepairHelper.repairTransactionsTables(NAMESPACE, interactions, repairer); assertThat(tablesRepaired) .containsExactlyInAnyOrder( TransactionConstants.TRANSACTION_TABLE.getTableName(), @@ -166,12 +174,13 @@ public void testRepairsEachTableOnceOnly() { tablesRepaired.add(table); }; - Map ranges = ImmutableMap.of( - FullyBoundedTimestampRange.of(Range.closed(1L, 5L)), 1, - FullyBoundedTimestampRange.of(Range.closed(6L, 10L)), 2, - FullyBoundedTimestampRange.of(Range.closed(11L, 15L)), 1, - FullyBoundedTimestampRange.of(Range.closed(16L, 20L)), 2); - cassandraRepairHelper.repairTransactionsTables(NAMESPACE, ranges, repairer); + List interactions = ImmutableList.of( + new Transactions1TableInteraction(range(1L, 5L), POLICY), + new Transactions2TableInteraction(range(6L, 10L), POLICY), + new Transactions1TableInteraction(range(11L, 15L), POLICY), + new Transactions2TableInteraction(range(16L, 20L), POLICY)); + + cassandraRepairHelper.repairTransactionsTables(NAMESPACE, interactions, repairer); assertThat(tablesRepaired) .containsExactlyInAnyOrder( TransactionConstants.TRANSACTION_TABLE.getTableName(), @@ -246,4 +255,8 @@ private Map>> invert( return invertedMap; } + + private static FullyBoundedTimestampRange range(long lower, long upper) { + return FullyBoundedTimestampRange.of(Range.closed(lower, upper)); + } }