diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/encoding/PtBytes.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/encoding/PtBytes.java index 9895480ea54..679a1017ee7 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/encoding/PtBytes.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/encoding/PtBytes.java @@ -17,6 +17,8 @@ import java.nio.charset.StandardCharsets; +import javax.annotation.Nullable; + import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.cache.Cache; @@ -120,13 +122,20 @@ public static String toString(final byte[] arr, int off, int len) { return new String(arr, off, len, StandardCharsets.UTF_8); } - public static String encodeHexString(byte[] name) { + public static String encodeHexString(@Nullable byte[] name) { if (name == null) { return ""; } return BaseEncoding.base16().lowerCase().encode(name); } + public static byte[] decodeHexString(@Nullable String hexString) { + if (hexString == null) { + return PtBytes.EMPTY_BYTE_ARRAY; + } + return BaseEncoding.base16().lowerCase().decode(hexString.toLowerCase()); + } + public static final Function BYTES_TO_HEX_STRING = PtBytes::encodeHexString; public static void addIfNotEmpty(MoreObjects.ToStringHelper helper, String name, byte[] bytes) { diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/SweepResults.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/SweepResults.java index 0bc397ede69..06f08f2331c 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/SweepResults.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/SweepResults.java @@ -46,15 +46,34 @@ public Optional getNextStartRow() { public abstract long getSweptTimestamp(); + /** + * Returns a new {@link SweepResults} representing cumulative results from this instance and {@code other}. 
Assumes + * that {@code other} represents results from subsequent iteration of sweep (i.e., it happened after the run that + * produced this instance). + */ + public SweepResults accumulateWith(SweepResults other) { + return SweepResults.builder() + .cellTsPairsExamined(getCellTsPairsExamined() + other.getCellTsPairsExamined()) + .staleValuesDeleted(getStaleValuesDeleted() + other.getStaleValuesDeleted()) + .sweptTimestamp(other.getSweptTimestamp()) + .nextStartRow(other.getNextStartRow()) + .build(); + } + public static ImmutableSweepResults.Builder builder() { return ImmutableSweepResults.builder(); } public static SweepResults createEmptySweepResult() { + return createEmptySweepResult(Optional.empty()); + } + + public static SweepResults createEmptySweepResult(Optional startRow) { return builder() .cellTsPairsExamined(0) .staleValuesDeleted(0) .sweptTimestamp(0) + .nextStartRow(startRow) .build(); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/BackgroundSweeperImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/BackgroundSweeperImpl.java index e214321b538..83c5efd3938 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/BackgroundSweeperImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/BackgroundSweeperImpl.java @@ -26,7 +26,6 @@ import com.google.common.base.Supplier; import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.logging.LoggingArgs; import com.palantir.atlasdb.sweep.priority.NextTableToSweepProvider; import com.palantir.atlasdb.sweep.priority.NextTableToSweepProviderImpl; import com.palantir.atlasdb.sweep.progress.SweepProgress; @@ -35,7 +34,6 @@ import com.palantir.common.base.Throwables; import com.palantir.lock.LockService; import com.palantir.logsafe.SafeArg; -import com.palantir.logsafe.UnsafeArg; public final class BackgroundSweeperImpl 
implements BackgroundSweeper { private static final Logger log = LoggerFactory.getLogger(BackgroundSweeperImpl.class); @@ -147,7 +145,7 @@ private long grabLocksAndRun(SweepLocks locks) throws InterruptedException { if (checkAndRepairTableDrop()) { log.info("The table being swept by the background sweeper was dropped, moving on..."); } else { - SweepBatchConfig lastBatchConfig = getAdjustedBatchConfig(); + SweepBatchConfig lastBatchConfig = specificTableSweeper.getAdjustedBatchConfig(); log.warn("The background sweep job failed unexpectedly with candidate batch size {}," + " delete batch size {}," + " and {} cell+timestamp pairs to examine." @@ -181,24 +179,11 @@ boolean runOnce() { log.debug("Skipping sweep because no table has enough new writes to be worth sweeping at the moment."); return false; } else { - specificTableSweeper.runOnceForTable(tableToSweep.get(), Optional.empty(), true); + specificTableSweeper.runOnceAndSaveResults(tableToSweep.get()); return true; } } - private SweepBatchConfig getAdjustedBatchConfig() { - SweepBatchConfig baseConfig = specificTableSweeper.getSweepBatchConfig().get(); - return ImmutableSweepBatchConfig.builder() - .maxCellTsPairsToExamine(adjustBatchParameter(baseConfig.maxCellTsPairsToExamine())) - .candidateBatchSize(adjustBatchParameter(baseConfig.candidateBatchSize())) - .deleteBatchSize(adjustBatchParameter(baseConfig.deleteBatchSize())) - .build(); - } - - static int adjustBatchParameter(int parameterValue) { - return Math.max(1, (int) (batchSizeMultiplier * parameterValue)); - } - // there's a bug in older jdk8s around type inference here, don't make the same mistake two of us made // and try to lambda refactor this unless you live far enough in the future that this isn't an issue private Optional getTableToSweep() { @@ -209,17 +194,12 @@ public Optional execute(Transaction tx) { Optional progress = specificTableSweeper.getSweepProgressStore().loadProgress( tx); if (progress.isPresent()) { - log.info("Sweeping another 
batch of table: {}. Batch starts on row {}", - LoggingArgs.tableRef("table name", progress.get().tableRef()), - UnsafeArg.of("startRow", progress.get().startRow())); - return Optional.of(new TableToSweep(progress.get().tableRef(), progress.get())); + return Optional.of(new TableToSweep(progress.get().tableRef(), progress)); } else { Optional nextTable = nextTableToSweepProvider.chooseNextTableToSweep( tx, specificTableSweeper.getSweepRunner().getConservativeSweepTimestamp()); if (nextTable.isPresent()) { - log.info("Now starting to sweep next table: {}.", - LoggingArgs.tableRef("table name", nextTable.get())); - return Optional.of(new TableToSweep(nextTable.get(), null)); + return Optional.of(new TableToSweep(nextTable.get(), Optional.empty())); } else { return Optional.empty(); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SpecificTableSweeper.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SpecificTableSweeper.java index c3ff9f0ae96..149ea2873f6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SpecificTableSweeper.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SpecificTableSweeper.java @@ -15,7 +15,6 @@ */ package com.palantir.atlasdb.sweep; -import java.util.Optional; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -123,13 +122,21 @@ public SweepMetrics getSweepMetrics() { return sweepMetrics; } - void runOnceForTable(TableToSweep tableToSweep, - Optional newSweepBatchConfig, - boolean saveSweepResults) { - Stopwatch watch = Stopwatch.createStarted(); + void runOnceAndSaveResults(TableToSweep tableToSweep) { TableReference tableRef = tableToSweep.getTableRef(); byte[] startRow = tableToSweep.getStartRow(); - SweepBatchConfig batchConfig = newSweepBatchConfig.orElse(getAdjustedBatchConfig()); + SweepBatchConfig batchConfig = getAdjustedBatchConfig(); + + SweepResults results = runOneIteration(tableRef, startRow, batchConfig); + 
saveSweepResults(tableToSweep, results); + } + + SweepResults runOneIteration( + TableReference tableRef, + byte[] startRow, + SweepBatchConfig batchConfig) { + + Stopwatch watch = Stopwatch.createStarted(); try { SweepResults results = sweepRunner.run( tableRef, @@ -154,9 +161,7 @@ void runOnceForTable(TableToSweep tableToSweep, .tableName(tableRef.getQualifiedName()) .elapsedMillis(elapsedMillis) .build()); - if (saveSweepResults) { - saveSweepResults(tableToSweep, results); - } + return results; } catch (RuntimeException e) { // This error may be logged on some paths above, but I prefer to log defensively. log.info("Failed to sweep table {}" @@ -174,14 +179,9 @@ void runOnceForTable(TableToSweep tableToSweep, } } - private SweepBatchConfig getAdjustedBatchConfig() { + public SweepBatchConfig getAdjustedBatchConfig() { SweepBatchConfig baseConfig = sweepBatchConfig.get(); - return ImmutableSweepBatchConfig.builder() - .maxCellTsPairsToExamine( - BackgroundSweeperImpl.adjustBatchParameter(baseConfig.maxCellTsPairsToExamine())) - .candidateBatchSize(BackgroundSweeperImpl.adjustBatchParameter(baseConfig.candidateBatchSize())) - .deleteBatchSize(BackgroundSweeperImpl.adjustBatchParameter(baseConfig.deleteBatchSize())) - .build(); + return baseConfig.adjust(BackgroundSweeperImpl.batchSizeMultiplier); } private static String startRowToHex(@Nullable byte[] row) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepBatchConfig.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepBatchConfig.java index b8b8bc929d4..a9d455a4453 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepBatchConfig.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepBatchConfig.java @@ -44,4 +44,16 @@ default void check() { Preconditions.checkState(candidateBatchSize() > 0, "Candidate batch size must be greater than zero"); Preconditions.checkState(deleteBatchSize() > 0, "Delete batch size must be 
greater than zero"); } + + default SweepBatchConfig adjust(double multiplier) { + return ImmutableSweepBatchConfig.builder() + .maxCellTsPairsToExamine(adjust(maxCellTsPairsToExamine(), multiplier)) + .candidateBatchSize(adjust(candidateBatchSize(), multiplier)) + .deleteBatchSize(adjust(deleteBatchSize(), multiplier)) + .build(); + } + + default int adjust(int parameterValue, double multiplier) { + return Math.max(1, (int) (multiplier * parameterValue)); + } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTableResponse.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTableResponse.java new file mode 100644 index 00000000000..f9dfaeb33c1 --- /dev/null +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTableResponse.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.palantir.atlasdb.sweep; + +import java.util.Optional; + +import org.immutables.value.Value; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.palantir.atlasdb.encoding.PtBytes; +import com.palantir.atlasdb.keyvalue.api.SweepResults; + +@Value.Immutable +@JsonSerialize(as = ImmutableSweepTableResponse.class) +@JsonDeserialize(as = ImmutableSweepTableResponse.class) +public interface SweepTableResponse { + + Optional nextStartRow(); + + long numCellTsPairsExamined(); + + long staleValuesDeleted(); + + static SweepTableResponse from(SweepResults results) { + return ImmutableSweepTableResponse.builder() + .numCellTsPairsExamined(results.getCellTsPairsExamined()) + .staleValuesDeleted(results.getStaleValuesDeleted()) + .nextStartRow(results.getNextStartRow().map(PtBytes::encodeHexString)) + .build(); + } + +} diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java index 0a53d52ddc2..b9c2f84cd7e 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweepTaskRunner.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.palantir.atlasdb.AtlasDbConstants; +import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping; import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest; import com.palantir.atlasdb.keyvalue.api.Cell; @@ -37,6 +38,7 @@ import com.palantir.atlasdb.keyvalue.api.KeyValueService; import com.palantir.atlasdb.keyvalue.api.SweepResults; import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.logging.LoggingArgs; import 
com.palantir.atlasdb.protos.generated.TableMetadataPersistence.SweepStrategy; import com.palantir.atlasdb.sweep.CellsToSweepPartitioningIterator.ExaminedCellLimit; import com.palantir.atlasdb.transaction.impl.SweepStrategyManager; @@ -124,7 +126,7 @@ private SweepResults runInternal( } if (keyValueService.getMetadataForTable(tableRef).length == 0) { log.warn("The sweeper tried to sweep table '{}', but the table does not exist. Skipping table.", - UnsafeArg.of("table name", tableRef)); + LoggingArgs.tableRef("tableRef", tableRef)); return SweepResults.createEmptySweepResult(); } SweepStrategy sweepStrategy = sweepStrategyManager.get().getOrDefault(tableRef, SweepStrategy.CONSERVATIVE); @@ -140,6 +142,10 @@ private SweepResults doRun(TableReference tableRef, byte[] startRow, RunType runType, Sweeper sweeper) { + log.info("Beginning iteration of sweep for table {} starting at row {}", + LoggingArgs.tableRef(tableRef), + UnsafeArg.of("startRow", PtBytes.encodeHexString(startRow))); + // Earliest start timestamp of any currently open transaction, with two caveats: // (1) unreadableTimestamps are calculated via wall-clock time, and so may not be correct // under pathological clock conditions diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperService.java index d4d97644192..8a915fde53f 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperService.java @@ -15,47 +15,53 @@ */ package com.palantir.atlasdb.sweep; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import java.util.Optional; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import com.palantir.logsafe.Safe; + 
/** * Provides endpoints for sweeping a specific table. */ @Path("/sweep") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) public interface SweeperService { - /** - * Sweep a particular table from EMPTY startRow with default {@link SweepBatchConfig}. - */ - @POST - @Path("sweep-table") - @Produces(MediaType.APPLICATION_JSON) - void sweepTable(@QueryParam("tablename") String tableName); - /** - * Sweep a particular table from specified startRow with default {@link SweepBatchConfig}. - */ - @POST - @Path("sweep-table-from-row") - @Produces(MediaType.APPLICATION_JSON) - void sweepTableFromStartRow( - @QueryParam("tablename") String tableName, - @Nonnull @QueryParam("startRow") String startRow); + default SweepTableResponse sweepTableFully(String tableName) { + return sweepTable(tableName, Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty(), Optional.empty()); + } + + default SweepTableResponse sweepTableFrom(String tableName, String startRow) { + return sweepTable(tableName, Optional.of(startRow), Optional.empty(), + Optional.empty(), Optional.empty(), Optional.empty()); + } /** - * Sweep a particular table from specified startRow with specified {@link SweepBatchConfig} parameters. + * Sweeps a particular table. + * + * @param tableName the table to sweep, in the format namespace.table_name (e.g. myapp.users) + * @param startRow (Optional) the row to start from, encoded as a hex string (e.g. 
12345abcde) + * @param fullSweep (Optional; default true) whether to sweep the full table; if false just runs one batch + * @param maxCellTsPairsToExamine (Optional) see {@link SweepBatchConfig#maxCellTsPairsToExamine()} + * @param candidateBatchSize (Optional) see {@link SweepBatchConfig#candidateBatchSize()} + * @param deleteBatchSize (Optional) see {@link SweepBatchConfig#deleteBatchSize()} */ @POST - @Path("sweep-table-from-row-with-batch") - @Produces(MediaType.APPLICATION_JSON) - void sweepTableFromStartRowWithBatchConfig( + @Path("sweep-table") + SweepTableResponse sweepTable( @QueryParam("tablename") String tableName, - @Nullable @QueryParam("startRow") String startRow, - @Nullable @QueryParam("maxCellTsPairsToExamine") Integer maxCellTsPairsToExamine, - @Nullable @QueryParam("candidateBatchSize") Integer candidateBatchSize, - @Nullable @QueryParam("deleteBatchSize") Integer deleteBatchSize); + @QueryParam("startRow") Optional startRow, + @Safe @QueryParam("fullSweep") @DefaultValue("true") Optional fullSweep, + @Safe @QueryParam("maxCellTsPairsToExamine") Optional maxCellTsPairsToExamine, + @Safe @QueryParam("candidateBatchSize") Optional candidateBatchSize, + @Safe @QueryParam("deleteBatchSize") Optional deleteBatchSize); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImpl.java index 42ee0820445..eeb9fa5a345 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImpl.java @@ -17,16 +17,10 @@ import java.util.Optional; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import com.google.common.base.Preconditions; -import com.google.common.io.BaseEncoding; -import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.encoding.PtBytes; +import
com.palantir.atlasdb.keyvalue.api.SweepResults; import com.palantir.atlasdb.keyvalue.api.TableReference; -import com.palantir.atlasdb.sweep.progress.ImmutableSweepProgress; -import com.palantir.atlasdb.sweep.progress.SweepProgress; import com.palantir.remoting3.servers.jersey.WebPreconditions; public final class SweeperServiceImpl implements SweeperService { @@ -37,52 +31,45 @@ public SweeperServiceImpl(SpecificTableSweeper specificTableSweeper) { } @Override - public void sweepTable(String tableName) { + public SweepTableResponse sweepTable( + String tableName, + Optional startRow, + Optional fullSweep, + Optional maxCellTsPairsToExamine, + Optional candidateBatchSize, + Optional deleteBatchSize) { TableReference tableRef = getTableRef(tableName); checkTableExists(tableName, tableRef); - runSweepWithoutSavingResults(tableRef); + byte[] decodedStartRow = startRow.map(PtBytes::decodeHexString).orElse(PtBytes.EMPTY_BYTE_ARRAY); + SweepBatchConfig config = buildConfigWithOverrides(maxCellTsPairsToExamine, candidateBatchSize, + deleteBatchSize); + + SweepResults sweepResults = fullSweep.orElse(true) + ? 
runFullSweepWithoutSavingResults( + tableRef, + decodedStartRow, + config) + : runOneBatchWithoutSavingResults( + tableRef, + decodedStartRow, + config); + + return SweepTableResponse.from(sweepResults); } - @Override - public void sweepTableFromStartRow(String tableName, @Nonnull String startRow) { - WebPreconditions.checkArgument(startRow != null, "startRow must not be null."); - - TableReference tableRef = getTableRef(tableName); - checkTableExists(tableName, tableRef); + private SweepBatchConfig buildConfigWithOverrides( + Optional maxCellTsPairsToExamine, + Optional candidateBatchSize, + Optional deleteBatchSize) { + ImmutableSweepBatchConfig.Builder batchConfigBuilder = ImmutableSweepBatchConfig.builder() + .from(specificTableSweeper.getAdjustedBatchConfig()); - ImmutableSweepProgress sweepProgress = getSweepProgress(startRow, tableRef); + maxCellTsPairsToExamine.ifPresent(batchConfigBuilder::maxCellTsPairsToExamine); + candidateBatchSize.ifPresent(batchConfigBuilder::candidateBatchSize); + deleteBatchSize.ifPresent(batchConfigBuilder::deleteBatchSize); - runSweepWithoutSavingResults(tableRef, sweepProgress); - } - - @Override - public void sweepTableFromStartRowWithBatchConfig(String tableName, - @Nullable String startRow, - @Nullable Integer maxCellTsPairsToExamine, - @Nullable Integer candidateBatchSize, - @Nullable Integer deleteBatchSize) { - TableReference tableRef = getTableRef(tableName); - checkTableExists(tableName, tableRef); - - ImmutableSweepProgress sweepProgress = getSweepProgress(startRow, tableRef); - - WebPreconditions.checkArgument( - !(maxCellTsPairsToExamine == null && candidateBatchSize == null && deleteBatchSize == null), - "No batch size config parameters were provided"); - - ImmutableSweepBatchConfig sweepBatchConfig = ImmutableSweepBatchConfig.builder() - .maxCellTsPairsToExamine( - maxCellTsPairsToExamine == null - ? 
AtlasDbConstants.DEFAULT_SWEEP_READ_LIMIT : maxCellTsPairsToExamine) - .candidateBatchSize( - candidateBatchSize == null - ? AtlasDbConstants.DEFAULT_SWEEP_CANDIDATE_BATCH_HINT : candidateBatchSize) - .deleteBatchSize(deleteBatchSize == null - ? AtlasDbConstants.DEFAULT_SWEEP_DELETE_BATCH_HINT : deleteBatchSize) - .build(); - - runSweepWithoutSavingResults(tableRef, sweepProgress, Optional.of(sweepBatchConfig)); + return batchConfigBuilder.build(); } private TableReference getTableRef(String tableName) { @@ -96,41 +83,34 @@ private void checkTableExists(String tableName, TableReference tableRef) { String.format("Table requested to sweep %s does not exist", tableName)); } - private ImmutableSweepProgress getSweepProgress(String startRow, TableReference tableRef) { - return ImmutableSweepProgress.builder() - .tableRef(tableRef) - .staleValuesDeleted(0) - .cellTsPairsExamined(0) - .minimumSweptTimestamp(0) - .startRow(decodeStartRow(startRow)) - .build(); - } - - private byte[] decodeStartRow(String startRow) { - if (startRow == null) { - return PtBytes.EMPTY_BYTE_ARRAY; + private SweepResults runFullSweepWithoutSavingResults( + TableReference tableRef, + byte[] startRow, + SweepBatchConfig sweepBatchConfig) { + SweepResults cumulativeResults = SweepResults.createEmptySweepResult( + Optional.of(startRow)); + + while (cumulativeResults.getNextStartRow().isPresent()) { + SweepResults results = runOneBatchWithoutSavingResults( + tableRef, + cumulativeResults.getNextStartRow().get(), + sweepBatchConfig); + + cumulativeResults = cumulativeResults.accumulateWith(results); } - return BaseEncoding.base16().decode(startRow.toUpperCase()); - } - private void runSweepWithoutSavingResults(TableReference tableRef) { - runSweepWithoutSavingResults(tableRef, null); + return cumulativeResults; } - private void runSweepWithoutSavingResults(TableReference tableRef, SweepProgress sweepProgress) { - runSweepWithoutSavingResults(tableRef, sweepProgress, Optional.empty()); - } - - private 
void runSweepWithoutSavingResults( + private SweepResults runOneBatchWithoutSavingResults( TableReference tableRef, - SweepProgress sweepProgress, - Optional sweepBatchConfig) { - TableToSweep tableToSweep = getTableToSweep(tableRef, sweepProgress); - specificTableSweeper.runOnceForTable(tableToSweep, sweepBatchConfig, false); + byte[] startRow, + SweepBatchConfig sweepBatchConfig) { + return specificTableSweeper.runOneIteration( + tableRef, + startRow, + sweepBatchConfig); } - private TableToSweep getTableToSweep(TableReference tableRef, SweepProgress sweepProgress) { - return new TableToSweep(tableRef, sweepProgress); - } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/TableToSweep.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/TableToSweep.java index d3f7004065e..40dca276f13 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/TableToSweep.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/sweep/TableToSweep.java @@ -16,9 +16,7 @@ package com.palantir.atlasdb.sweep; -import java.util.OptionalLong; - -import javax.annotation.Nullable; +import java.util.Optional; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.TableReference; @@ -26,10 +24,9 @@ public final class TableToSweep { private final TableReference tableRef; - @Nullable - private final SweepProgress progress; + private final Optional progress; - TableToSweep(TableReference tableRef, SweepProgress progress) { + TableToSweep(TableReference tableRef, Optional progress) { this.tableRef = tableRef; this.progress = progress; } @@ -39,22 +36,22 @@ TableReference getTableRef() { } boolean hasPreviousProgress() { - return progress != null; + return progress.isPresent(); } long getStaleValuesDeletedPreviously() { - return progress == null ? 
0L : progress.staleValuesDeleted(); + return progress.map(SweepProgress::staleValuesDeleted).orElse(0L); } long getCellsExaminedPreviously() { - return progress == null ? 0L : progress.cellTsPairsExamined(); + return progress.map(SweepProgress::cellTsPairsExamined).orElse(0L); } - OptionalLong getPreviousMinimumSweptTimestamp() { - return progress == null ? OptionalLong.empty() : OptionalLong.of(progress.minimumSweptTimestamp()); + Optional getPreviousMinimumSweptTimestamp() { + return progress.map(SweepProgress::minimumSweptTimestamp); } byte[] getStartRow() { - return progress == null ? PtBytes.EMPTY_BYTE_ARRAY : progress.startRow(); + return progress.map(SweepProgress::startRow).orElse(PtBytes.EMPTY_BYTE_ARRAY); } } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperServiceImplTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperServiceImplTest.java index 4b7eb379400..80330f6e2c4 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperServiceImplTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperServiceImplTest.java @@ -16,26 +16,36 @@ package com.palantir.atlasdb.sweep; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.util.List; +import java.util.Optional; + import javax.ws.rs.core.Response; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.io.BaseEncoding; +import com.palantir.atlasdb.encoding.PtBytes; import 
com.palantir.atlasdb.keyvalue.api.SweepResults; import com.palantir.atlasdb.persistentlock.CheckAndSetExceptionMapper; +import com.palantir.atlasdb.util.DropwizardClientRule; import com.palantir.atlasdb.util.TestJaxRsClientFactory; import com.palantir.remoting.api.errors.RemoteException; import com.palantir.remoting3.servers.jersey.HttpRemotingJerseyFeature; -import io.dropwizard.testing.junit.DropwizardClientRule; - public class SweeperServiceImplTest extends SweeperTestSetup { private static final String VALID_START_ROW = "0102030A"; @@ -44,6 +54,10 @@ public class SweeperServiceImplTest extends SweeperTestSetup { private static final String INVALID_START_ROW = "xyz"; SweeperService sweeperService; + private static final SweepResults RESULTS_WITH_NO_MORE_TO_SWEEP = SweepResults.createEmptySweepResult(); + private static final SweepResults RESULTS_WITH_MORE_TO_SWEEP = SweepResults.createEmptySweepResult( + Optional.of(new byte[] {0x55})); + @Rule public DropwizardClientRule dropwizardClientRule = new DropwizardClientRule( new SweeperServiceImpl(getSpecificTableSweeperService()), @@ -59,109 +73,117 @@ public void setup() { SweeperService.class, SweeperServiceImplTest.class, dropwizardClientRule.baseUri().toString()); + + setupTaskRunner(RESULTS_WITH_NO_MORE_TO_SWEEP); + when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); + } + + @After + public void after() { + verifyNoSweepResultsSaved(); } @Test public void sweepingNonFullyTableShouldNotBeSuccessful() { assertThatExceptionOfType(RemoteException.class) - .isThrownBy(() -> sweeperService.sweepTable("non_fully_qualified_name")) + .isThrownBy(() -> sweeperService.sweepTableFully("non_fully_qualified_name")) .matches(ex -> ex.getStatus() == Response.Status.BAD_REQUEST.getStatusCode()); } - @Test public void sweepingNonExistingTableShouldNotBeSuccessful() { assertThatExceptionOfType(RemoteException.class) - .isThrownBy(() -> sweeperService.sweepTable("ns.non_existing_table")) + .isThrownBy(() -> 
sweeperService.sweepTableFully("ns.non_existing_table")) .matches(ex -> ex.getStatus() == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); } - @Test - public void sweepTableFromStartRowWithStartRowNullShouldThrow() { - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - - assertThatExceptionOfType(RemoteException.class) - .isThrownBy(() -> sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), null)) - .matches(ex -> ex.getStatus() == Response.Status.BAD_REQUEST.getStatusCode()); - } - @Test public void sweepTableFromStartRowWithValidStartRowShouldBeSuccessful() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - - sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), VALID_START_ROW); + sweeperService.sweepTableFrom(TABLE_REF.getQualifiedName(), VALID_START_ROW); } @Test public void sweepTableFromStartRowShouldAcceptLowercaseBase16Encodings() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), LOWERCASE_BUT_VALID_START_ROW); + sweeperService.sweepTableFrom(TABLE_REF.getQualifiedName(), LOWERCASE_BUT_VALID_START_ROW); } @Test public void sweepTableFromStartRowShouldAcceptMixedCaseBase16Encodings() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), MIXED_CASE_START_ROW); + sweeperService.sweepTableFrom(TABLE_REF.getQualifiedName(), MIXED_CASE_START_ROW); } @Test public void sweepTableFromStartRowWithInValidStartRowShouldThrow() { - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - assertThatExceptionOfType(RemoteException.class) .isThrownBy(() -> - sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), 
INVALID_START_ROW)) - .matches(ex -> ex.getStatus() == Response.Status.BAD_REQUEST.getStatusCode()); - } - - @Test - public void sweepTableFromStartRowWithBatchConfigWithNullBatchConfigShouldThrow() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - - assertThatExceptionOfType(RemoteException.class) - .isThrownBy(() -> sweeperService.sweepTableFromStartRowWithBatchConfig(TABLE_REF.getQualifiedName(), - encodeStartRow(new byte[] {1, 2, 3}), null, null, null)) + sweeperService.sweepTableFrom(TABLE_REF.getQualifiedName(), INVALID_START_ROW)) .matches(ex -> ex.getStatus() == Response.Status.BAD_REQUEST.getStatusCode()); } @Test public void sweepTableFromStartRowWithBatchConfigWithNullStartRowShouldBeSuccessful() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - sweeperService.sweepTableFromStartRowWithBatchConfig(TABLE_REF.getQualifiedName(), null, 1000, 1000, 500); + sweeperService.sweepTable(TABLE_REF.getQualifiedName(), Optional.empty(), Optional.empty(), Optional.of(1000), + Optional.of(1000), Optional.of(500)); } @Test public void sweepTableFromStartRowWithBatchConfigWithExactlyOneNonNullBatchConfigShouldBeSuccessful() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - sweeperService.sweepTableFromStartRowWithBatchConfig(TABLE_REF.getQualifiedName(), - encodeStartRow(new byte[] {1, 2, 3}), null, 10, null); + sweeperService.sweepTable(TABLE_REF.getQualifiedName(), + Optional.of(encodeStartRow(new byte[] {1, 2, 3})), Optional.empty(), Optional.of(10), Optional.empty(), + Optional.empty()); } @Test public void testWriteProgressOrPriorityOrMetricsNotUpdatedAfterSweepRunsSuccessfully() { - setupTaskRunner(Mockito.mock(SweepResults.class)); - when(kvs.getAllTableNames()).thenReturn(ImmutableSet.of(TABLE_REF)); - 
sweeperService.sweepTableFromStartRow(TABLE_REF.getQualifiedName(), encodeStartRow(new byte[] {1, 2, 3})); - Mockito.verify(priorityStore, never()).update(Mockito.any(), Mockito.any(), Mockito.any()); - Mockito.verify(progressStore, never()).saveProgress(Mockito.any(), Mockito.any()); + sweeperService.sweepTableFrom(TABLE_REF.getQualifiedName(), encodeStartRow(new byte[] {1, 2, 3})); Mockito.verifyZeroInteractions(sweepMetrics); } + @Test + public void sweepsEntireTableByDefault() { + List startRows = ImmutableList.of( + PtBytes.EMPTY_BYTE_ARRAY, + new byte[] {0x10}, + new byte[] {0x50}); + + for (int i = 0; i < startRows.size(); i++) { + byte[] currentRow = startRows.get(i); + Optional nextRow = (i + 1) == startRows.size() + ? Optional.empty() + : Optional.of(startRows.get(i + 1)); + + SweepResults results = SweepResults.createEmptySweepResult(nextRow); + when(sweepTaskRunner.run(any(), any(), eq(currentRow))).thenReturn(results); + } + + sweeperService.sweepTableFully(TABLE_REF.getQualifiedName()); + + startRows.forEach(row -> verify(sweepTaskRunner).run(any(), any(), eq(row))); + verifyNoMoreInteractions(sweepTaskRunner); + } + + @Test + public void runsOneIterationIfRequested() { + setupTaskRunner(RESULTS_WITH_MORE_TO_SWEEP); + + sweeperService.sweepTable(TABLE_REF.getQualifiedName(), Optional.empty(), Optional.of(false), + Optional.empty(), Optional.empty(), Optional.empty()); + + verify(sweepTaskRunner, times(1)).run(any(), any(), any()); + verifyNoMoreInteractions(sweepTaskRunner); + } + private String encodeStartRow(byte[] rowBytes) { return BaseEncoding.base16().encode(rowBytes); } + + private void verifyNoSweepResultsSaved() { + verify(progressStore, never()).saveProgress(any(), any()); + verify(priorityStore, never()).update(any(), any(), any()); + } + } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperTestSetup.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperTestSetup.java index 
332e14c4604..95d4fe9b525 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperTestSetup.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/sweep/SweeperTestSetup.java @@ -45,7 +45,7 @@ public class SweeperTestSetup { protected SweepProgressStore progressStore = Mockito.mock(SweepProgressStore.class); protected SweepPriorityStore priorityStore = Mockito.mock(SweepPriorityStore.class); private NextTableToSweepProvider nextTableToSweepProvider = Mockito.mock(NextTableToSweepProvider.class); - private SweepTaskRunner sweepTaskRunner = Mockito.mock(SweepTaskRunner.class); + protected SweepTaskRunner sweepTaskRunner = Mockito.mock(SweepTaskRunner.class); private boolean sweepEnabled = true; protected SweepMetrics sweepMetrics = Mockito.mock(SweepMetrics.class); protected long currentTimeMillis = 1000200300L; diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/util/DropwizardClientRule.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/util/DropwizardClientRule.java new file mode 100644 index 00000000000..0392f8debd7 --- /dev/null +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/util/DropwizardClientRule.java @@ -0,0 +1,97 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.palantir.atlasdb.util; + +import java.net.URI; + +import org.junit.rules.ExternalResource; + +import com.codahale.metrics.health.HealthCheck; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; + +import io.dropwizard.Application; +import io.dropwizard.Configuration; +import io.dropwizard.jetty.HttpConnectorFactory; +import io.dropwizard.server.SimpleServerFactory; +import io.dropwizard.setup.Bootstrap; +import io.dropwizard.setup.Environment; +import io.dropwizard.testing.DropwizardTestSupport; + +/** + * Copied from {@link io.dropwizard.testing.junit.DropwizardClientRule} so that we can configure the ObjectMapper. + */ +public class DropwizardClientRule extends ExternalResource { + private final Object[] resources; + private final DropwizardTestSupport testSupport; + + public DropwizardClientRule(Object... resources) { + testSupport = new DropwizardTestSupport(FakeApplication.class, "") { + @Override + public Application newApplication() { + return new FakeApplication(); + } + }; + this.resources = resources; + } + + public URI baseUri() { + return URI.create("http://localhost:" + testSupport.getLocalPort() + "/application"); + } + + @Override + protected void before() throws Throwable { + testSupport.before(); + } + + @Override + protected void after() { + testSupport.after(); + } + + private static class DummyHealthCheck extends HealthCheck { + @Override + protected HealthCheck.Result check() { + return Result.healthy(); + } + } + + private class FakeApplication extends Application { + + @Override + public void initialize(Bootstrap bootstrap) { + bootstrap.getObjectMapper().registerModule(new Jdk8Module()); + } + + @Override + public void run(Configuration configuration, Environment environment) { + final SimpleServerFactory serverConfig = new SimpleServerFactory(); + configuration.setServerFactory(serverConfig); + final HttpConnectorFactory connectorConfig = (HttpConnectorFactory) serverConfig.getConnector(); + 
connectorConfig.setPort(0); + + environment.healthChecks().register("dummy", new DummyHealthCheck()); + + for (Object resource : resources) { + if (resource instanceof Class) { + environment.jersey().register((Class) resource); + } else { + environment.jersey().register(resource); + } + } + } + } +} diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImplIntegrationTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImplIntegrationTest.java index 99160b12fac..5dc1f126eaa 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImplIntegrationTest.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/sweep/SweeperServiceImplIntegrationTest.java @@ -41,7 +41,7 @@ public void smokeTest() throws Exception { putManyCells(TABLE_1, 103, 113); putManyCells(TABLE_1, 105, 115); sweepTimestamp.set(150); - sweeperService.sweepTable(TABLE_1.getQualifiedName()); + sweeperService.sweepTableFully(TABLE_1.getQualifiedName()); verifyTableSwept(TABLE_1, 75, true); } diff --git a/docs/source/cluster_management/sweep/sweep-logs.rst b/docs/source/cluster_management/sweep/sweep-logs.rst index 3f6180f5672..bbd154f0273 100644 --- a/docs/source/cluster_management/sweep/sweep-logs.rst +++ b/docs/source/cluster_management/sweep/sweep-logs.rst @@ -18,13 +18,9 @@ In order to know if sweep is working, or it's current progress, look for the fol Logged when the service has started, to indicate that background sweeper thread is running. -- ``Now starting to sweep next table: {table name}.`` +- ``Beginning iteration of sweep for table {} starting at row {}`` -Logged when a new table has been selected to be swept, after 1. - -- ``Sweeping another batch of table: {table name}. Batch starts on row {start row}`` - -Logged when a new batch of the same table is going to be swept, after 5. +Logged before we begin an iteration of sweep, after 1 or 5. 
(note that this could be triggered by background sweep, the sweep CLI, or the ``SweeperService`` endpoints) - ``Analyzed {number of values read} cell+timestamp pairs from table {table name} ...`` diff --git a/docs/source/release_notes/release-notes.rst b/docs/source/release_notes/release-notes.rst index a62ef5aa8f0..b9591de8a77 100644 --- a/docs/source/release_notes/release-notes.rst +++ b/docs/source/release_notes/release-notes.rst @@ -47,6 +47,10 @@ develop * - |improved| - Sweep is now more efficient on Cassandra, Postgres and Oracle. (`Pull Request `__) + * - |improved| + - The ``SweeperService`` endpoint registered on all clients will now sweep the full table by default, rather than a single batch. + It also now returns information about how much data was swept. + (`Pull Request `__) * - |fixed| - Sweep candidate batches are now logged correctly.