From 9fafbf96823e364acc6428650daedc80817c89c2 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Mon, 8 Nov 2021 13:49:50 +0000 Subject: [PATCH] ABR1: AtlasBackupService (#5708) --- atlasdb-backup/build.gradle | 10 + .../atlasdb/backup/AtlasBackupService.java | 87 +++++++++ .../backup/AtlasBackupServiceTest.java | 109 +++++++++++ .../atlasdb/debug/LocalLockTracker.java | 2 +- changelog/@unreleased/pr-5708.v2.yml | 7 + .../com/palantir/lock/v2/TimelockService.java | 2 +- settings.gradle | 1 + .../timelock/paxos/TimeLockAgent.java | 23 ++- .../src/main/conjure/timelock-api.yml | 43 +++++ .../atlasdb/backup/AtlasBackupResource.java | 171 +++++++++++++++++ .../backup/AtlasBackupResourceTest.java | 175 ++++++++++++++++++ .../timelock/ConjureTimelockResourceTest.java | 14 +- ...ultiClientConjureTimelockResourceTest.java | 14 +- .../atlasdb/util/TimelockTestUtils.java | 32 ++++ 14 files changed, 657 insertions(+), 33 deletions(-) create mode 100644 atlasdb-backup/build.gradle create mode 100644 atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasBackupService.java create mode 100644 atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasBackupServiceTest.java create mode 100644 changelog/@unreleased/pr-5708.v2.yml create mode 100644 timelock-impl/src/main/java/com/palantir/atlasdb/backup/AtlasBackupResource.java create mode 100644 timelock-impl/src/test/java/com/palantir/atlasdb/backup/AtlasBackupResourceTest.java create mode 100644 timelock-impl/src/test/java/com/palantir/atlasdb/util/TimelockTestUtils.java diff --git a/atlasdb-backup/build.gradle b/atlasdb-backup/build.gradle new file mode 100644 index 00000000000..2a6598df225 --- /dev/null +++ b/atlasdb-backup/build.gradle @@ -0,0 +1,10 @@ +apply from: "../gradle/shared.gradle" + +dependencies { + implementation project(':atlasdb-config') + implementation project(':timelock-api:timelock-api-objects') + + implementation 'com.palantir.safe-logging:safe-logging' + + testImplementation 'org.mockito:mockito-core' +} diff --git a/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasBackupService.java b/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasBackupService.java new file mode 100644 index 00000000000..0e93b3cecea --- /dev/null +++ b/atlasdb-backup/src/main/java/com/palantir/atlasdb/backup/AtlasBackupService.java @@ -0,0 +1,87 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.backup; + +import com.google.common.annotations.VisibleForTesting; +import com.palantir.atlasdb.backup.api.AtlasBackupClientBlocking; +import com.palantir.atlasdb.timelock.api.CompleteBackupRequest; +import com.palantir.atlasdb.timelock.api.CompleteBackupResponse; +import com.palantir.atlasdb.timelock.api.CompletedBackup; +import com.palantir.atlasdb.timelock.api.InProgressBackupToken; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.PrepareBackupRequest; +import com.palantir.atlasdb.timelock.api.PrepareBackupResponse; +import com.palantir.conjure.java.api.config.service.ServicesConfigBlock; +import com.palantir.dialogue.clients.DialogueClients; +import com.palantir.dialogue.clients.DialogueClients.ReloadingFactory; +import com.palantir.refreshable.Refreshable; +import com.palantir.tokens.auth.AuthHeader; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public final class AtlasBackupService { + private final AuthHeader authHeader; + private final AtlasBackupClientBlocking atlasBackupClientBlocking; + private final Map storedTokens; + + @VisibleForTesting + AtlasBackupService(AuthHeader authHeader, AtlasBackupClientBlocking atlasBackupClientBlocking) { + this.authHeader = authHeader; + this.atlasBackupClientBlocking = atlasBackupClientBlocking; + this.storedTokens = new ConcurrentHashMap<>(); + } + + public static AtlasBackupService create( + AuthHeader authHeader, Refreshable servicesConfigBlock, String serviceName) { + ReloadingFactory reloadingFactory = DialogueClients.create(servicesConfigBlock); + AtlasBackupClientBlocking atlasBackupClientBlocking = + reloadingFactory.get(AtlasBackupClientBlocking.class, serviceName); + return new AtlasBackupService(authHeader, atlasBackupClientBlocking); + } + + public Set prepareBackup(Set namespaces) { + PrepareBackupRequest request = PrepareBackupRequest.of(namespaces); + PrepareBackupResponse response = atlasBackupClientBlocking.prepareBackup(authHeader, request); + + return response.getSuccessful().stream() + .peek(this::storeBackupToken) + .map(InProgressBackupToken::getNamespace) + .collect(Collectors.toSet()); + } + + private void storeBackupToken(InProgressBackupToken backupToken) { + storedTokens.put(backupToken.getNamespace(), backupToken); + } + + // TODO(gs): actually persist the token using a persister passed into this class. + // Then we have an atlas-side implementation of the persister that conforms with the current backup story + public Set completeBackup(Set namespaces) { + Set tokens = namespaces.stream() + .map(storedTokens::remove) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + CompleteBackupRequest request = CompleteBackupRequest.of(tokens); + CompleteBackupResponse response = atlasBackupClientBlocking.completeBackup(authHeader, request); + + return response.getSuccessfulBackups().stream() + .map(CompletedBackup::getNamespace) + .collect(Collectors.toSet()); + } +} diff --git a/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasBackupServiceTest.java b/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasBackupServiceTest.java new file mode 100644 index 00000000000..d64540ebf10 --- /dev/null +++ b/atlasdb-backup/src/test/java/com/palantir/atlasdb/backup/AtlasBackupServiceTest.java @@ -0,0 +1,109 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.backup; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import com.palantir.atlasdb.backup.api.AtlasBackupClientBlocking; +import com.palantir.atlasdb.timelock.api.CompleteBackupRequest; +import com.palantir.atlasdb.timelock.api.CompleteBackupResponse; +import com.palantir.atlasdb.timelock.api.CompletedBackup; +import com.palantir.atlasdb.timelock.api.InProgressBackupToken; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.PrepareBackupRequest; +import com.palantir.atlasdb.timelock.api.PrepareBackupResponse; +import com.palantir.lock.v2.LockToken; +import com.palantir.tokens.auth.AuthHeader; +import java.util.Set; +import java.util.UUID; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AtlasBackupServiceTest { + private static final Namespace NAMESPACE = Namespace.of("foo"); + private static final Namespace OTHER_NAMESPACE = Namespace.of("other"); + private static final InProgressBackupToken IN_PROGRESS = inProgressBackupToken(NAMESPACE); + + @Mock + private AuthHeader authHeader; + + @Mock + private AtlasBackupClientBlocking atlasBackupClient; + + private AtlasBackupService atlasBackupService; + + @Before + public void setup() { + atlasBackupService = new AtlasBackupService(authHeader, atlasBackupClient); + } + + @Test + public void prepareBackupReturnsSuccessfulNamespaces() { + when(atlasBackupClient.prepareBackup( + authHeader, PrepareBackupRequest.of(ImmutableSet.of(NAMESPACE, OTHER_NAMESPACE)))) + .thenReturn(PrepareBackupResponse.of(ImmutableSet.of(IN_PROGRESS))); + + assertThat(atlasBackupService.prepareBackup(ImmutableSet.of(NAMESPACE, OTHER_NAMESPACE))) + .containsExactly(NAMESPACE); + } + + @Test + public void completeBackupDoesNotRunUnpreparedNamespaces() { + when(atlasBackupClient.completeBackup(authHeader, CompleteBackupRequest.of(ImmutableSet.of()))) + .thenReturn(CompleteBackupResponse.of(ImmutableSet.of())); + + assertThat(atlasBackupService.completeBackup(ImmutableSet.of(OTHER_NAMESPACE))) + .isEmpty(); + } + + @Test + public void completeBackupReturnsSuccessfulNamespaces() { + InProgressBackupToken otherInProgress = inProgressBackupToken(OTHER_NAMESPACE); + Set namespaces = ImmutableSet.of(NAMESPACE, OTHER_NAMESPACE); + + when(atlasBackupClient.prepareBackup(authHeader, PrepareBackupRequest.of(namespaces))) + .thenReturn(PrepareBackupResponse.of(ImmutableSet.of(IN_PROGRESS, otherInProgress))); + + CompletedBackup completedBackup = CompletedBackup.builder() + .namespace(NAMESPACE) + .backupStartTimestamp(2L) + .backupEndTimestamp(3L) + .build(); + when(atlasBackupClient.completeBackup( + authHeader, CompleteBackupRequest.of(ImmutableSet.of(IN_PROGRESS, otherInProgress)))) + .thenReturn(CompleteBackupResponse.of(ImmutableSet.of(completedBackup))); + + atlasBackupService.prepareBackup(namespaces); + + assertThat(atlasBackupService.completeBackup(namespaces)).containsExactly(NAMESPACE); + } + + private static InProgressBackupToken inProgressBackupToken(Namespace namespace) { + return InProgressBackupToken.builder() + .namespace(namespace) + .immutableTimestamp(1L) + .backupStartTimestamp(2L) + .lockToken(LockToken.of(UUID.randomUUID())) + .build(); + } +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/debug/LocalLockTracker.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/debug/LocalLockTracker.java index 094524967cd..8c5ead5df1d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/debug/LocalLockTracker.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/debug/LocalLockTracker.java @@ -54,7 +54,7 @@ List getLocalLockHistory() { void logLockResponse(Set lockDescriptors, ConjureLockResponse response) { TrackedLockEvent event = getTimestampedLockEventBuilder() .eventType(EventType.LOCK) - .eventDescription(response.accept(new ConjureLockResponse.Visitor() { + .eventDescription(response.accept(new ConjureLockResponse.Visitor<>() { @Override public String visitSuccessful(SuccessfulLockResponse value) { return "SUCCESS - locked " + lockDescriptors + "; obtained " + value; diff --git a/changelog/@unreleased/pr-5708.v2.yml b/changelog/@unreleased/pr-5708.v2.yml new file mode 100644 index 00000000000..9efd8a1e19a --- /dev/null +++ b/changelog/@unreleased/pr-5708.v2.yml @@ -0,0 +1,7 @@ +type: feature +feature: + description: Add AtlasBackupService, which encapsulates the steps of the backup + process where communication with Timelock and/or knowledge of AtlasDB internals + is required. + links: + - https://github.com/palantir/atlasdb/pull/5708 diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java index 29c36b4b457..db047fde4a3 100644 --- a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java @@ -46,7 +46,7 @@ default boolean isInitialized() { TimestampRange getFreshTimestamps(@Safe @QueryParam("number") int numTimestampsRequested); - // TODO (jkong): Can this be deprecated? Are there users outside of Atlas transactions? + // TODO(gs): Deprecate this once users outside of Atlas transactions have been eliminated LockImmutableTimestampResponse lockImmutableTimestamp(); @DoNotDelegate diff --git a/settings.gradle b/settings.gradle index 11e599f28ba..f17b11fbd39 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,7 @@ rootProject.name = 'atlasdb' include ":atlasdb-api" include ":atlasdb-autobatch" +include ":atlasdb-backup" include ":atlasdb-cassandra" include ":atlasdb-cassandra-integration-tests" include ":atlasdb-cassandra-multinode-tests" diff --git a/timelock-agent/src/main/java/com/palantir/timelock/paxos/TimeLockAgent.java b/timelock-agent/src/main/java/com/palantir/timelock/paxos/TimeLockAgent.java index ba0a4ff6d92..da08b612c04 100644 --- a/timelock-agent/src/main/java/com/palantir/timelock/paxos/TimeLockAgent.java +++ b/timelock-agent/src/main/java/com/palantir/timelock/paxos/TimeLockAgent.java @@ -24,6 +24,7 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.palantir.atlasdb.AtlasDbConstants; +import com.palantir.atlasdb.backup.AtlasBackupResource; import com.palantir.atlasdb.config.AuxiliaryRemotingParameters; import com.palantir.atlasdb.config.ImmutableLeaderConfig; import com.palantir.atlasdb.config.ImmutableServerListConfig; @@ -332,30 +333,34 @@ private void createAndRegisterResources() { Function lockServiceGetter = namespace -> namespaces.get(namespace).getLockService(); + RedirectRetryTargeter redirectRetryTargeter = redirectRetryTargeter(); if (undertowRegistrar.isPresent()) { Consumer presentUndertowRegistrar = undertowRegistrar.get(); registerCorruptionHandlerWrappedService( presentUndertowRegistrar, - ConjureTimelockResource.undertow(redirectRetryTargeter(), asyncTimelockServiceGetter)); + ConjureTimelockResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter)); registerCorruptionHandlerWrappedService( presentUndertowRegistrar, - ConjureLockWatchingResource.undertow(redirectRetryTargeter(), asyncTimelockServiceGetter)); + ConjureLockWatchingResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter)); registerCorruptionHandlerWrappedService( - presentUndertowRegistrar, - ConjureLockV1Resource.undertow(redirectRetryTargeter(), lockServiceGetter)); + presentUndertowRegistrar, ConjureLockV1Resource.undertow(redirectRetryTargeter, lockServiceGetter)); registerCorruptionHandlerWrappedService( presentUndertowRegistrar, TimeLockPaxosHistoryProviderResource.undertow(corruptionComponents.localHistoryLoader())); registerCorruptionHandlerWrappedService( presentUndertowRegistrar, - MultiClientConjureTimelockResource.undertow(redirectRetryTargeter(), asyncTimelockServiceGetter)); + MultiClientConjureTimelockResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter)); + registerCorruptionHandlerWrappedService( + presentUndertowRegistrar, + AtlasBackupResource.undertow(redirectRetryTargeter, asyncTimelockServiceGetter)); } else { - registrar.accept(ConjureTimelockResource.jersey(redirectRetryTargeter(), asyncTimelockServiceGetter)); - registrar.accept(ConjureLockWatchingResource.jersey(redirectRetryTargeter(), asyncTimelockServiceGetter)); - registrar.accept(ConjureLockV1Resource.jersey(redirectRetryTargeter(), lockServiceGetter)); + registrar.accept(ConjureTimelockResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter)); + registrar.accept(ConjureLockWatchingResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter)); + registrar.accept(ConjureLockV1Resource.jersey(redirectRetryTargeter, lockServiceGetter)); registrar.accept(TimeLockPaxosHistoryProviderResource.jersey(corruptionComponents.localHistoryLoader())); registrar.accept( - MultiClientConjureTimelockResource.jersey(redirectRetryTargeter(), asyncTimelockServiceGetter)); + MultiClientConjureTimelockResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter)); + registrar.accept(AtlasBackupResource.jersey(redirectRetryTargeter, asyncTimelockServiceGetter)); } } diff --git a/timelock-api/src/main/conjure/timelock-api.yml b/timelock-api/src/main/conjure/timelock-api.yml index 723d6f1c932..6c31677bd83 100644 --- a/timelock-api/src/main/conjure/timelock-api.yml +++ b/timelock-api/src/main/conjure/timelock-api.yml @@ -20,6 +20,10 @@ types: base-type: any external: java: com.palantir.lock.v2.Lease + LockToken: + base-type: any + external: + java: com.palantir.lock.v2.LockToken LockWatchStateUpdate: base-type: any external: @@ -82,6 +86,29 @@ types: union: successful: SuccessfulLockResponse unsuccessful: UnsuccessfulLockResponse + InProgressBackupToken: + fields: + namespace: Namespace + lockToken: LockToken + immutableTimestamp: Long + backupStartTimestamp: Long + CompletedBackup: + fields: + namespace: Namespace + backupStartTimestamp: Long + backupEndTimestamp: Long + PrepareBackupRequest: + fields: + namespaces: set + PrepareBackupResponse: + fields: + successful: set + CompleteBackupRequest: + fields: + backupTokens: set + CompleteBackupResponse: + fields: + successfulBackups: set ConjureWaitForLocksResponse: fields: wasSuccessful: boolean @@ -117,6 +144,22 @@ types: leaderTimes: map services: + AtlasBackupClient: + name: Internal backup service + default-auth: header + package: com.palantir.atlasdb.backup.api + base-path: /backup + endpoints: + prepareBackup: + http: POST /prepare + args: + request: PrepareBackupRequest + returns: PrepareBackupResponse + completeBackup: + http: POST /complete + args: + request: CompleteBackupRequest + returns: CompleteBackupResponse ConjureTimelockService: name: Timelock service default-auth: header diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/backup/AtlasBackupResource.java b/timelock-impl/src/main/java/com/palantir/atlasdb/backup/AtlasBackupResource.java new file mode 100644 index 00000000000..aa7fceb3992 --- /dev/null +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/backup/AtlasBackupResource.java @@ -0,0 +1,171 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.backup; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.palantir.atlasdb.backup.api.AtlasBackupClient; +import com.palantir.atlasdb.backup.api.AtlasBackupClientEndpoints; +import com.palantir.atlasdb.backup.api.UndertowAtlasBackupClient; +import com.palantir.atlasdb.futures.AtlasFutures; +import com.palantir.atlasdb.http.RedirectRetryTargeter; +import com.palantir.atlasdb.timelock.AsyncTimelockService; +import com.palantir.atlasdb.timelock.ConjureResourceExceptionHandler; +import com.palantir.atlasdb.timelock.api.CompleteBackupRequest; +import com.palantir.atlasdb.timelock.api.CompleteBackupResponse; +import com.palantir.atlasdb.timelock.api.CompletedBackup; +import com.palantir.atlasdb.timelock.api.InProgressBackupToken; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.PrepareBackupRequest; +import com.palantir.atlasdb.timelock.api.PrepareBackupResponse; +import com.palantir.conjure.java.undertow.lib.UndertowService; +import com.palantir.lock.v2.IdentifiedTimeLockRequest; +import com.palantir.lock.v2.LockImmutableTimestampResponse; +import com.palantir.lock.v2.LockToken; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.tokens.auth.AuthHeader; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class AtlasBackupResource implements UndertowAtlasBackupClient { + private static final SafeLogger log = SafeLoggerFactory.get(AtlasBackupResource.class); + private final Function timelockServices; + private final ConjureResourceExceptionHandler exceptionHandler; + + @VisibleForTesting + AtlasBackupResource( + RedirectRetryTargeter redirectRetryTargeter, Function timelockServices) { + this.exceptionHandler = new ConjureResourceExceptionHandler(redirectRetryTargeter); + this.timelockServices = timelockServices; + } + + public static UndertowService undertow( + RedirectRetryTargeter redirectRetryTargeter, Function timelockServices) { + return AtlasBackupClientEndpoints.of(new AtlasBackupResource(redirectRetryTargeter, timelockServices)); + } + + public static AtlasBackupClient jersey( + RedirectRetryTargeter redirectRetryTargeter, Function timelockServices) { + return new JerseyAtlasBackupClientAdapter(new AtlasBackupResource(redirectRetryTargeter, timelockServices)); + } + + @Override + public ListenableFuture prepareBackup(AuthHeader authHeader, PrepareBackupRequest request) { + return handleExceptions(() -> Futures.immediateFuture(prepareBackupInternal(request))); + } + + private PrepareBackupResponse prepareBackupInternal(PrepareBackupRequest request) { + Set preparedBackups = + request.getNamespaces().stream().map(this::prepareBackup).collect(Collectors.toSet()); + return PrepareBackupResponse.of(preparedBackups); + } + + private InProgressBackupToken prepareBackup(Namespace namespace) { + AsyncTimelockService timelock = timelock(namespace); + LockImmutableTimestampResponse response = timelock.lockImmutableTimestamp(IdentifiedTimeLockRequest.create()); + long timestamp = timelock.getFreshTimestamp(); + return InProgressBackupToken.builder() + .namespace(namespace) + .lockToken(response.getLock()) + .immutableTimestamp(response.getImmutableTimestamp()) + .backupStartTimestamp(timestamp) + .build(); + } + + @Override + public ListenableFuture completeBackup( + AuthHeader authHeader, CompleteBackupRequest request) { + return handleExceptions(() -> completeBackupInternal(request)); + } + + @SuppressWarnings("ConstantConditions") + private ListenableFuture completeBackupInternal(CompleteBackupRequest request) { + Map>> futureMap = + request.getBackupTokens().stream().collect(Collectors.toMap(token -> token, this::completeBackupAsync)); + ListenableFuture> singleFuture = + AtlasFutures.allAsMap(futureMap, MoreExecutors.newDirectExecutorService()); + + return Futures.transform( + singleFuture, + map -> CompleteBackupResponse.of(ImmutableSet.copyOf(map.values())), + MoreExecutors.directExecutor()); + } + + @SuppressWarnings("ConstantConditions") // optional token is never null + private ListenableFuture> completeBackupAsync(InProgressBackupToken backupToken) { + return Futures.transform( + maybeUnlock(backupToken), + maybeToken -> maybeToken.map(_successfulUnlock -> fetchFastForwardTimestamp(backupToken)), + MoreExecutors.directExecutor()); + } + + @SuppressWarnings("ConstantConditions") // Set of locks is never null + private ListenableFuture> maybeUnlock(InProgressBackupToken backupToken) { + return Futures.transform( + timelock(backupToken.getNamespace()).unlock(ImmutableSet.of(backupToken.getLockToken())), + singletonOrEmptySet -> singletonOrEmptySet.stream().findFirst(), + MoreExecutors.directExecutor()); + } + + private CompletedBackup fetchFastForwardTimestamp(InProgressBackupToken backupToken) { + Namespace namespace = backupToken.getNamespace(); + long fastForwardTimestamp = timelock(namespace).getFreshTimestamp(); + return CompletedBackup.builder() + .namespace(namespace) + .backupStartTimestamp(backupToken.getBackupStartTimestamp()) + .backupEndTimestamp(fastForwardTimestamp) + .build(); + } + + private AsyncTimelockService timelock(Namespace namespace) { + return timelockServices.apply(namespace.get()); + } + + private ListenableFuture handleExceptions(Supplier> supplier) { + return exceptionHandler.handleExceptions(supplier); + } + + public static final class JerseyAtlasBackupClientAdapter implements AtlasBackupClient { + private final AtlasBackupResource resource; + + public JerseyAtlasBackupClientAdapter(AtlasBackupResource resource) { + this.resource = resource; + } + + @Override + public PrepareBackupResponse prepareBackup(AuthHeader authHeader, PrepareBackupRequest request) { + return unwrap(resource.prepareBackup(authHeader, request)); + } + + @Override + public CompleteBackupResponse completeBackup(AuthHeader authHeader, CompleteBackupRequest request) { + return unwrap(resource.completeBackup(authHeader, request)); + } + + private static T unwrap(ListenableFuture future) { + return AtlasFutures.getUnchecked(future); + } + } +} diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/backup/AtlasBackupResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/backup/AtlasBackupResourceTest.java new file mode 100644 index 00000000000..41d7ad4f901 --- /dev/null +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/backup/AtlasBackupResourceTest.java @@ -0,0 +1,175 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.backup; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.palantir.atlasdb.futures.AtlasFutures; +import com.palantir.atlasdb.http.RedirectRetryTargeter; +import com.palantir.atlasdb.timelock.AsyncTimelockService; +import com.palantir.atlasdb.timelock.api.CompleteBackupRequest; +import com.palantir.atlasdb.timelock.api.CompleteBackupResponse; +import com.palantir.atlasdb.timelock.api.CompletedBackup; +import com.palantir.atlasdb.timelock.api.InProgressBackupToken; +import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.timelock.api.PrepareBackupRequest; +import com.palantir.atlasdb.timelock.api.PrepareBackupResponse; +import com.palantir.atlasdb.util.TimelockTestUtils; +import com.palantir.lock.v2.LockImmutableTimestampResponse; +import com.palantir.lock.v2.LockToken; +import com.palantir.tokens.auth.AuthHeader; +import java.net.URL; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.Test; + +public class AtlasBackupResourceTest { + private static final int REMOTE_PORT = 4321; + private static final URL LOCAL = TimelockTestUtils.url("https://localhost:1234"); + private static final URL REMOTE = TimelockTestUtils.url("https://localhost:" + REMOTE_PORT); + private static final RedirectRetryTargeter TARGETER = RedirectRetryTargeter.create(LOCAL, List.of(LOCAL, REMOTE)); + + private static final AuthHeader AUTH_HEADER = AuthHeader.valueOf("header"); + private static final Namespace NAMESPACE = Namespace.of("test"); + private static final Namespace OTHER_NAMESPACE = Namespace.of("other"); + private static final PrepareBackupRequest PREPARE_BACKUP_REQUEST = + PrepareBackupRequest.of(ImmutableSet.of(NAMESPACE)); + private static final long IMMUTABLE_TIMESTAMP = 1L; + private static final long BACKUP_START_TIMESTAMP = 2L; + private static final CompleteBackupResponse EMPTY_COMPLETE_BACKUP_RESPONSE = + CompleteBackupResponse.of(ImmutableSet.of()); + + private final AsyncTimelockService mockTimelock = mock(AsyncTimelockService.class); + private final AsyncTimelockService otherTimelock = mock(AsyncTimelockService.class); + + private final AtlasBackupResource atlasBackupService = + new AtlasBackupResource(TARGETER, str -> str.equals("test") ? mockTimelock : otherTimelock); + + @Test + public void preparesBackupSuccessfully() { + LockToken lockToken = lockToken(); + when(mockTimelock.lockImmutableTimestamp(any())) + .thenReturn(LockImmutableTimestampResponse.of(IMMUTABLE_TIMESTAMP, lockToken)); + when(mockTimelock.getFreshTimestamp()).thenReturn(BACKUP_START_TIMESTAMP); + + InProgressBackupToken expectedBackupToken = inProgressBackupToken(lockToken); + + assertThat(AtlasFutures.getUnchecked(atlasBackupService.prepareBackup(AUTH_HEADER, PREPARE_BACKUP_REQUEST))) + .isEqualTo(prepareBackupResponseWith(expectedBackupToken)); + } + + @Test + public void completeBackupContainsNamespaceWhenLockIsHeld() { + when(mockTimelock.getFreshTimestamp()).thenReturn(3L); + + InProgressBackupToken backupToken = validBackupToken(); + CompletedBackup expected = completedBackup(backupToken); + + assertThat(AtlasFutures.getUnchecked( + atlasBackupService.completeBackup(AUTH_HEADER, completeBackupRequest(backupToken)))) + .isEqualTo(completeBackupResponseWith(expected)); + } + + @Test + public void completeBackupDoesNotContainNamespaceWhenLockIsLost() { + InProgressBackupToken backupToken = invalidBackupToken(); + + assertThat(AtlasFutures.getUnchecked( + atlasBackupService.completeBackup(AUTH_HEADER, completeBackupRequest(backupToken)))) + .isEqualTo(EMPTY_COMPLETE_BACKUP_RESPONSE); + } + + @Test + public void completeBackupFiltersOutUnsuccessfulNamespaces() { + when(mockTimelock.getFreshTimestamp()).thenReturn(3L); + + InProgressBackupToken validToken = validBackupToken(); + InProgressBackupToken invalidToken = invalidBackupToken(OTHER_NAMESPACE, otherTimelock); + CompletedBackup expected = completedBackup(validToken); + + assertThat(AtlasFutures.getUnchecked(atlasBackupService.completeBackup( + AUTH_HEADER, completeBackupRequest(validToken, invalidToken)))) + .isEqualTo(completeBackupResponseWith(expected)); + } + + private InProgressBackupToken validBackupToken() { + LockToken lockToken = lockToken(); + InProgressBackupToken backupToken = inProgressBackupToken(lockToken); + + Set singleLockToken = ImmutableSet.of(lockToken); + when(mockTimelock.unlock(singleLockToken)).thenReturn(Futures.immediateFuture(singleLockToken)); + + return backupToken; + } + + private InProgressBackupToken invalidBackupToken() { + return invalidBackupToken(NAMESPACE, mockTimelock); + } + + private static InProgressBackupToken invalidBackupToken(Namespace namespace, AsyncTimelockService timelock) { + LockToken lockToken = lockToken(); + InProgressBackupToken backupToken = inProgressBackupToken(namespace, lockToken); + + when(timelock.unlock(ImmutableSet.of(lockToken))).thenReturn(Futures.immediateFuture(ImmutableSet.of())); + + return backupToken; + } + + private static PrepareBackupResponse prepareBackupResponseWith(InProgressBackupToken expected) { + return PrepareBackupResponse.of(ImmutableSet.of(expected)); + } + + private static CompleteBackupRequest completeBackupRequest(InProgressBackupToken... backupTokens) { + return CompleteBackupRequest.of(ImmutableSet.copyOf(backupTokens)); + } + + private static CompleteBackupResponse completeBackupResponseWith(CompletedBackup expected) { + return CompleteBackupResponse.of(ImmutableSet.of(expected)); + } + + private static InProgressBackupToken inProgressBackupToken(LockToken lockToken) { + return inProgressBackupToken(NAMESPACE, lockToken); + } + + private static InProgressBackupToken inProgressBackupToken(Namespace namespace, LockToken lockToken) { + return InProgressBackupToken.builder() + .namespace(namespace) + .lockToken(lockToken) + .immutableTimestamp(IMMUTABLE_TIMESTAMP) + .backupStartTimestamp(BACKUP_START_TIMESTAMP) + .build(); + } + + private static LockToken lockToken() { + UUID requestId = UUID.randomUUID(); + return LockToken.of(requestId); + } + + private static CompletedBackup completedBackup(InProgressBackupToken backupToken) { + return CompletedBackup.builder() + .namespace(backupToken.getNamespace()) + .backupStartTimestamp(backupToken.getBackupStartTimestamp()) + .backupEndTimestamp(3L) + .build(); + } +} diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/ConjureTimelockResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/ConjureTimelockResourceTest.java index 6dd3afe1857..37c5646cae7 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/ConjureTimelockResourceTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/ConjureTimelockResourceTest.java @@ -27,13 +27,13 @@ import com.google.common.util.concurrent.MoreExecutors; import com.palantir.atlasdb.http.RedirectRetryTargeter; import com.palantir.atlasdb.timelock.api.ConjureTimelockService; +import com.palantir.atlasdb.util.TimelockTestUtils; import com.palantir.conjure.java.api.errors.QosException; import com.palantir.leader.NotCurrentLeaderException; import com.palantir.lock.impl.TooManyRequestsException; import com.palantir.lock.remoting.BlockingTimeoutException; import com.palantir.lock.v2.LeaderTime; import com.palantir.tokens.auth.AuthHeader; -import java.net.MalformedURLException; import java.net.URL; import java.time.Duration; import java.util.concurrent.ExecutionException; @@ -47,8 +47,8 @@ public class ConjureTimelockResourceTest { private static final AuthHeader AUTH_HEADER = AuthHeader.valueOf("Bearer test"); private static final int REMOTE_PORT = 4321; - private static final URL LOCAL = url("https://localhost:1234"); - private static final URL REMOTE = url("https://localhost:" + REMOTE_PORT); + private static final URL LOCAL = TimelockTestUtils.url("https://localhost:1234"); + private static final URL REMOTE = TimelockTestUtils.url("https://localhost:" + REMOTE_PORT); private static final RedirectRetryTargeter TARGETER = RedirectRetryTargeter.create(LOCAL, ImmutableList.of(LOCAL, REMOTE)); @@ -160,12 +160,4 @@ public Void visit(QosException.Unavailable exception) { return null; } } - - private static URL url(String url) { - try { - return new URL(url); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } } diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java index c469fb29617..2d1192fba5d 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java @@ -34,6 +34,7 @@ import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.LeaderTimes; import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.atlasdb.util.TimelockTestUtils; import com.palantir.common.streams.KeyedStream; import com.palantir.common.time.NanoTime; import com.palantir.conjure.java.api.errors.QosException.RetryOther; @@ -47,7 +48,6 @@ import com.palantir.lock.v2.PartitionedTimestamps; import com.palantir.lock.watch.LockWatchStateUpdate; import com.palantir.tokens.auth.AuthHeader; -import java.net.MalformedURLException; import java.net.URL; import java.time.Duration; import java.util.HashMap; @@ -62,8 +62,8 @@ public class MultiClientConjureTimelockResourceTest { private static final AuthHeader AUTH_HEADER = AuthHeader.valueOf("Bearer test"); private static final int REMOTE_PORT = 4321; - private static final URL LOCAL = url("https://localhost:1234"); - private static final URL REMOTE = url("https://localhost:" + REMOTE_PORT); + private static final URL LOCAL = TimelockTestUtils.url("https://localhost:1234"); + private static final URL REMOTE = TimelockTestUtils.url("https://localhost:" + REMOTE_PORT); private static final RedirectRetryTargeter TARGETER = RedirectRetryTargeter.create(LOCAL, ImmutableList.of(LOCAL, REMOTE)); private static final int DUMMY_COMMIT_TS_COUNT = 5; @@ -211,12 +211,4 @@ private GetCommitTimestampsResponse getCommitTimestampResponse(String namespace) private Integer getInclusiveLowerCommitTs(String namespace) { return namespaceToCommitTsLowerBound.computeIfAbsent(namespace, _u -> commitTsLowerInclusive++); } - - private static URL url(String url) { - try { - return new URL(url); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } } diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/util/TimelockTestUtils.java b/timelock-impl/src/test/java/com/palantir/atlasdb/util/TimelockTestUtils.java new file mode 100644 index 00000000000..7f8dac45830 --- /dev/null +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/util/TimelockTestUtils.java @@ -0,0 +1,32 @@ +/* + * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.util; + +import java.net.MalformedURLException; +import java.net.URL; + +public final class TimelockTestUtils { + private TimelockTestUtils() {} + + public static URL url(String url) { + try { + return new URL(url); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } +}