Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
ABR9: CleanTransactionsTablesTask (#5842)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsheasby authored Jan 11, 2022
1 parent 39cb1a0 commit c1130c9
Show file tree
Hide file tree
Showing 14 changed files with 800 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.atlasdb.backup;

import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking;
Expand All @@ -25,6 +26,7 @@
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.backup.CassandraRepairHelper;
import com.palantir.atlasdb.cassandra.backup.RangesForRepair;
import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction;
import com.palantir.atlasdb.internalschema.InternalSchemaMetadataState;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
Expand All @@ -37,6 +39,7 @@
import com.palantir.refreshable.Refreshable;
import com.palantir.timestamp.FullyBoundedTimestampRange;
import com.palantir.tokens.auth.AuthHeader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -102,15 +105,20 @@ public Set<Namespace> repairInternalTables(
// RepairTransactionsTablesTask
KeyedStream.stream(completedBackups)
.forEach((namespace, completedBackup) ->
repairTransactionsTables(namespace, completedBackup, repairTable));
restoreTransactionsTables(namespace, completedBackup, repairTable));

return namespacesToRepair;
}

private void repairTransactionsTables(
private void restoreTransactionsTables(
Namespace namespace, CompletedBackup completedBackup, BiConsumer<String, RangesForRepair> repairTable) {
Map<FullyBoundedTimestampRange, Integer> coordinationMap = getCoordinationMap(namespace, completedBackup);
cassandraRepairHelper.repairTransactionsTables(namespace, coordinationMap, repairTable);
List<TransactionsTableInteraction> transactionsTableInteractions =
TransactionsTableInteraction.getTransactionTableInteractions(
coordinationMap, DefaultRetryPolicy.INSTANCE);
cassandraRepairHelper.repairTransactionsTables(namespace, transactionsTableInteractions, repairTable);
cassandraRepairHelper.cleanTransactionsTables(
namespace, completedBackup.getBackupStartTimestamp(), transactionsTableInteractions);
}

private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.palantir.atlasdb.backup;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -48,6 +48,7 @@ public class AtlasRestoreServiceTest {
private static final Namespace WITH_BACKUP = Namespace.of("with-backup");
private static final Namespace NO_BACKUP = Namespace.of("no-backup");
private static final Namespace FAILING_NAMESPACE = Namespace.of("failing");
private static final long BACKUP_START_TIMESTAMP = 2L;

@Mock
private AuthHeader authHeader;
Expand Down Expand Up @@ -75,7 +76,7 @@ private void storeCompletedBackup(Namespace namespace) {
CompletedBackup completedBackup = CompletedBackup.builder()
.namespace(namespace)
.immutableTimestamp(1L)
.backupStartTimestamp(2L)
.backupStartTimestamp(BACKUP_START_TIMESTAMP)
.backupEndTimestamp(3L)
.build();
backupPersister.storeCompletedBackup(completedBackup);
Expand All @@ -87,7 +88,8 @@ public void repairsOnlyWhenBackupPresent() {
atlasRestoreService.repairInternalTables(ImmutableSet.of(WITH_BACKUP, NO_BACKUP), doNothingConsumer);

verify(cassandraRepairHelper).repairInternalTables(WITH_BACKUP, doNothingConsumer);
verify(cassandraRepairHelper).repairTransactionsTables(eq(WITH_BACKUP), anyMap(), eq(doNothingConsumer));
verify(cassandraRepairHelper).repairTransactionsTables(eq(WITH_BACKUP), anyList(), eq(doNothingConsumer));
verify(cassandraRepairHelper).cleanTransactionsTables(eq(WITH_BACKUP), eq(BACKUP_START_TIMESTAMP), anyList());
verifyNoMoreInteractions(cassandraRepairHelper);
}

Expand Down
1 change: 1 addition & 0 deletions atlasdb-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.github.ben-manes.caffeine:caffeine'
implementation 'com.github.rholder:guava-retrying'
implementation 'com.google.auto.service:auto-service-annotations'
implementation 'com.google.errorprone:error_prone_annotations'
implementation 'com.google.protobuf:protobuf-java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.cassandra.backup;

import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
Expand All @@ -36,7 +35,6 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.timestamp.FullyBoundedTimestampRange;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
Expand Down Expand Up @@ -102,12 +100,8 @@ private static Stream<String> getTableNamesToRepair(KeyValueService kvs) {

public void repairTransactionsTables(
Namespace namespace,
Map<FullyBoundedTimestampRange, Integer> coordinationMap,
List<TransactionsTableInteraction> transactionsTableInteractions,
BiConsumer<String, RangesForRepair> repairTable) {
List<TransactionsTableInteraction> transactionsTableInteractions =
TransactionsTableInteraction.getTransactionTableInteractions(
coordinationMap, DefaultRetryPolicy.INSTANCE);

Map<String, RangesForRepair> tokenRangesForRepair =
getRangesForRepairByTable(namespace, transactionsTableInteractions);

Expand All @@ -117,6 +111,13 @@ public void repairTransactionsTables(
});
}

public void cleanTransactionsTables(
Namespace namespace,
long startTimestamp,
List<TransactionsTableInteraction> transactionsTableInteractions) {
cqlClusters.get(namespace).abortTransactions(startTimestamp, transactionsTableInteractions);
}

private Map<String, RangesForRepair> getRangesForRepairByTable(
Namespace namespace, List<TransactionsTableInteraction> transactionsTableInteractions) {
return KeyedStream.stream(getRawRangesForRepairByTable(namespace, transactionsTableInteractions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ public Map<String, Map<InetSocketAddress, RangeSet<LightweightOppToken>>> getTra
.getTransactionTableRangesForRepair(transactionsTableInteractions);
}
}

public void abortTransactions(long timestamp, List<TransactionsTableInteraction> transactionsTableInteractions) {
try (CqlSession session = new CqlSession(cluster.connect())) {
new TransactionAborter(session, config).abortTransactions(timestamp, transactionsTableInteractions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.palantir.atlasdb.cassandra.backup;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.palantir.atlasdb.keyvalue.cassandra.CassandraConstants;
Expand Down Expand Up @@ -51,4 +53,12 @@ public Set<LightweightOppToken> retrieveRowKeysAtConsistencyAll(List<Statement>
.map(LightweightOppToken::serialize)
.collect(Collectors.toSet());
}

public PreparedStatement prepare(Statement statement) {
return session.prepare(statement.toString());
}

public ResultSet execute(Statement statement) {
return session.execute(statement);
}
}
Loading

0 comments on commit c1130c9

Please sign in to comment.