Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
sweep full table (#2409)
Browse files Browse the repository at this point in the history
* sweep full table

* fix tests

* logsafe

* release notes

* pr comments

* docs

* unnecessary method call

* checkstyle

* fix build

* fix build

* javadocs

* Random logging bits
  • Loading branch information
nziebart authored and fsamuel-bs committed Oct 17, 2017
1 parent 259290b commit 13a2b81
Show file tree
Hide file tree
Showing 16 changed files with 392 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -120,13 +122,20 @@ public static String toString(final byte[] arr, int off, int len) {
return new String(arr, off, len, StandardCharsets.UTF_8);
}

public static String encodeHexString(byte[] name) {
public static String encodeHexString(@Nullable byte[] name) {
if (name == null) {
return "";
}
return BaseEncoding.base16().lowerCase().encode(name);
}

public static byte[] decodeHexString(@Nullable String hexString) {
if (hexString == null) {
return PtBytes.EMPTY_BYTE_ARRAY;
}
return BaseEncoding.base16().lowerCase().decode(hexString.toLowerCase());
}

public static final Function<byte[], String> BYTES_TO_HEX_STRING = PtBytes::encodeHexString;

public static void addIfNotEmpty(MoreObjects.ToStringHelper helper, String name, byte[] bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,34 @@ public Optional<byte[]> getNextStartRow() {

public abstract long getSweptTimestamp();

/**
* Returns a new {@link SweepResults} representing cumulative results from this instance and {@code other}. Assumes
* that {@code other} represents results from subsequent iteration of sweep (i.e., it happened after the run that
* produced this instance).
*/
public SweepResults accumulateWith(SweepResults other) {
return SweepResults.builder()
.cellTsPairsExamined(getCellTsPairsExamined() + other.getCellTsPairsExamined())
.staleValuesDeleted(getStaleValuesDeleted() + other.getStaleValuesDeleted())
.sweptTimestamp(other.getSweptTimestamp())
.nextStartRow(other.getNextStartRow())
.build();
}

public static ImmutableSweepResults.Builder builder() {
return ImmutableSweepResults.builder();
}

public static SweepResults createEmptySweepResult() {
return createEmptySweepResult(Optional.empty());
}

public static SweepResults createEmptySweepResult(Optional<byte[]> startRow) {
return builder()
.cellTsPairsExamined(0)
.staleValuesDeleted(0)
.sweptTimestamp(0)
.nextStartRow(startRow)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.base.Supplier;
import com.palantir.atlasdb.keyvalue.api.InsufficientConsistencyException;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.sweep.priority.NextTableToSweepProvider;
import com.palantir.atlasdb.sweep.priority.NextTableToSweepProviderImpl;
import com.palantir.atlasdb.sweep.progress.SweepProgress;
Expand All @@ -35,7 +34,6 @@
import com.palantir.common.base.Throwables;
import com.palantir.lock.LockService;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;

public final class BackgroundSweeperImpl implements BackgroundSweeper {
private static final Logger log = LoggerFactory.getLogger(BackgroundSweeperImpl.class);
Expand Down Expand Up @@ -147,7 +145,7 @@ private long grabLocksAndRun(SweepLocks locks) throws InterruptedException {
if (checkAndRepairTableDrop()) {
log.info("The table being swept by the background sweeper was dropped, moving on...");
} else {
SweepBatchConfig lastBatchConfig = getAdjustedBatchConfig();
SweepBatchConfig lastBatchConfig = specificTableSweeper.getAdjustedBatchConfig();
log.warn("The background sweep job failed unexpectedly with candidate batch size {},"
+ " delete batch size {},"
+ " and {} cell+timestamp pairs to examine."
Expand Down Expand Up @@ -181,24 +179,11 @@ boolean runOnce() {
log.debug("Skipping sweep because no table has enough new writes to be worth sweeping at the moment.");
return false;
} else {
specificTableSweeper.runOnceForTable(tableToSweep.get(), Optional.empty(), true);
specificTableSweeper.runOnceAndSaveResults(tableToSweep.get());
return true;
}
}

private SweepBatchConfig getAdjustedBatchConfig() {
SweepBatchConfig baseConfig = specificTableSweeper.getSweepBatchConfig().get();
return ImmutableSweepBatchConfig.builder()
.maxCellTsPairsToExamine(adjustBatchParameter(baseConfig.maxCellTsPairsToExamine()))
.candidateBatchSize(adjustBatchParameter(baseConfig.candidateBatchSize()))
.deleteBatchSize(adjustBatchParameter(baseConfig.deleteBatchSize()))
.build();
}

static int adjustBatchParameter(int parameterValue) {
return Math.max(1, (int) (batchSizeMultiplier * parameterValue));
}

// there's a bug in older jdk8s around type inference here, don't make the same mistake two of us made
// and try to lambda refactor this unless you live far enough in the future that this isn't an issue
private Optional<TableToSweep> getTableToSweep() {
Expand All @@ -209,17 +194,12 @@ public Optional<TableToSweep> execute(Transaction tx) {
Optional<SweepProgress> progress = specificTableSweeper.getSweepProgressStore().loadProgress(
tx);
if (progress.isPresent()) {
log.info("Sweeping another batch of table: {}. Batch starts on row {}",
LoggingArgs.tableRef("table name", progress.get().tableRef()),
UnsafeArg.of("startRow", progress.get().startRow()));
return Optional.of(new TableToSweep(progress.get().tableRef(), progress.get()));
return Optional.of(new TableToSweep(progress.get().tableRef(), progress));
} else {
Optional<TableReference> nextTable = nextTableToSweepProvider.chooseNextTableToSweep(
tx, specificTableSweeper.getSweepRunner().getConservativeSweepTimestamp());
if (nextTable.isPresent()) {
log.info("Now starting to sweep next table: {}.",
LoggingArgs.tableRef("table name", nextTable.get()));
return Optional.of(new TableToSweep(nextTable.get(), null));
return Optional.of(new TableToSweep(nextTable.get(), Optional.empty()));
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.palantir.atlasdb.sweep;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -123,13 +122,21 @@ public SweepMetrics getSweepMetrics() {
return sweepMetrics;
}

void runOnceForTable(TableToSweep tableToSweep,
Optional<SweepBatchConfig> newSweepBatchConfig,
boolean saveSweepResults) {
Stopwatch watch = Stopwatch.createStarted();
void runOnceAndSaveResults(TableToSweep tableToSweep) {
TableReference tableRef = tableToSweep.getTableRef();
byte[] startRow = tableToSweep.getStartRow();
SweepBatchConfig batchConfig = newSweepBatchConfig.orElse(getAdjustedBatchConfig());
SweepBatchConfig batchConfig = getAdjustedBatchConfig();

SweepResults results = runOneIteration(tableRef, startRow, batchConfig);
saveSweepResults(tableToSweep, results);
}

SweepResults runOneIteration(
TableReference tableRef,
byte[] startRow,
SweepBatchConfig batchConfig) {

Stopwatch watch = Stopwatch.createStarted();
try {
SweepResults results = sweepRunner.run(
tableRef,
Expand All @@ -154,9 +161,7 @@ void runOnceForTable(TableToSweep tableToSweep,
.tableName(tableRef.getQualifiedName())
.elapsedMillis(elapsedMillis)
.build());
if (saveSweepResults) {
saveSweepResults(tableToSweep, results);
}
return results;
} catch (RuntimeException e) {
// This error may be logged on some paths above, but I prefer to log defensively.
log.info("Failed to sweep table {}"
Expand All @@ -174,14 +179,9 @@ void runOnceForTable(TableToSweep tableToSweep,
}
}

private SweepBatchConfig getAdjustedBatchConfig() {
public SweepBatchConfig getAdjustedBatchConfig() {
SweepBatchConfig baseConfig = sweepBatchConfig.get();
return ImmutableSweepBatchConfig.builder()
.maxCellTsPairsToExamine(
BackgroundSweeperImpl.adjustBatchParameter(baseConfig.maxCellTsPairsToExamine()))
.candidateBatchSize(BackgroundSweeperImpl.adjustBatchParameter(baseConfig.candidateBatchSize()))
.deleteBatchSize(BackgroundSweeperImpl.adjustBatchParameter(baseConfig.deleteBatchSize()))
.build();
return baseConfig.adjust(BackgroundSweeperImpl.batchSizeMultiplier);
}

private static String startRowToHex(@Nullable byte[] row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,16 @@ default void check() {
Preconditions.checkState(candidateBatchSize() > 0, "Candidate batch size must be greater than zero");
Preconditions.checkState(deleteBatchSize() > 0, "Delete batch size must be greater than zero");
}

default SweepBatchConfig adjust(double multiplier) {
return ImmutableSweepBatchConfig.builder()
.maxCellTsPairsToExamine(adjust(maxCellTsPairsToExamine(), multiplier))
.candidateBatchSize(adjust(candidateBatchSize(), multiplier))
.deleteBatchSize(adjust(deleteBatchSize(), multiplier))
.build();
}

default int adjust(int parameterValue, double multiplier) {
return Math.max(1, (int) (multiplier * parameterValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep;

import java.util.Optional;

import org.immutables.value.Value;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.SweepResults;

@Value.Immutable
@JsonSerialize(as = ImmutableSweepTableResponse.class)
@JsonDeserialize(as = ImmutableSweepTableResponse.class)
public interface SweepTableResponse {

Optional<String> nextStartRow();

long numCellTsPairsExamined();

long staleValuesDeleted();

static SweepTableResponse from(SweepResults results) {
return ImmutableSweepTableResponse.builder()
.numCellTsPairsExamined(results.getCellTsPairsExamined())
.staleValuesDeleted(results.getStaleValuesDeleted())
.nextStartRow(results.getNextStartRow().map(PtBytes::encodeHexString))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweeping;
import com.palantir.atlasdb.keyvalue.api.CandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ImmutableCandidateCellForSweepingRequest;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.SweepResults;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence.SweepStrategy;
import com.palantir.atlasdb.sweep.CellsToSweepPartitioningIterator.ExaminedCellLimit;
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
Expand Down Expand Up @@ -124,7 +126,7 @@ private SweepResults runInternal(
}
if (keyValueService.getMetadataForTable(tableRef).length == 0) {
log.warn("The sweeper tried to sweep table '{}', but the table does not exist. Skipping table.",
UnsafeArg.of("table name", tableRef));
LoggingArgs.tableRef("tableRef", tableRef));
return SweepResults.createEmptySweepResult();
}
SweepStrategy sweepStrategy = sweepStrategyManager.get().getOrDefault(tableRef, SweepStrategy.CONSERVATIVE);
Expand All @@ -140,6 +142,10 @@ private SweepResults doRun(TableReference tableRef,
byte[] startRow,
RunType runType,
Sweeper sweeper) {
log.info("Beginning iteration of sweep for table {} starting at row {}",
LoggingArgs.tableRef(tableRef),
UnsafeArg.of("startRow", PtBytes.encodeHexString(startRow)));

// Earliest start timestamp of any currently open transaction, with two caveats:
// (1) unreadableTimestamps are calculated via wall-clock time, and so may not be correct
// under pathological clock conditions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,53 @@
*/
package com.palantir.atlasdb.sweep;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Optional;

import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import com.palantir.logsafe.Safe;

/**
* Provides endpoints for sweeping a specific table.
*/
@Path("/sweep")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface SweeperService {
/**
* Sweep a particular table from EMPTY startRow with default {@link SweepBatchConfig}.
*/
@POST
@Path("sweep-table")
@Produces(MediaType.APPLICATION_JSON)
void sweepTable(@QueryParam("tablename") String tableName);

/**
* Sweep a particular table from specified startRow with default {@link SweepBatchConfig}.
*/
@POST
@Path("sweep-table-from-row")
@Produces(MediaType.APPLICATION_JSON)
void sweepTableFromStartRow(
@QueryParam("tablename") String tableName,
@Nonnull @QueryParam("startRow") String startRow);
default SweepTableResponse sweepTableFully(String tableName) {
return sweepTable(tableName, Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(), Optional.empty());
}

default SweepTableResponse sweepTableFrom(String tableName, String startRow) {
return sweepTable(tableName, Optional.of(startRow), Optional.empty(),
Optional.empty(), Optional.empty(), Optional.empty());
}

/**
* Sweep a particular table from specified startRow with specified {@link SweepBatchConfig} parameters.
* Sweeps a particular table.
*
* @param tableName the table to sweep, in the format namespace.table_name (e.g. myapp.users)
* @param startRow (Optional) the row to start from, encoded as a hex string (e.g. 0x12345abcde)
* @param fullSweep (Optional; default true) whether to sweep the full table; if false just runs one batch
* @param maxCellTsPairsToExamine (Optional) see {@link SweepBatchConfig#maxCellTsPairsToExamine()}
* @param candidateBatchSize (Optional) see {@link SweepBatchConfig#candidateBatchSize()}
* @param deleteBatchSize (Optional) see {@link SweepBatchConfig#deleteBatchSize()}
*/
@POST
@Path("sweep-table-from-row-with-batch")
@Produces(MediaType.APPLICATION_JSON)
void sweepTableFromStartRowWithBatchConfig(
@Path("sweep-table")
SweepTableResponse sweepTable(
@QueryParam("tablename") String tableName,
@Nullable @QueryParam("startRow") String startRow,
@Nullable @QueryParam("maxCellTsPairsToExamine") Integer maxCellTsPairsToExamine,
@Nullable @QueryParam("candidateBatchSize") Integer candidateBatchSize,
@Nullable @QueryParam("deleteBatchSize") Integer deleteBatchSize);
@QueryParam("startRow") Optional<String> startRow,
@Safe @QueryParam("fullSweep") @DefaultValue("true") Optional<Boolean> fullSweep,
@Safe @QueryParam("maxCellTsPairsToExamine") Optional<Integer> maxCellTsPairsToExamine,
@Safe @QueryParam("candidateBatchSize") Optional<Integer> candidateBatchSize,
@Safe @QueryParam("deleteBatchSize") Optional<Integer> deleteBatchSize);
}
Loading

0 comments on commit 13a2b81

Please sign in to comment.