Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[PDS-265616] Handle runtime config in migration CLI (#6186)
Browse files Browse the repository at this point in the history
AtlasDB CLI Migrate command supports optional `-frc / --fromRuntimeConfig` and `-mrc / --migrateRuntimeConfig` for specifying the runtime config file path for the initial and target KVSs. The optional `--inline-runtime-config` command allows for specifying the runtime config as a string for the initial KVS, and the optional `--runtime-config-root` command allows for specifying the config root within the file.
  • Loading branch information
mdaudali authored Sep 7, 2022
1 parent f0fda27 commit dd619ef
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class AbstractCommand implements Callable<Integer> {
name = {"--runtime-config-root"},
title = "RUNTIME CONFIG ROOT",
type = OptionType.GLOBAL,
description = "field in the config yaml file that contains the atlasdb configuration root")
description = "field in the runtime config yaml file that contains the atlasdb configuration root")
private String runtimeConfigRoot = AtlasDbConfigs.ATLASDB_CONFIG_OBJECT_PATH;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.cli.output.OutputPrinter;
import com.palantir.atlasdb.compact.CompactorConfig;
import com.palantir.atlasdb.config.AtlasDbConfig;
import com.palantir.atlasdb.config.AtlasDbConfigs;
import com.palantir.atlasdb.config.AtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.ImmutableAtlasDbConfig;
import com.palantir.atlasdb.config.ImmutableAtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.SweepConfig;
import com.palantir.atlasdb.schema.KeyValueServiceMigrator;
import com.palantir.atlasdb.schema.KeyValueServiceValidator;
import com.palantir.atlasdb.services.AtlasDbServices;
import com.palantir.atlasdb.services.DaggerAtlasDbServices;
import com.palantir.atlasdb.services.ServicesConfigModule;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepRuntimeConfig;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.airlift.airline.OptionType;
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.LoggerFactory;

@Command(name = "migrate", description = "Migrate your data from one key value service to another.")
Expand Down Expand Up @@ -61,6 +70,29 @@ public class KvsMigrationCommand implements Callable<Integer> {
description = "field in the config yaml file that contains the atlasdb configuration root")
private String configRoot = AtlasDbConfigs.ATLASDB_CONFIG_OBJECT_PATH;

@Option(
name = {"-frc", "--fromRuntimeConfig"},
title = "RUNTIME CONFIG PATH",
description = "path to yaml runtime configuration file for the KVS you're migrating from. Note that this "
+ "config will not be reloaded while the CLI is running",
required = false)
private File fromRuntimeConfigFile;

@Option(
name = {"-mrc", "--migrateRuntimeConfig"},
title = "RUNTIME CONFIG PATH",
description = "path to yaml runtime configuration file for the KVS you're migrating to. Note that this "
+ "config will not be reloaded while the CLI is running",
required = false)
private File toRuntimeConfigFile;

@Option(
name = {"--runtime-config-root"},
title = "RUNTIME CONFIG ROOT",
type = OptionType.GLOBAL,
description = "field in the runtime config yaml file that contains the atlasdb configuration root")
private String runtimeConfigRoot = AtlasDbConfigs.ATLASDB_CONFIG_OBJECT_PATH;

@Option(
name = {"-t", "--threads"},
title = "THREADS",
Expand Down Expand Up @@ -107,6 +139,13 @@ public class KvsMigrationCommand implements Callable<Integer> {
description = "inline configuration file for atlasdb")
private String inlineConfig;

@Option(
name = {"--inline-runtime-config"},
title = "INLINE RUNTIME CONFIG",
type = OptionType.GLOBAL,
description = "inline runtime configuration file for atlasdb")
private String inlineRuntimeConfig;

@Override
public Integer call() throws Exception {
if (inlineConfig == null && toConfigFile == null) {
Expand Down Expand Up @@ -159,21 +198,27 @@ private AtlasDbConfig makeOfflineIfNecessary(AtlasDbConfig atlasDbConfig) {
}
}

public AtlasDbServices connectFromServices() throws IOException {
AtlasDbConfig fromConfig =
overrideTransactionTimeoutMillis(AtlasDbConfigs.load(fromConfigFile, configRoot, AtlasDbConfig.class));
ServicesConfigModule scm = ServicesConfigModule.create(
makeOfflineIfNecessary(fromConfig), AtlasDbRuntimeConfig.withSweepDisabled());
public AtlasDbServices connectFromServices() {
AtlasDbConfig fromConfig = overrideTransactionTimeoutMillis(
loadFromFile(fromConfigFile, configRoot, AtlasDbConfig.class).orElseThrow());

AtlasDbRuntimeConfig fromRuntimeConfig = disableSweepAndCompaction(
loadFromFile(fromRuntimeConfigFile, runtimeConfigRoot, AtlasDbRuntimeConfig.class)
.orElseGet(AtlasDbRuntimeConfig::withSweepDisabled));

ServicesConfigModule scm = ServicesConfigModule.create(makeOfflineIfNecessary(fromConfig), fromRuntimeConfig);
return DaggerAtlasDbServices.builder().servicesConfigModule(scm).build();
}

public AtlasDbServices connectToServices() throws IOException {
AtlasDbConfig toConfig = overrideTransactionTimeoutMillis(
toConfigFile != null
? AtlasDbConfigs.load(toConfigFile, configRoot, AtlasDbConfig.class)
: AtlasDbConfigs.loadFromString(inlineConfig, null, AtlasDbConfig.class));
ServicesConfigModule scm =
ServicesConfigModule.create(makeOfflineIfNecessary(toConfig), AtlasDbRuntimeConfig.withSweepDisabled());
public AtlasDbServices connectToServices() {
AtlasDbConfig toConfig = overrideTransactionTimeoutMillis(loadFromFileOrInline(
toConfigFile, configRoot, inlineConfig, AtlasDbConfig.class)
.orElseThrow(() -> new SafeRuntimeException("At least one of -mc / --inline-config is required")));

AtlasDbRuntimeConfig toRuntimeConfig = disableSweepAndCompaction(loadFromFileOrInline(
toRuntimeConfigFile, runtimeConfigRoot, inlineRuntimeConfig, AtlasDbRuntimeConfig.class)
.orElseGet(AtlasDbRuntimeConfig::withSweepDisabled));
ServicesConfigModule scm = ServicesConfigModule.create(makeOfflineIfNecessary(toConfig), toRuntimeConfig);
return DaggerAtlasDbServices.builder().servicesConfigModule(scm).build();
}

Expand All @@ -192,4 +237,35 @@ private KeyValueServiceMigrator getMigrator(AtlasDbServices fromServices, AtlasD
.batchSize(batchSize)
.build());
}

private static AtlasDbRuntimeConfig disableSweepAndCompaction(AtlasDbRuntimeConfig atlasDbRuntimeConfig) {
return ImmutableAtlasDbRuntimeConfig.builder()
.from(atlasDbRuntimeConfig)
.sweep(SweepConfig.disabled())
.targetedSweep(TargetedSweepRuntimeConfig.disabled())
.compact(CompactorConfig.disabled())
.build();
}

private static <K> Optional<K> loadFromFileOrInline(
@Nullable File configFile, String fileConfigRoot, @Nullable String inline, Class<K> clazz) {
return loadFromFile(configFile, fileConfigRoot, clazz).or(() -> Optional.ofNullable(inline)
.map(rethrowIoExceptionAsUnchecked(config -> AtlasDbConfigs.loadFromString(config, null, clazz))));
}

private static <K> Optional<K> loadFromFile(@Nullable File configFile, String fileConfigRoot, Class<K> clazz) {
return Optional.ofNullable(configFile)
.map(rethrowIoExceptionAsUnchecked(file -> AtlasDbConfigs.load(file, fileConfigRoot, clazz)));
}

private static <F, T, K extends IOException> Function<F, T> rethrowIoExceptionAsUnchecked(
FunctionCheckedException<F, T, K> function) {
return f -> {
try {
return function.apply(f);
} catch (IOException e) {
throw new SafeRuntimeException(e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cli.runner.AbstractTestRunner;
import com.palantir.atlasdb.cli.runner.InMemoryTestRunner;
import com.palantir.atlasdb.compact.CompactorConfig;
import com.palantir.atlasdb.config.AtlasDbConfigs;
import com.palantir.atlasdb.config.AtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.ImmutableAtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.SweepConfig;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.Namespace;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.services.AtlasDbServices;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepRuntimeConfig;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.Callable;
Expand All @@ -35,12 +43,25 @@
import org.junit.Test;

public class TestKvsMigrationCommand {
private static final String[] EMPTY_ARGS = new String[] {};
private static final String RUNTIME_CONFIG = "sweep:\n"
+ " enabled: true\n"
+ "targetedSweep:\n"
+ " enabled: true\n"
+ "compact:\n"
+ " enableCompaction: true\n"
+ "keyValueService:\n"
+ " type: cassandra\n"
+ " servers:\n"
+ " - 127.0.0.1:1337\n"
+ " replicationFactor: 21";

private AtlasDbServices fromServices;
private AtlasDbServices toServices;

@Before
public void setupServices() throws Exception {
KvsMigrationCommand cmd = getCommand(new String[] {});
KvsMigrationCommand cmd = getCommand(EMPTY_ARGS, EMPTY_ARGS);
fromServices = cmd.connectFromServices();
toServices = cmd.connectToServices();
}
Expand All @@ -52,14 +73,46 @@ public void close() {
}

@Test
public void doesNotSweepDuringMigration() {
assertThat(fromServices.getAtlasDbRuntimeConfig().sweep().enabled()).contains(false);
assertThat(fromServices.getAtlasDbRuntimeConfig().targetedSweep().enabled())
.isFalse();
public void loadsRuntimeConfigFromFileWithSweepAndCompactionDisabled() throws URISyntaxException, IOException {
String runtimeFilePath = AbstractTestRunner.getResourcePath(InMemoryTestRunner.RUNTIME_CONFIG_LOCATION);
String[] commandArgs = new String[] {"-frc", runtimeFilePath, "-mrc", runtimeFilePath};
KvsMigrationCommand command = getCommand(commandArgs, EMPTY_ARGS);

AtlasDbRuntimeConfig expected = withDisabledSweepAndCompaction(loadFromFile());

assertThat(command.connectFromServices().getAtlasDbRuntimeConfig()).isEqualTo(expected);
assertThat(command.connectToServices().getAtlasDbRuntimeConfig()).isEqualTo(expected);
}

@Test
public void loadsInlineRuntimeConfigWithSweepAndCompactionDisabledForToService()
throws IOException, URISyntaxException {
String[] globalArgs = new String[] {"--inline-runtime-config", RUNTIME_CONFIG};
KvsMigrationCommand command = getCommand(EMPTY_ARGS, globalArgs);

AtlasDbRuntimeConfig expected = withDisabledSweepAndCompaction(loadFromString());

assertThat(command.connectToServices().getAtlasDbRuntimeConfig()).isEqualTo(expected);
}

@Test
public void loadsFileRuntimeConfigWhenBothInlineAndFileAreSpecified() throws URISyntaxException, IOException {
String runtimeFilePath = AbstractTestRunner.getResourcePath(InMemoryTestRunner.RUNTIME_CONFIG_LOCATION);
String[] commandArgs = new String[] {"-frc", runtimeFilePath, "-mrc", runtimeFilePath};
String[] globalArgs = new String[] {"--inline-runtime-config", RUNTIME_CONFIG};
KvsMigrationCommand command = getCommand(commandArgs, globalArgs);

assertThat(toServices.getAtlasDbRuntimeConfig().sweep().enabled()).contains(false);
assertThat(toServices.getAtlasDbRuntimeConfig().targetedSweep().enabled())
.isFalse();
AtlasDbRuntimeConfig expected = withDisabledSweepAndCompaction(loadFromFile());

assertThat(command.connectFromServices().getAtlasDbRuntimeConfig()).isEqualTo(expected);
}

@Test
public void usesDefaultRuntimeConfigWithSweepAndCompactionDisabledWhenFileNorInlineSpecified() {
AtlasDbRuntimeConfig expected = withDisabledSweepAndCompaction(AtlasDbRuntimeConfig.defaultRuntimeConfig());

assertThat(fromServices.getAtlasDbRuntimeConfig()).isEqualTo(expected);
assertThat(toServices.getAtlasDbRuntimeConfig()).isEqualTo(expected);
}

@Test
Expand Down Expand Up @@ -95,10 +148,11 @@ public void canMigrateMultipleTables() throws Exception {
checkKvs(toServices, 10, 257);
}

private KvsMigrationCommand getCommand(String[] args) throws URISyntaxException {
private KvsMigrationCommand getCommand(String[] commandArgs, String[] globalArgs) throws URISyntaxException {
String filePath = AbstractTestRunner.getResourcePath(InMemoryTestRunner.CONFIG_LOCATION);
String[] initArgs = new String[] {"migrate", "-fc", filePath, "-mc", filePath};
String[] fullArgs = ObjectArrays.concat(initArgs, args, String.class);
String[] fullArgs =
ObjectArrays.concat(globalArgs, ObjectArrays.concat(initArgs, commandArgs, String.class), String.class);
return AbstractTestRunner.buildCommand(KvsMigrationCommand.class, fullArgs);
}

Expand All @@ -107,7 +161,7 @@ private void assertSuccess(Callable<Integer> callable) throws Exception {
}

private int runWithOptions(String... args) throws URISyntaxException {
KvsMigrationCommand command = getCommand(args);
KvsMigrationCommand command = getCommand(args, EMPTY_ARGS);
return command.execute(fromServices, toServices);
}

Expand Down Expand Up @@ -150,4 +204,24 @@ private void checkKvs(AtlasDbServices services, int numTables, int numEntriesPer
private static TableReference getTableRef(int number) {
return TableReference.create(Namespace.create("ns"), "table" + number);
}

private static AtlasDbRuntimeConfig loadFromFile() throws URISyntaxException, IOException {
return AtlasDbConfigs.load(
new File(AbstractTestRunner.getResourcePath(InMemoryTestRunner.RUNTIME_CONFIG_LOCATION)),
AtlasDbConfigs.ATLASDB_CONFIG_OBJECT_PATH,
AtlasDbRuntimeConfig.class);
}

private static AtlasDbRuntimeConfig loadFromString() throws IOException {
return AtlasDbConfigs.loadFromString(RUNTIME_CONFIG, null, AtlasDbRuntimeConfig.class);
}

private static AtlasDbRuntimeConfig withDisabledSweepAndCompaction(AtlasDbRuntimeConfig config) {
return ImmutableAtlasDbRuntimeConfig.builder()
.from(config)
.sweep(SweepConfig.disabled())
.targetedSweep(TargetedSweepRuntimeConfig.disabled())
.compact(CompactorConfig.disabled())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class InMemoryTestRunner extends AbstractTestRunner {

public static final String CONFIG_LOCATION = "cli_test_config.yml";
public static final String RUNTIME_CONFIG_LOCATION = "cli_test_runtime_config.yml";

public InMemoryTestRunner(Class<? extends SingleBackendCommand> cmdClass, String... args) {
super(cmdClass, args);
Expand Down
20 changes: 20 additions & 0 deletions atlasdb-cli/src/test/resources/cli_test_runtime_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
server:
applicationConnectors:
- type: http
port: 3828
adminConnectors:
- type: http
port: 3829

atlasdb:
sweep:
enabled: true
targetedSweep:
enabled: true
compact:
enableCompaction: true
keyValueService:
type: cassandra
servers:
- 127.0.0.1:1337
replicationFactor: 3
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,9 @@ default void checkIntervalsNonnegative() {
static CompactorConfig defaultCompactorConfig() {
return ImmutableCompactorConfig.builder().build();
}

static CompactorConfig disabled() {
// Explicitly disabled compaction in case default ever changes.
return ImmutableCompactorConfig.builder().enableCompaction(false).build();
}
}
10 changes: 10 additions & 0 deletions changelog/@unreleased/pr-6186.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
type: improvement
improvement:
description: AtlasDB CLI Migrate command supports optional `-frc / --fromRuntimeConfig`
and `-mrc / --migrateRuntimeConfig` for specifying the runtime config file path
for the initial and target KVSs. The optional `--inline-runtime-config` command
allows for specifying the runtime config as a string for the initial KVS, and
the optional `--runtime-config-root` command allows for specifying the config
root within the file.
links:
- https://github.com/palantir/atlasdb/pull/6186
6 changes: 3 additions & 3 deletions docs/source/cluster_management/kvs-migration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Setup

.. code-block:: bash
./bin/atlasdb-cli --offline migrate --fromConfig from.yml --migrateConfig to.yml --setup
./bin/atlasdb-cli --offline migrate --fromConfig from.yml [--fromRuntimeConfig fromRuntime.yml] --migrateConfig to.yml [--migrateRuntimeConfig toRuntime.yml] --setup
Running this command will prepare the target KVS for the migration.
The CLI will first **drop all tables in the target KVS** except atomic tables and Cassandra hidden tables.
Expand All @@ -148,7 +148,7 @@ Migrate

.. code-block:: bash
./bin/atlasdb-cli --offline migrate --fromConfig from.yml --migrateConfig to.yml --migrate
./bin/atlasdb-cli --offline migrate --fromConfig from.yml [--fromRuntimeConfig fromRuntime.yml] --migrateConfig to.yml [--migrateRuntimeConfig toRuntime.yml] --migrate
Running this command will migrate the actual data from source KVS to target KVS.
For each table in the source KVS that is not in the list of special tables above, the entire table is transactionally
Expand All @@ -172,7 +172,7 @@ Validate

.. code-block:: bash
./bin/atlasdb-cli --offline migrate --fromConfig from.yml --migrateConfig to.yml --validate
./bin/atlasdb-cli --offline migrate --fromConfig from.yml [--fromRuntimeConfig fromRuntime.yml] --migrateConfig to.yml [--migrateRuntimeConfig toRuntime.yml] --validate
Running this command will validate the correctness of the migration.
For each table in the source KVS that can be migrated, except the legacy sweep priority tables, the table is
Expand Down

0 comments on commit dd619ef

Please sign in to comment.