Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

ABR 18: More ETEs #5911

Merged
merged 32 commits into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
164d860
completeBackup ete
gsheasby Feb 14, 2022
a673645
Add restore methods to BackupAndRestoreResource
gsheasby Feb 14, 2022
d9cc93f
basic restore tests
gsheasby Feb 14, 2022
d1ba2e6
wiring
gsheasby Feb 15, 2022
0493647
addTodo patiently
gsheasby Feb 15, 2022
1ba7fa7
only register backup and restore resource if runtime config exists
gsheasby Feb 15, 2022
0da4925
ignorance is bliss
gsheasby Feb 15, 2022
e4b85cf
check timelock is disabled/reenabled
gsheasby Feb 15, 2022
6a1f76b
try commenting addTodo
gsheasby Feb 15, 2022
462eb4a
fix token
gsheasby Feb 15, 2022
0177cdb
[temp] alert when shutting down pooling
gsheasby Feb 15, 2022
b2a26aa
refactors
gsheasby Feb 15, 2022
0e1c6c1
fix exception assert
gsheasby Feb 15, 2022
3875630
fine, be that way
gsheasby Feb 17, 2022
63cc107
fix test setup
gsheasby Feb 17, 2022
4b17080
fix test interdependence
gsheasby Feb 17, 2022
1d813ee
fix assert
gsheasby Feb 17, 2022
72bc02b
optimism
gsheasby Feb 17, 2022
98239d9
close kvs for prod only
gsheasby Feb 17, 2022
2b2da18
internal
gsheasby Feb 17, 2022
84febf6
Make TimestampManagementService work for disabled namespaces
gsheasby Feb 17, 2022
72231c2
fix ete setup
gsheasby Feb 17, 2022
62a0bf2
CassRepairHelper: use KvsRunner
gsheasby Feb 17, 2022
9f2a68b
consistency
gsheasby Feb 17, 2022
726c960
remove debug code
gsheasby Feb 17, 2022
ecfa876
check
gsheasby Feb 17, 2022
7dbd4e6
Add generated changelog entries
svc-changelog Feb 17, 2022
c26ac47
Autorelease 0.546.0-rc1
gsheasby Feb 21, 2022
61edb8b
getIgnoringDisabled should not add to cache
gsheasby Feb 21, 2022
0aca074
more fixes
gsheasby Feb 21, 2022
3167013
Add generated changelog entries
svc-changelog Feb 21, 2022
98f11df
avoid race condition
gsheasby Feb 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.palantir.atlasdb.http.AtlasDbRemotingConstants;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
import com.palantir.conjure.java.api.config.service.UserAgent;
import com.palantir.dialogue.clients.DialogueClients;
Expand Down Expand Up @@ -75,20 +76,22 @@ public static AtlasBackupService create(
reloadingFactory.get(AtlasBackupClientBlocking.class, serviceName));

BackupPersister backupPersister = new ExternalBackupPersister(backupFolderFactory);
KvsRunner kvsRunner = KvsRunner.create(keyValueServiceFactory);
CoordinationServiceRecorder coordinationServiceRecorder =
new CoordinationServiceRecorder(keyValueServiceFactory, backupPersister);
new CoordinationServiceRecorder(kvsRunner, backupPersister);

return new AtlasBackupService(authHeader, atlasBackupClient, coordinationServiceRecorder, backupPersister);
}

public static AtlasBackupService create(
public static AtlasBackupService createForTests(
AuthHeader authHeader,
AtlasBackupClient atlasBackupClient,
Function<Namespace, Path> backupFolderFactory,
Function<Namespace, KeyValueService> keyValueServiceFactory) {
TransactionManager transactionManager,
Function<Namespace, Path> backupFolderFactory) {
BackupPersister backupPersister = new ExternalBackupPersister(backupFolderFactory);
KvsRunner kvsRunner = KvsRunner.create(transactionManager);
CoordinationServiceRecorder coordinationServiceRecorder =
new CoordinationServiceRecorder(keyValueServiceFactory, backupPersister);
new CoordinationServiceRecorder(kvsRunner, backupPersister);

return new AtlasBackupService(authHeader, atlasBackupClient, coordinationServiceRecorder, backupPersister);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.palantir.atlasdb.timelock.api.UnsuccessfulDisableNamespacesResponse;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementService;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.common.annotation.NonIdempotent;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.api.config.service.ServicesConfigBlock;
Expand Down Expand Up @@ -97,7 +98,21 @@ public static AtlasRestoreService create(
TimeLockManagementService timeLockManagementService = new DialogueAdaptingTimeLockManagementService(
reloadingFactory.get(TimeLockManagementServiceBlocking.class, serviceName));
CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(keyValueServiceConfigFactory, keyValueServiceFactory);
new CassandraRepairHelper(KvsRunner.create(keyValueServiceFactory), keyValueServiceConfigFactory);

return new AtlasRestoreService(
authHeader, atlasRestoreClient, timeLockManagementService, backupPersister, cassandraRepairHelper);
}

public static AtlasRestoreService createForTests(
AuthHeader authHeader,
AtlasRestoreClient atlasRestoreClient,
TimeLockManagementService timeLockManagementService,
BackupPersister backupPersister,
TransactionManager transactionManager,
Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory) {
CassandraRepairHelper cassandraRepairHelper =
new CassandraRepairHelper(KvsRunner.create(transactionManager), keyValueServiceConfigFactory);

return new AtlasRestoreService(
authHeader, atlasRestoreClient, timeLockManagementService, backupPersister, cassandraRepairHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Optional;
import java.util.function.Function;

final class CoordinationServiceRecorder {
private static final SafeLogger log = SafeLoggerFactory.get(CoordinationServiceRecorder.class);

private final Function<Namespace, KeyValueService> keyValueServiceFactory;
private final KvsRunner kvsRunner;
private final BackupPersister backupPersister;

CoordinationServiceRecorder(
Function<Namespace, KeyValueService> keyValueServiceFactory, BackupPersister backupPersister) {
this.keyValueServiceFactory = keyValueServiceFactory;
CoordinationServiceRecorder(KvsRunner kvsRunner, BackupPersister backupPersister) {
this.kvsRunner = kvsRunner;
this.backupPersister = backupPersister;
}

Expand All @@ -60,15 +58,17 @@ private void logEmptyMetadata(Namespace namespace) {
}

private Optional<InternalSchemaMetadataState> fetchSchemaMetadata(Namespace namespace, long timestamp) {
try (KeyValueService kvs = keyValueServiceFactory.apply(namespace)) {
if (!kvs.getAllTableNames().contains(AtlasDbConstants.COORDINATION_TABLE)) {
return Optional.empty();
}
CoordinationService<InternalSchemaMetadata> coordination =
CoordinationServices.createDefault(kvs, () -> timestamp, false);
return kvsRunner.run(namespace, kvs -> getInternalSchemaMetadataState(kvs, timestamp));
}

return Optional.of(InternalSchemaMetadataState.of(getValidMetadata(coordination, timestamp)));
private Optional<InternalSchemaMetadataState> getInternalSchemaMetadataState(KeyValueService kvs, long timestamp) {
if (!kvs.getAllTableNames().contains(AtlasDbConstants.COORDINATION_TABLE)) {
return Optional.empty();
}
CoordinationService<InternalSchemaMetadata> coordination =
CoordinationServices.createDefault(kvs, () -> timestamp, false);

return Optional.of(InternalSchemaMetadataState.of(getValidMetadata(coordination, timestamp)));
}

private ValueAndBound<InternalSchemaMetadata> getValidMetadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.backup;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
import java.util.function.Function;

final class ClosingKvsRunner implements KvsRunner {
private final Function<Namespace, KeyValueService> keyValueServiceFactory;

ClosingKvsRunner(Function<Namespace, KeyValueService> keyValueServiceFactory) {
this.keyValueServiceFactory = keyValueServiceFactory;
}

@Override
public <T> T run(Namespace namespace, Function<KeyValueService, T> function) {
try (KeyValueService kvs = keyValueServiceFactory.apply(namespace)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit excessive to create and close on every call -- unless we really only do it once. We should Ideally just make the service closeable and close the KVS on close if necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a given namespace, the KVS is indeed created and fetched exactly once per backup or restore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then gtg

return function.apply(kvs);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.backup;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import java.util.function.Function;

public interface KvsRunner {
<T> T run(Namespace namespace, Function<KeyValueService, T> function);

static KvsRunner create(Function<Namespace, KeyValueService> kvsFactory) {
return new ClosingKvsRunner(kvsFactory);
}

static KvsRunner create(TransactionManager txnManager) {
return new TransactionManagerKvsRunner(txnManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.backup;

import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.timelock.api.Namespace;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import java.util.function.Function;

final class TransactionManagerKvsRunner implements KvsRunner {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransactionManagerScopedKvsRunner?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant more agnostic and descriptive of behaviour/intent: SharedKvsRunner or even NonClosingKvsRunner?

private final TransactionManager txnManager;

TransactionManagerKvsRunner(TransactionManager txnManager) {
this.txnManager = txnManager;
}

@Override
public <T> T run(Namespace _namespace, Function<KeyValueService, T> function) {
return function.apply(txnManager.getKeyValueService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.backup.KvsRunner;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.backup.transaction.TransactionsTableInteraction;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
Expand Down Expand Up @@ -52,14 +53,13 @@ public class CassandraRepairHelper {
Sets.union(ImmutableSet.of(AtlasDbConstants.COORDINATION_TABLE), TargetedSweepTables.REPAIR_ON_RESTORE);

private final Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory;
private final Function<Namespace, KeyValueService> keyValueServiceFactory;
private final LoadingCache<Namespace, CqlCluster> cqlClusters;
private final KvsRunner kvsRunner;

public CassandraRepairHelper(
Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory,
Function<Namespace, KeyValueService> keyValueServiceFactory) {
KvsRunner kvsRunner, Function<Namespace, CassandraKeyValueServiceConfig> keyValueServiceConfigFactory) {
this.kvsRunner = kvsRunner;
this.keyValueServiceConfigFactory = keyValueServiceConfigFactory;
this.keyValueServiceFactory = keyValueServiceFactory;

this.cqlClusters = Caffeine.newBuilder()
.maximumSize(100)
Expand All @@ -83,11 +83,16 @@ private CqlCluster getCqlClusterUncached(Namespace namespace) {
}

public void repairInternalTables(Namespace namespace, BiConsumer<String, RangesForRepair> repairTable) {
KeyValueService kvs = keyValueServiceFactory.apply(namespace);
kvsRunner.run(namespace, kvs -> repairInternalTables(kvs, namespace, repairTable));
}

public Void repairInternalTables(
KeyValueService kvs, Namespace namespace, BiConsumer<String, RangesForRepair> repairTable) {
CqlCluster cqlCluster = cqlClusters.get(namespace);
KeyedStream.of(getTableNamesToRepair(kvs))
.map(tableName -> getRangesToRepair(cqlCluster, namespace, tableName))
.forEach(repairTable);
return null;
}

private static Stream<String> getTableNamesToRepair(KeyValueService kvs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ protected final void check() {
checkSweepConfigs();
}

@Value.Derived
public boolean remoteTimestampAndLockOrLeaderBlocksPresent() {
return (timestamp().isPresent() && lock().isPresent()) || leader().isPresent();
}

private void checkSweepConfigs() {
if (getSweepBatchSize() != null
|| getSweepCellBatchSize() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.palantir.atlasdb.config;

import com.palantir.logsafe.Preconditions;
import com.palantir.refreshable.Refreshable;
import java.util.Optional;
import java.util.Set;
Expand All @@ -25,6 +26,30 @@ private ServerListConfigs() {
// utilities
}

public static Refreshable<ServerListConfig> getTimeLockServersFromAtlasDbConfig(
AtlasDbConfig config, Optional<AtlasDbRuntimeConfig> atlasDbRuntimeConfig) {
Refreshable<Optional<TimeLockRuntimeConfig>> timelockRuntimeConfig =
Refreshable.only(atlasDbRuntimeConfig.flatMap(AtlasDbRuntimeConfig::timelockRuntime));
return getServerListConfigSupplierForTimeLock(config, timelockRuntimeConfig);
}

public static Refreshable<ServerListConfig> getTimeLockServersFromAtlasDbConfig(
AtlasDbConfig config, Refreshable<AtlasDbRuntimeConfig> runtimeConfigSupplier) {
Refreshable<Optional<TimeLockRuntimeConfig>> timelockRuntimeConfig =
runtimeConfigSupplier.map(AtlasDbRuntimeConfig::timelockRuntime);
return getServerListConfigSupplierForTimeLock(config, timelockRuntimeConfig);
}

private static Refreshable<ServerListConfig> getServerListConfigSupplierForTimeLock(
AtlasDbConfig config, Refreshable<Optional<TimeLockRuntimeConfig>> timelockRuntimeConfig) {
Preconditions.checkState(
!config.remoteTimestampAndLockOrLeaderBlocksPresent(),
"Cannot create raw services from timelock with another source of timestamps/locks configured!");
TimeLockClientConfig clientConfig = config.timelock()
.orElseGet(() -> ImmutableTimeLockClientConfig.builder().build());
return ServerListConfigs.parseInstallAndRuntimeConfigs(clientConfig, timelockRuntimeConfig);
}

public static Refreshable<ServerListConfig> parseInstallAndRuntimeConfigs(
TimeLockClientConfig installClientConfig, Refreshable<Optional<TimeLockRuntimeConfig>> runtimeConfig) {
return runtimeConfig.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.palantir.atlasdb.factory;

import com.google.common.collect.ImmutableMap;
import com.palantir.atlasdb.backup.DialogueAdaptingTimeLockManagementService;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.ServerListConfig;
import com.palantir.atlasdb.factory.timelock.ImmutableShortAndLongTimeoutServices;
Expand All @@ -34,6 +35,8 @@
import com.palantir.atlasdb.timelock.api.ConjureTimelockService;
import com.palantir.atlasdb.timelock.api.ConjureTimelockServiceBlocking;
import com.palantir.atlasdb.timelock.api.MultiClientConjureTimelockServiceBlocking;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementService;
import com.palantir.atlasdb.timelock.api.management.TimeLockManagementServiceBlocking;
import com.palantir.atlasdb.timelock.lock.watch.ConjureLockWatchingServiceBlocking;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.conjure.java.api.config.service.UserAgent;
Expand Down Expand Up @@ -164,6 +167,12 @@ ConjureLockWatchingServiceBlocking getConjureLockWatchingService() {
wrapInProxy(ConjureLockWatchingServiceBlocking.class, blockingService));
}

public TimeLockManagementService getTimeLockManagementService() {
TimeLockManagementServiceBlocking blockingService =
dialogueClientFactory.get(TimeLockManagementServiceBlocking.class, TIMELOCK_SHORT_TIMEOUT);
return new DialogueAdaptingTimeLockManagementService(blockingService);
}

private <T> T createDialogueProxyWithShortTimeout(Class<T> type) {
return createDialogueProxy(type, dialogueClientFactory.getChannel(TIMELOCK_SHORT_TIMEOUT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.palantir.atlasdb.config.AtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.AuxiliaryRemotingParameters;
import com.palantir.atlasdb.config.ImmutableServerListConfig;
import com.palantir.atlasdb.config.ImmutableTimeLockClientConfig;
import com.palantir.atlasdb.config.LeaderConfig;
import com.palantir.atlasdb.config.RemotingClientConfigs;
import com.palantir.atlasdb.config.ServerListConfig;
Expand Down Expand Up @@ -76,7 +75,6 @@
import com.palantir.lock.v2.NamespacedTimelockRpcClient;
import com.palantir.lock.v2.TimelockRpcClient;
import com.palantir.lock.v2.TimelockService;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
Expand Down Expand Up @@ -233,7 +231,7 @@ private static void assertNoSpuriousTimeLockBlockInRuntimeConfig(
AtlasDbConfig config, AtlasDbRuntimeConfig initialRuntimeConfig) {
// Note: The other direction (timelock install config without a runtime block) should be maintained for
// backwards compatibility.
if (remoteTimestampAndLockOrLeaderBlocksPresent(config)
if (config.remoteTimestampAndLockOrLeaderBlocksPresent()
&& initialRuntimeConfig.timelockRuntime().isPresent()) {
throw new SafeIllegalStateException("Found a service configured not to use timelock, with a timelock"
+ " block in the runtime config! This is unexpected. If you wish to use non-timelock services,"
Expand All @@ -242,11 +240,6 @@ private static void assertNoSpuriousTimeLockBlockInRuntimeConfig(
}
}

private static boolean remoteTimestampAndLockOrLeaderBlocksPresent(AtlasDbConfig config) {
return (config.timestamp().isPresent() && config.lock().isPresent())
|| config.leader().isPresent();
}

private static LockAndTimestampServices createRawServicesFromTimeLock(
MetricsManager metricsManager,
AtlasDbConfig config,
Expand Down Expand Up @@ -285,13 +278,7 @@ private static LockAndTimestampServices createRawServicesFromTimeLock(

static Refreshable<ServerListConfig> getServerListConfigSupplierForTimeLock(
AtlasDbConfig config, Refreshable<AtlasDbRuntimeConfig> runtimeConfigSupplier) {
Preconditions.checkState(
!remoteTimestampAndLockOrLeaderBlocksPresent(config),
"Cannot create raw services from timelock with another source of timestamps/locks configured!");
TimeLockClientConfig clientConfig = config.timelock()
.orElseGet(() -> ImmutableTimeLockClientConfig.builder().build());
return ServerListConfigs.parseInstallAndRuntimeConfigs(
clientConfig, runtimeConfigSupplier.map(AtlasDbRuntimeConfig::timelockRuntime));
return ServerListConfigs.getTimeLockServersFromAtlasDbConfig(config, runtimeConfigSupplier);
}

private static LockAndTimestampServices getLockAndTimestampServices(
Expand Down
Loading