Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

ABR15: "clear locks task" wiring #5892

Merged
merged 18 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking;
import com.palantir.atlasdb.backup.api.CompleteRestoreRequest;
import com.palantir.atlasdb.backup.api.CompleteRestoreResponse;
Expand All @@ -29,11 +30,18 @@
import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction;
import com.palantir.atlasdb.internalschema.InternalSchemaMetadataState;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.DisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.ReenableNamespacesRequest;
import com.palantir.atlasdb.timelock.api.SuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.UnsuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking;
import com.palantir.common.annotation.NonIdempotent;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.dialogue.clients.DialogueClients;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
Expand All @@ -52,17 +60,20 @@ public class AtlasRestoreService {

private final AuthHeader authHeader;
private final AtlasRestoreClientBlocking atlasRestoreClientBlocking;
private final TimeLockManagementServiceBlocking timeLockManagementService;
private final BackupPersister backupPersister;
private final CassandraRepairHelper cassandraRepairHelper;

/**
 * Builds the service from already-constructed collaborators.
 * Declared without an access modifier and marked {@link VisibleForTesting} so tests can supply their own
 * instances of each dependency.
 */
@VisibleForTesting
AtlasRestoreService(
        AuthHeader authHeader,
        AtlasRestoreClientBlocking atlasRestoreClientBlocking,
        TimeLockManagementServiceBlocking timeLockManagementService,
        BackupPersister backupPersister,
        CassandraRepairHelper cassandraRepairHelper) {
    // Local helpers: backup lookup and Cassandra repair.
    this.backupPersister = backupPersister;
    this.cassandraRepairHelper = cassandraRepairHelper;

    // Remote clients, and the auth header passed on each of their calls.
    this.atlasRestoreClientBlocking = atlasRestoreClientBlocking;
    this.timeLockManagementService = timeLockManagementService;
    this.authHeader = authHeader;
}
Expand All @@ -77,10 +88,54 @@ public static AtlasRestoreService create(
DialogueClients.ReloadingFactory reloadingFactory = DialogueClients.create(servicesConfigBlock);
AtlasRestoreClientBlocking atlasRestoreClientBlocking =
reloadingFactory.get(AtlasRestoreClientBlocking.class, serviceName);
TimeLockManagementServiceBlocking timeLockManagementService =
reloadingFactory.get(TimeLockManagementServiceBlocking.class, serviceName);

CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(keyValueServiceConfigFactory, keyValueServiceFactory);
return new AtlasRestoreService(authHeader, atlasRestoreClientBlocking, backupPersister, cassandraRepairHelper);
return new AtlasRestoreService(
authHeader,
atlasRestoreClientBlocking,
timeLockManagementService,
backupPersister,
cassandraRepairHelper);
}

/**
* Disables TimeLock on all nodes for the given namespaces.
* Should be called exactly once prior to a restore operation. Calling this on multiple nodes will cause conflicts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

funny fact: we have a NonIdempotent annotation. Check its javadoc...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌶️

*
* @param namespaces the namespaces to disable
*
* @return the result of the request, including a lock ID which must later be passed to completeRestore.
*/
@NonIdempotent
public DisableNamespacesResponse prepareRestore(Set<Namespace> namespaces) {
    // Only the namespaces returned by getCompletedBackups are sent to TimeLock.
    Set<Namespace> namespacesToRestore = getCompletedBackups(namespaces).keySet();
    DisableNamespacesResponse disableResponse =
            timeLockManagementService.disableTimelock(authHeader, namespacesToRestore);

    // Both recognised outcomes hand the response back unchanged; the visitor only adds an error log for the
    // unsuccessful case and rejects response types this client does not know about.
    DisableNamespacesResponse.Visitor<DisableNamespacesResponse> passThroughVisitor =
            new DisableNamespacesResponse.Visitor<>() {
                @Override
                public DisableNamespacesResponse visitSuccessful(SuccessfulDisableNamespacesResponse value) {
                    return disableResponse;
                }

                @Override
                public DisableNamespacesResponse visitUnsuccessful(UnsuccessfulDisableNamespacesResponse value) {
                    log.error(
                            "Failed to disable namespaces prior to restore",
                            SafeArg.of("namespaces", namespaces),
                            SafeArg.of("response", value));
                    return disableResponse;
                }

                @Override
                public DisableNamespacesResponse visitUnknown(String unknownType) {
                    throw new SafeIllegalStateException(
                            "Unknown DisableNamespacesResponse", SafeArg.of("unknownType", unknownType));
                }
            };
    return disableResponse.accept(passThroughVisitor);
}

/**
Expand All @@ -98,19 +153,69 @@ public Set<Namespace> repairInternalTables(
Set<Namespace> namespaces, BiConsumer<String, RangesForRepair> repairTable) {
Map<Namespace, CompletedBackup> completedBackups = getCompletedBackups(namespaces);
Set<Namespace> namespacesToRepair = completedBackups.keySet();
repairTables(repairTable, completedBackups, namespacesToRepair);
return namespacesToRepair;
}

/**
 * Finishes the restore for the requested namespaces in two steps: the timestamp is fast-forwarded for every
 * requested namespace that has a completed backup, and TimeLock is then re-enabled for the namespaces whose
 * fast-forward was reported as successful.
 *
 * @param request the namespaces to finish restoring, plus the lock ID returned by {@link #prepareRestore(Set)}
 * @return the namespaces reported as successful by the fast-forward call (the result of the re-enable call is
 * not inspected); empty if none of the requested namespaces has a completed backup, in which case no remote
 * call is made.
 */
@NonIdempotent
public Set<Namespace> completeRestore(ReenableNamespacesRequest request) {
    Set<Namespace> requestedNamespaces = request.getNamespaces();
    Set<CompletedBackup> backupsToRestore = requestedNamespaces.stream()
            .map(backupPersister::getCompletedBackup)
            .flatMap(Optional::stream)
            .collect(Collectors.toSet());

    // Nothing to restore: bail out before touching either remote service.
    if (backupsToRestore.isEmpty()) {
        log.info(
                "Attempted to complete restore, but no completed backups were found",
                SafeArg.of("namespaces", requestedNamespaces));
        return ImmutableSet.of();
    }

    // Step 1: fast-forward timestamps
    CompleteRestoreRequest fastForwardRequest = CompleteRestoreRequest.of(backupsToRestore);
    CompleteRestoreResponse fastForwardResponse =
            atlasRestoreClientBlocking.completeRestore(authHeader, fastForwardRequest);
    Set<Namespace> fastForwarded = fastForwardResponse.getSuccessfulNamespaces();
    Set<Namespace> notFastForwarded = Sets.difference(requestedNamespaces, fastForwarded);
    if (!notFastForwarded.isEmpty()) {
        log.error(
                "Failed to fast-forward timestamp for some namespaces. These will not be re-enabled.",
                SafeArg.of("failedNamespaces", notFastForwarded),
                SafeArg.of("fastForwardedNamespaces", fastForwarded));
    }

    // Step 2: re-enable TimeLock, but only for what was fast-forwarded
    ReenableNamespacesRequest reenableRequest = ReenableNamespacesRequest.of(fastForwarded, request.getLockId());
    timeLockManagementService.reenableTimelock(authHeader, reenableRequest);
    if (fastForwarded.containsAll(requestedNamespaces)) {
        log.info("Successfully completed restore for all namespaces", SafeArg.of("namespaces", fastForwarded));
    }

    return fastForwarded;
}

private void repairTables(
BiConsumer<String, RangesForRepair> repairTable,
Map<Namespace, CompletedBackup> completedBackups,
Set<Namespace> namespacesToRepair) {
// ConsistentCasTablesTask
namespacesToRepair.forEach(namespace -> cassandraRepairHelper.repairInternalTables(namespace, repairTable));

// RepairTransactionsTablesTask
KeyedStream.stream(completedBackups)
.forEach((namespace, completedBackup) ->
restoreTransactionsTables(namespace, completedBackup, repairTable));

return namespacesToRepair;
repairTransactionsTables(namespace, completedBackup, repairTable));
}

private void restoreTransactionsTables(
private void repairTransactionsTables(
Namespace namespace, CompletedBackup completedBackup, BiConsumer<String, RangesForRepair> repairTable) {
Map<FullyBoundedTimestampRange, Integer> coordinationMap = getCoordinationMap(namespace, completedBackup);
List<TransactionsTableInteraction> transactionsTableInteractions =
Expand All @@ -132,24 +237,6 @@ private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap(
schemaMetadataState, fastForwardTs, immutableTs);
}

/**
 * Looks up the completed backup for each given namespace and asks the restore client to complete the restore
 * for those that have one.
 *
 * @param namespaces the namespaces to complete the restore for
 * @return the namespaces the restore client reports as successful; empty (without calling the client) if none
 * of the given namespaces has a completed backup.
 */
public Set<Namespace> completeRestore(Set<Namespace> namespaces) {
    Set<CompletedBackup> backupsToRestore = namespaces.stream()
            .map(backupPersister::getCompletedBackup)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toSet());

    if (!backupsToRestore.isEmpty()) {
        CompleteRestoreRequest restoreRequest = CompleteRestoreRequest.of(backupsToRestore);
        return atlasRestoreClientBlocking
                .completeRestore(authHeader, restoreRequest)
                .getSuccessfulNamespaces();
    }

    log.info(
            "Attempted to complete restore, but no completed backups were found",
            SafeArg.of("namespaces", namespaces));
    return ImmutableSet.of();
}

private Map<Namespace, CompletedBackup> getCompletedBackups(Set<Namespace> namespaces) {
return KeyedStream.of(namespaces)
.map(backupPersister::getCompletedBackup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,24 @@
import com.palantir.atlasdb.backup.api.CompletedBackup;
import com.palantir.atlasdb.cassandra.backup.CassandraRepairHelper;
import com.palantir.atlasdb.cassandra.backup.RangesForRepair;
import com.palantir.atlasdb.timelock.api.DisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.timelock.api.ReenableNamespacesRequest;
import com.palantir.atlasdb.timelock.api.SuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.UnsuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking;
import com.palantir.tokens.auth.AuthHeader;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -49,13 +57,17 @@ public class AtlasRestoreServiceTest {
private static final Namespace NO_BACKUP = Namespace.of("no-backup");
private static final Namespace FAILING_NAMESPACE = Namespace.of("failing");
private static final long BACKUP_START_TIMESTAMP = 2L;
private static final UUID LOCK_ID = new UUID(12, 9);

@Mock
private AuthHeader authHeader;

@Mock
private AtlasRestoreClientBlocking atlasRestoreClient;

@Mock
private TimeLockManagementServiceBlocking timeLockManagementService;

@Mock
private CassandraRepairHelper cassandraRepairHelper;

Expand All @@ -65,8 +77,8 @@ public class AtlasRestoreServiceTest {
@Before
public void setup() {
backupPersister = new InMemoryBackupPersister();
atlasRestoreService =
new AtlasRestoreService(authHeader, atlasRestoreClient, backupPersister, cassandraRepairHelper);
atlasRestoreService = new AtlasRestoreService(
authHeader, atlasRestoreClient, timeLockManagementService, backupPersister, cassandraRepairHelper);

storeCompletedBackup(WITH_BACKUP);
storeCompletedBackup(FAILING_NAMESPACE);
Expand All @@ -83,22 +95,74 @@ private void storeCompletedBackup(Namespace namespace) {
}

@Test
public void repairsOnlyWhenBackupPresent() {
public void prepareReturnsOnlyCompletedBackups() {
    // The stub only matches a disable call for WITH_BACKUP alone, although NO_BACKUP is also requested.
    Set<Namespace> requestedNamespaces = ImmutableSet.of(WITH_BACKUP, NO_BACKUP);
    Set<Namespace> expectedToDisable = ImmutableSet.of(WITH_BACKUP);

    DisableNamespacesResponse stubbedResponse =
            DisableNamespacesResponse.successful(SuccessfulDisableNamespacesResponse.of(LOCK_ID));
    when(timeLockManagementService.disableTimelock(authHeader, expectedToDisable))
            .thenReturn(stubbedResponse);

    // A successful response is expected back unchanged.
    assertThat(atlasRestoreService.prepareRestore(requestedNamespaces)).isEqualTo(stubbedResponse);
}

@Test
public void prepareBackupFailsIfDisableFails() {
    Set<Namespace> requestedNamespaces = ImmutableSet.of(WITH_BACKUP, NO_BACKUP);
    Set<Namespace> expectedToDisable = ImmutableSet.of(WITH_BACKUP);

    // TimeLock answers with an unsuccessful response for the one namespace it is asked to disable.
    UnsuccessfulDisableNamespacesResponse failureDetails =
            UnsuccessfulDisableNamespacesResponse.of(ImmutableSet.of(WITH_BACKUP), ImmutableSet.of());
    DisableNamespacesResponse stubbedFailure = DisableNamespacesResponse.unsuccessful(failureDetails);
    when(timeLockManagementService.disableTimelock(authHeader, expectedToDisable))
            .thenReturn(stubbedFailure);

    // The unsuccessful response is expected back as-is rather than as an exception.
    assertThat(atlasRestoreService.prepareRestore(requestedNamespaces)).isEqualTo(stubbedFailure);
}

@Test
public void repairsOnlyWhenBackupPresentAndDisableSuccessful() {
    BiConsumer<String, RangesForRepair> doNothingConsumer = (_unused1, _unused2) -> {};

    // Invoke the repair exactly once: the verify(...) calls below each expect a single invocation.
    Set<Namespace> repairedNamespaces =
            atlasRestoreService.repairInternalTables(ImmutableSet.of(WITH_BACKUP, NO_BACKUP), doNothingConsumer);
    assertThat(repairedNamespaces).containsExactly(WITH_BACKUP);

    // Only WITH_BACKUP should have reached the repair helper; NO_BACKUP must not appear in any interaction.
    verify(cassandraRepairHelper).repairInternalTables(WITH_BACKUP, doNothingConsumer);
    verify(cassandraRepairHelper).repairTransactionsTables(eq(WITH_BACKUP), anyList(), eq(doNothingConsumer));
    verify(cassandraRepairHelper).cleanTransactionsTables(eq(WITH_BACKUP), eq(BACKUP_START_TIMESTAMP), anyList());
    verifyNoMoreInteractions(cassandraRepairHelper);
}

@Test
public void completesRestoreAfterFastForwardingTimestamp() {
    Set<Namespace> namespaces = ImmutableSet.of(WITH_BACKUP);

    // Build the fast-forward request the service is expected to send from the backups held by the persister.
    Set<CompletedBackup> storedBackups = namespaces.stream()
            .map(backupPersister::getCompletedBackup)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toSet());
    CompleteRestoreRequest fastForwardRequest = CompleteRestoreRequest.of(storedBackups);
    CompleteRestoreResponse fastForwardResponse = CompleteRestoreResponse.of(ImmutableSet.of(WITH_BACKUP));
    when(atlasRestoreClient.completeRestore(authHeader, fastForwardRequest))
            .thenReturn(fastForwardResponse);

    ReenableNamespacesRequest reenableRequest = ReenableNamespacesRequest.of(namespaces, LOCK_ID);
    assertThat(atlasRestoreService.completeRestore(reenableRequest)).containsExactly(WITH_BACKUP);

    // Ordering matters: the timestamp must be fast-forwarded before TimeLock is re-enabled.
    InOrder callOrder = Mockito.inOrder(atlasRestoreClient, timeLockManagementService);
    callOrder.verify(atlasRestoreClient).completeRestore(authHeader, fastForwardRequest);
    callOrder.verify(timeLockManagementService).reenableTimelock(authHeader, reenableRequest);
}

@Test
public void completeRestoreDoesNotRunNamespacesWithoutCompletedBackup() {
    // Only NO_BACKUP is requested, and this test expects the service to find no completed backup for it.
    ReenableNamespacesRequest reenableRequest = ReenableNamespacesRequest.of(ImmutableSet.of(NO_BACKUP), LOCK_ID);

    Set<Namespace> namespaces = atlasRestoreService.completeRestore(reenableRequest);

    // With nothing to restore, neither the fast-forward nor the re-enable call may be made.
    assertThat(namespaces).isEmpty();
    verifyNoInteractions(atlasRestoreClient);
    verifyNoInteractions(timeLockManagementService);
}

@Test
Expand All @@ -113,7 +177,9 @@ public void completeRestoreReturnsSuccessfulNamespaces() {
when(atlasRestoreClient.completeRestore(authHeader, request))
.thenReturn(CompleteRestoreResponse.of(ImmutableSet.of(WITH_BACKUP)));

Set<Namespace> successfulNamespaces = atlasRestoreService.completeRestore(namespaces);
ReenableNamespacesRequest reenableRequest = ReenableNamespacesRequest.of(namespaces, LOCK_ID);

Set<Namespace> successfulNamespaces = atlasRestoreService.completeRestore(reenableRequest);
assertThat(successfulNamespaces).containsExactly(WITH_BACKUP);
verify(atlasRestoreClient).completeRestore(authHeader, request);
}
Expand Down
7 changes: 7 additions & 0 deletions changelog/@unreleased/pr-5892.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
type: feature
feature:
description: Added AtlasRestoreService methods `prepareRestore` and `completeRestore`,
which will respectively disable and re-enable TimeLock, and should be called during
the restore process.
links:
- https://github.com/palantir/atlasdb/pull/5892
Loading