Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
ABR6: TimestampEndpointRestoreTask (#5825)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsheasby authored Dec 17, 2021
1 parent 2072bee commit b7a3860
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@
package com.palantir.atlasdb.backup;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking;
import com.palantir.atlasdb.backup.api.CompleteRestoreRequest;
import com.palantir.atlasdb.backup.api.CompleteRestoreResponse;
import com.palantir.atlasdb.backup.api.CompletedBackup;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.backup.CassandraRepairHelper;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.cassandra.LightweightOppToken;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.dialogue.clients.DialogueClients;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import com.palantir.tokens.auth.AuthHeader;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
Expand All @@ -38,22 +46,37 @@
public class AtlasRestoreService {
private static final SafeLogger log = SafeLoggerFactory.get(AtlasRestoreService.class);

private final AuthHeader authHeader;
private final AtlasRestoreClientBlocking atlasRestoreClientBlocking;
private final BackupPersister backupPersister;
private final CassandraRepairHelper cassandraRepairHelper;

@VisibleForTesting
AtlasRestoreService(BackupPersister backupPersister, CassandraRepairHelper cassandraRepairHelper) {
AtlasRestoreService(
AuthHeader authHeader,
AtlasRestoreClientBlocking atlasRestoreClientBlocking,
BackupPersister backupPersister,
CassandraRepairHelper cassandraRepairHelper) {
this.authHeader = authHeader;
this.atlasRestoreClientBlocking = atlasRestoreClientBlocking;
this.backupPersister = backupPersister;
this.cassandraRepairHelper = cassandraRepairHelper;
}

public static AtlasRestoreService create(
AuthHeader authHeader,
Refreshable<ServicesConfigBlock> servicesConfigBlock,
String serviceName,
BackupPersister backupPersister,
Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory,
Function<Namespace, KeyValueService> keyValueServiceFactory) {
DialogueClients.ReloadingFactory reloadingFactory = DialogueClients.create(servicesConfigBlock);
AtlasRestoreClientBlocking atlasRestoreClientBlocking =
reloadingFactory.get(AtlasRestoreClientBlocking.class, serviceName);

CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(keyValueServiceConfigFactory, keyValueServiceFactory);
return new AtlasRestoreService(backupPersister, cassandraRepairHelper);
return new AtlasRestoreService(authHeader, atlasRestoreClientBlocking, backupPersister, cassandraRepairHelper);
}

/**
Expand All @@ -75,6 +98,24 @@ public Set<Namespace> repairInternalTables(
return namespacesToRepair;
}

public Set<Namespace> completeRestore(Set<Namespace> namespaces) {
Set<CompletedBackup> completedBackups = namespaces.stream()
.map(backupPersister::getCompletedBackup)
.flatMap(Optional::stream)
.collect(Collectors.toSet());

if (completedBackups.isEmpty()) {
log.info(
"Attempted to complete restore, but no completed backups were found",
SafeArg.of("namespaces", namespaces));
return ImmutableSet.of();
}

CompleteRestoreResponse response =
atlasRestoreClientBlocking.completeRestore(authHeader, CompleteRestoreRequest.of(completedBackups));
return response.getSuccessfulNamespaces();
}

private boolean backupExists(Namespace namespace) {
Optional<CompletedBackup> maybeCompletedBackup = backupPersister.getCompletedBackup(namespace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@

package com.palantir.atlasdb.backup;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import com.palantir.atlasdb.backup.api.AtlasRestoreClientBlocking;
import com.palantir.atlasdb.backup.api.CompleteRestoreRequest;
import com.palantir.atlasdb.backup.api.CompleteRestoreResponse;
import com.palantir.atlasdb.backup.api.CompletedBackup;
import com.palantir.atlasdb.cassandra.backup.CassandraRepairHelper;
import com.palantir.atlasdb.keyvalue.cassandra.LightweightOppToken;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.tokens.auth.AuthHeader;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -39,19 +48,33 @@
public class AtlasRestoreServiceTest {
private static final Namespace WITH_BACKUP = Namespace.of("with-backup");
private static final Namespace NO_BACKUP = Namespace.of("no-backup");
private static final Namespace FAILING_NAMESPACE = Namespace.of("failing");

@Mock
private AuthHeader authHeader;

@Mock
private AtlasRestoreClientBlocking atlasRestoreClient;

@Mock
private CassandraRepairHelper cassandraRepairHelper;

private AtlasRestoreService atlasRestoreService;
private InMemoryBackupPersister backupPersister;

@Before
public void setup() {
BackupPersister backupPersister = new InMemoryBackupPersister();
atlasRestoreService = new AtlasRestoreService(backupPersister, cassandraRepairHelper);
backupPersister = new InMemoryBackupPersister();
atlasRestoreService =
new AtlasRestoreService(authHeader, atlasRestoreClient, backupPersister, cassandraRepairHelper);

storeCompletedBackup(WITH_BACKUP);
storeCompletedBackup(FAILING_NAMESPACE);
}

private void storeCompletedBackup(Namespace namespace) {
CompletedBackup completedBackup = CompletedBackup.builder()
.namespace(WITH_BACKUP)
.namespace(namespace)
.backupStartTimestamp(1L)
.backupEndTimestamp(2L)
.build();
Expand All @@ -66,4 +89,29 @@ public void repairsOnlyWhenBackupPresent() {
verify(cassandraRepairHelper).repairInternalTables(WITH_BACKUP, doNothingConsumer);
verifyNoMoreInteractions(cassandraRepairHelper);
}

@Test
public void completeRestoreDoesNotRunNamespacesWithoutCompletedBackup() {
Set<Namespace> namespaces = atlasRestoreService.completeRestore(ImmutableSet.of(NO_BACKUP));

assertThat(namespaces).isEmpty();
verifyNoInteractions(atlasRestoreClient);
}

@Test
public void completeRestoreReturnsSuccessfulNamespaces() {
Set<Namespace> namespaces = ImmutableSet.of(WITH_BACKUP, FAILING_NAMESPACE);
Set<CompletedBackup> completedBackups = namespaces.stream()
.map(backupPersister::getCompletedBackup)
.flatMap(Optional::stream)
.collect(Collectors.toSet());

CompleteRestoreRequest request = CompleteRestoreRequest.of(completedBackups);
when(atlasRestoreClient.completeRestore(authHeader, request))
.thenReturn(CompleteRestoreResponse.of(ImmutableSet.of(WITH_BACKUP)));

Set<Namespace> successfulNamespaces = atlasRestoreService.completeRestore(namespaces);
assertThat(successfulNamespaces).containsExactly(WITH_BACKUP);
verify(atlasRestoreClient).completeRestore(authHeader, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.backup.AtlasBackupResource;
import com.palantir.atlasdb.backup.AtlasRestoreResource;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.ImmutableLeaderConfig;
import com.palantir.atlasdb.config.ImmutableServerListConfig;
Expand Down Expand Up @@ -353,6 +354,9 @@ private void createAndRegisterResources() {
registerCorruptionHandlerWrappedService(
presentUndertowRegistrar,
AtlasBackupResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter));
registerCorruptionHandlerWrappedService(
presentUndertowRegistrar,
AtlasRestoreResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter));
} else {
registrar.accept(ConjureTimelockResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter));
registrar.accept(ConjureLockWatchingResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter));
Expand All @@ -361,6 +365,7 @@ private void createAndRegisterResources() {
registrar.accept(
MultiClientConjureTimelockResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter));
registrar.accept(AtlasBackupResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter));
registrar.accept(AtlasRestoreResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter));
}
}

Expand Down
78 changes: 49 additions & 29 deletions timelock-api/src/main/conjure/timelock-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,6 @@ types:
union:
successful: SuccessfulLockResponse
unsuccessful: UnsuccessfulLockResponse
InProgressBackupToken:
package: com.palantir.atlasdb.backup.api
fields:
namespace: Namespace
lockToken: LockToken
immutableTimestamp: Long
backupStartTimestamp: Long
CompletedBackup:
package: com.palantir.atlasdb.backup.api
fields:
namespace: Namespace
backupStartTimestamp: Long
backupEndTimestamp: Long
PrepareBackupRequest:
package: com.palantir.atlasdb.backup.api
fields:
namespaces: set<Namespace>
PrepareBackupResponse:
package: com.palantir.atlasdb.backup.api
fields:
successful: set<InProgressBackupToken>
CompleteBackupRequest:
package: com.palantir.atlasdb.backup.api
fields:
backupTokens: set<InProgressBackupToken>
CompleteBackupResponse:
package: com.palantir.atlasdb.backup.api
fields:
successfulBackups: set<CompletedBackup>
ConjureWaitForLocksResponse:
fields:
wasSuccessful: boolean
Expand Down Expand Up @@ -148,6 +119,44 @@ types:
LeaderTimes:
fields:
leaderTimes: map<Namespace, LeaderTime>
# backup and restore
InProgressBackupToken:
package: com.palantir.atlasdb.backup.api
fields:
namespace: Namespace
lockToken: LockToken
immutableTimestamp: Long
backupStartTimestamp: Long
CompletedBackup:
package: com.palantir.atlasdb.backup.api
fields:
namespace: Namespace
backupStartTimestamp: Long
backupEndTimestamp: Long
PrepareBackupRequest:
package: com.palantir.atlasdb.backup.api
fields:
namespaces: set<Namespace>
PrepareBackupResponse:
package: com.palantir.atlasdb.backup.api
fields:
successful: set<InProgressBackupToken>
CompleteBackupRequest:
package: com.palantir.atlasdb.backup.api
fields:
backupTokens: set<InProgressBackupToken>
CompleteBackupResponse:
package: com.palantir.atlasdb.backup.api
fields:
successfulBackups: set<CompletedBackup>
CompleteRestoreRequest:
package: com.palantir.atlasdb.backup.api
fields:
completedBackups: set<CompletedBackup>
CompleteRestoreResponse:
package: com.palantir.atlasdb.backup.api
fields:
successfulNamespaces: set<Namespace>

services:
AtlasBackupClient:
Expand All @@ -166,6 +175,17 @@ services:
args:
request: CompleteBackupRequest
returns: CompleteBackupResponse
AtlasRestoreClient:
name: Internal restore service
default-auth: header
package: com.palantir.atlasdb.backup.api
base-path: /restore
endpoints:
completeRestore:
http: POST /complete
args:
request: CompleteRestoreRequest
returns: CompleteRestoreResponse
ConjureTimelockService:
name: Timelock service
default-auth: header
Expand Down
Loading

0 comments on commit b7a3860

Please sign in to comment.