This repository has been archived by the owner on Nov 14, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
ABR15: "clear locks task" wiring #5892
Merged
Merged
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
ac52091
Revert "Remove wiring stubs from this PR"
gsheasby 0086577
fix typo
gsheasby 127b688
wire up local updater
gsheasby 034fc3a
Wire ANDNU up to TimeLockManagementResource
gsheasby 318f302
wire up to AtlasRestoreService
gsheasby bbc919d
disable/reenable during restore process
gsheasby d96a048
bail out if disable unsuccessful
gsheasby 86d8142
separate prepare method
gsheasby b07e208
pull out AuthHeaderValidator
gsheasby 36ba1b3
validate auth headers in mgmt resource
gsheasby 7734754
more tests
gsheasby be64218
compile
gsheasby 48c6834
cleanup
gsheasby 0b7cb0e
javadoc
gsheasby bad7a50
authenticate internal calls too
gsheasby d136888
refactor usages of AuthHeaderValidator
gsheasby b38c83f
Add generated changelog entries
svc-changelog 4ba9f67
nits
gsheasby File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import com.datastax.driver.core.policies.DefaultRetryPolicy; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.ImmutableSet; | ||
import com.google.common.collect.Sets; | ||
import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking; | ||
import com.palantir.atlasdb.backup.api.CompleteRestoreRequest; | ||
import com.palantir.atlasdb.backup.api.CompleteRestoreResponse; | ||
|
@@ -29,11 +30,17 @@ | |
import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction; | ||
import com.palantir.atlasdb.internalschema.InternalSchemaMetadataState; | ||
import com.palantir.atlasdb.keyvalue.api.KeyValueService; | ||
import com.palantir.atlasdb.timelock.api.DisableNamespacesResponse; | ||
import com.palantir.atlasdb.timelock.api.Namespace; | ||
import com.palantir.atlasdb.timelock.api.ReenableNamespacesRequest; | ||
import com.palantir.atlasdb.timelock.api.SuccessfulDisableNamespacesResponse; | ||
import com.palantir.atlasdb.timelock.api.UnsuccessfulDisableNamespacesResponse; | ||
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking; | ||
import com.palantir.common.streams.KeyedStream; | ||
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock; | ||
import com.palantir.dialogue.clients.DialogueClients; | ||
import com.palantir.logsafe.SafeArg; | ||
import com.palantir.logsafe.exceptions.SafeIllegalStateException; | ||
import com.palantir.logsafe.logger.SafeLogger; | ||
import com.palantir.logsafe.logger.SafeLoggerFactory; | ||
import com.palantir.refreshable.Refreshable; | ||
|
@@ -52,17 +59,20 @@ public class AtlasRestoreService { | |
|
||
private final AuthHeader authHeader; | ||
private final AtlasRestoreClientBlocking atlasRestoreClientBlocking; | ||
private final TimeLockManagementServiceBlocking timeLockManagementService; | ||
private final BackupPersister backupPersister; | ||
private final CassandraRepairHelper cassandraRepairHelper; | ||
|
||
@VisibleForTesting | ||
AtlasRestoreService( | ||
AuthHeader authHeader, | ||
AtlasRestoreClientBlocking atlasRestoreClientBlocking, | ||
TimeLockManagementServiceBlocking timeLockManagementService, | ||
BackupPersister backupPersister, | ||
CassandraRepairHelper cassandraRepairHelper) { | ||
this.authHeader = authHeader; | ||
this.atlasRestoreClientBlocking = atlasRestoreClientBlocking; | ||
this.timeLockManagementService = timeLockManagementService; | ||
this.backupPersister = backupPersister; | ||
this.cassandraRepairHelper = cassandraRepairHelper; | ||
} | ||
|
@@ -77,10 +87,53 @@ public static AtlasRestoreService create( | |
DialogueClients.ReloadingFactory reloadingFactory = DialogueClients.create(servicesConfigBlock); | ||
AtlasRestoreClientBlocking atlasRestoreClientBlocking = | ||
reloadingFactory.get(AtlasRestoreClientBlocking.class, serviceName); | ||
TimeLockManagementServiceBlocking timeLockManagementService = | ||
reloadingFactory.get(TimeLockManagementServiceBlocking.class, serviceName); | ||
|
||
CassandraRepairHelper cassandraRepairHelper = | ||
new CassandraRepairHelper(keyValueServiceConfigFactory, keyValueServiceFactory); | ||
return new AtlasRestoreService(authHeader, atlasRestoreClientBlocking, backupPersister, cassandraRepairHelper); | ||
return new AtlasRestoreService( | ||
authHeader, | ||
atlasRestoreClientBlocking, | ||
timeLockManagementService, | ||
backupPersister, | ||
cassandraRepairHelper); | ||
} | ||
|
||
/** | ||
* Disables TimeLock on all nodes for the given namespaces. | ||
* Should be called exactly once prior to a restore operation. Calling this on multiple nodes will cause conflicts. | ||
* | ||
* @param namespaces the namespaces to disable | ||
* | ||
* @return the result of the request, including a lock ID which must later be passed to completeRestore. | ||
*/ | ||
public DisableNamespacesResponse prepareRestore(Set<Namespace> namespaces) { | ||
Map<Namespace, CompletedBackup> completedBackups = getCompletedBackups(namespaces); | ||
Set<Namespace> namespacesToRepair = completedBackups.keySet(); | ||
|
||
DisableNamespacesResponse response = timeLockManagementService.disableTimelock(authHeader, namespacesToRepair); | ||
gsheasby marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return response.accept(new DisableNamespacesResponse.Visitor<>() { | ||
@Override | ||
public DisableNamespacesResponse visitSuccessful(SuccessfulDisableNamespacesResponse value) { | ||
return response; | ||
} | ||
|
||
@Override | ||
public DisableNamespacesResponse visitUnsuccessful(UnsuccessfulDisableNamespacesResponse value) { | ||
log.error( | ||
"Failed to disable namespaces prior to restore", | ||
SafeArg.of("namespaces", namespaces), | ||
SafeArg.of("response", value)); | ||
return response; | ||
} | ||
|
||
@Override | ||
public DisableNamespacesResponse visitUnknown(String unknownType) { | ||
throw new SafeIllegalStateException( | ||
"Unknown DisableNamespacesResponse", SafeArg.of("unknownType", unknownType)); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -98,16 +151,65 @@ public Set<Namespace> repairInternalTables( | |
Set<Namespace> namespaces, BiConsumer<String, RangesForRepair> repairTable) { | ||
Map<Namespace, CompletedBackup> completedBackups = getCompletedBackups(namespaces); | ||
Set<Namespace> namespacesToRepair = completedBackups.keySet(); | ||
restoreTables(repairTable, completedBackups, namespacesToRepair); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. restore -> repair There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done (also renamed the |
||
return namespacesToRepair; | ||
} | ||
|
||
/** | ||
* Completes the restore process for the requested namespaces. | ||
* This includes fast-forwarding the timestamp, and then re-enabling the TimeLock namespaces. | ||
* | ||
* @param request the request object, which must include the lock ID returned by {@link #prepareRestore(Set)} | ||
* @return the set of namespaces that were successfully fast-forwarded and re-enabled. | ||
*/ | ||
public Set<Namespace> completeRestore(ReenableNamespacesRequest request) { | ||
Set<CompletedBackup> completedBackups = request.getNamespaces().stream() | ||
.map(backupPersister::getCompletedBackup) | ||
.flatMap(Optional::stream) | ||
.collect(Collectors.toSet()); | ||
|
||
if (completedBackups.isEmpty()) { | ||
log.info( | ||
"Attempted to complete restore, but no completed backups were found", | ||
SafeArg.of("namespaces", request.getNamespaces())); | ||
return ImmutableSet.of(); | ||
} | ||
|
||
// Fast forward timestamps | ||
CompleteRestoreResponse response = | ||
atlasRestoreClientBlocking.completeRestore(authHeader, CompleteRestoreRequest.of(completedBackups)); | ||
Set<Namespace> successfulNamespaces = response.getSuccessfulNamespaces(); | ||
Set<Namespace> failedNamespaces = Sets.difference(request.getNamespaces(), successfulNamespaces); | ||
if (!failedNamespaces.isEmpty()) { | ||
log.error( | ||
"Failed to fast-forward timestamp for some namespaces. These will not be re-enabled.", | ||
SafeArg.of("failedNamespaces", failedNamespaces), | ||
SafeArg.of("fastForwardedNamespaces", successfulNamespaces)); | ||
} | ||
|
||
// Re-enable timelock | ||
timeLockManagementService.reenableTimelock( | ||
authHeader, ReenableNamespacesRequest.of(successfulNamespaces, request.getLockId())); | ||
if (successfulNamespaces.containsAll(request.getNamespaces())) { | ||
log.info( | ||
"Successfully completed restore for all namespaces", | ||
SafeArg.of("namespaces", successfulNamespaces)); | ||
} | ||
|
||
return successfulNamespaces; | ||
} | ||
|
||
private void restoreTables( | ||
BiConsumer<String, RangesForRepair> repairTable, | ||
Map<Namespace, CompletedBackup> completedBackups, | ||
Set<Namespace> namespacesToRepair) { | ||
// ConsistentCasTablesTask | ||
namespacesToRepair.forEach(namespace -> cassandraRepairHelper.repairInternalTables(namespace, repairTable)); | ||
|
||
// RepairTransactionsTablesTask | ||
KeyedStream.stream(completedBackups) | ||
.forEach((namespace, completedBackup) -> | ||
restoreTransactionsTables(namespace, completedBackup, repairTable)); | ||
|
||
return namespacesToRepair; | ||
} | ||
|
||
private void restoreTransactionsTables( | ||
|
@@ -132,24 +234,6 @@ private Map<FullyBoundedTimestampRange, Integer> getCoordinationMap( | |
schemaMetadataState, fastForwardTs, immutableTs); | ||
} | ||
|
||
public Set<Namespace> completeRestore(Set<Namespace> namespaces) { | ||
Set<CompletedBackup> completedBackups = namespaces.stream() | ||
.map(backupPersister::getCompletedBackup) | ||
.flatMap(Optional::stream) | ||
.collect(Collectors.toSet()); | ||
|
||
if (completedBackups.isEmpty()) { | ||
log.info( | ||
"Attempted to complete restore, but no completed backups were found", | ||
SafeArg.of("namespaces", namespaces)); | ||
return ImmutableSet.of(); | ||
} | ||
|
||
CompleteRestoreResponse response = | ||
atlasRestoreClientBlocking.completeRestore(authHeader, CompleteRestoreRequest.of(completedBackups)); | ||
return response.getSuccessfulNamespaces(); | ||
} | ||
|
||
private Map<Namespace, CompletedBackup> getCompletedBackups(Set<Namespace> namespaces) { | ||
return KeyedStream.of(namespaces) | ||
.map(backupPersister::getCompletedBackup) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
type: feature | ||
feature: | ||
description: Added AtlasRestoreService methods `prepareRestore` and `completeRestore`, | ||
which will respectively disable and re-enable TimeLock, and should be called during | ||
the restore process. | ||
links: | ||
- https://github.com/palantir/atlasdb/pull/5892 |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
funny fact: we have a
NonIdempotent
annotation. Check its javadoc...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🌶️