Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
add metrics to scrubber (#4398)
Browse files Browse the repository at this point in the history
* add metrics to scrubber

* lazy registration of scrub metrics for cost savings
  • Loading branch information
clockfort authored and jeremyk-91 committed Nov 20, 2019
1 parent 4e78280 commit 970313d
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ private CellFilterMetrics() {
public static final String SWEEP_OUTCOME = "outcome";
public static final String TAG_OUTCOME = "status";

public static final String ENQUEUED_CELLS = "enqueuedCells";
public static final String DELETED_CELLS = "deletedCells";
public static final String SCRUBBED_CELLS = "scrubbedCells";
public static final String SCRUB_RETRIES = "retriedBatches";

public static final String TAG_CURRENT_SUSPECTED_LEADER = "isCurrentSuspectedLeader";
public static final String TAG_CLIENT = "client";
public static final String TAG_PAXOS_USE_CASE = "paxosUseCase";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private TransactionManager serializableInternal(@Output List<AutoCloseable> clos

Cleaner cleaner = initializeCloseable(() ->
new DefaultCleanerBuilder(keyValueService, lockAndTimestampServices.timelock(),
ImmutableList.of(follower), transactionService)
ImmutableList.of(follower), transactionService, metricsManager)
.setBackgroundScrubAggressively(config().backgroundScrubAggressively())
.setBackgroundScrubBatchSize(config().getBackgroundScrubBatchSize())
.setBackgroundScrubFrequencyMillis(config().getBackgroundScrubFrequencyMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ public Cleaner provideCleaner(ServicesConfig config,
@Named("kvs") KeyValueService kvs,
TimelockService timelock,
Follower follower,
TransactionService transactionService) {
TransactionService transactionService,
MetricsManager metricsManager) {
AtlasDbConfig atlasDbConfig = config.atlasDbConfig();
return new DefaultCleanerBuilder(
kvs,
timelock,
ImmutableList.of(follower),
transactionService)
transactionService,
metricsManager)
.setBackgroundScrubAggressively(atlasDbConfig.backgroundScrubAggressively())
.setBackgroundScrubBatchSize(atlasDbConfig.getBackgroundScrubBatchSize())
.setBackgroundScrubFrequencyMillis(atlasDbConfig.getBackgroundScrubFrequencyMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.palantir.atlasdb.transaction.impl.SweepStrategyManager;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.lock.LockClient;
import com.palantir.lock.LockService;
Expand Down Expand Up @@ -76,7 +77,8 @@ public Cleaner provideCleaner(ServicesConfig config,
tss,
lockClient,
ImmutableList.of(follower),
transactionService)
transactionService,
MetricsManagers.createForTests())
.setBackgroundScrubAggressively(atlasDbConfig.backgroundScrubAggressively())
.setBackgroundScrubBatchSize(atlasDbConfig.getBackgroundScrubBatchSize())
.setBackgroundScrubFrequencyMillis(atlasDbConfig.getBackgroundScrubFrequencyMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.palantir.atlasdb.cleaner.api.Cleaner;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.time.Clock;
import com.palantir.lock.LockClient;
import com.palantir.lock.LockService;
Expand All @@ -42,6 +43,7 @@ public class DefaultCleanerBuilder {
private final TimelockService timelockService;
private final List<Follower> followerList;
private final TransactionService transactionService;
private final MetricsManager metricsManager;

private long transactionReadTimeout = AtlasDbConstants.DEFAULT_TRANSACTION_READ_TIMEOUT;
private long punchIntervalMillis = AtlasDbConstants.DEFAULT_PUNCH_INTERVAL_MILLIS;
Expand All @@ -57,19 +59,22 @@ public DefaultCleanerBuilder(KeyValueService keyValueService,
TimestampService timestampService,
LockClient lockClient,
List<? extends Follower> followerList,
TransactionService transactionService) {
TransactionService transactionService,
MetricsManager metricsManager) {
this(keyValueService, new LegacyTimelockService(timestampService, lockService, lockClient), followerList,
transactionService);
transactionService, metricsManager);
}

public DefaultCleanerBuilder(KeyValueService keyValueService,
TimelockService timelockService,
List<? extends Follower> followerList,
TransactionService transactionService) {
TransactionService transactionService,
MetricsManager metricsManager) {
this.keyValueService = keyValueService;
this.timelockService = timelockService;
this.followerList = ImmutableList.copyOf(followerList);
this.transactionService = transactionService;
this.metricsManager = metricsManager;
}

public DefaultCleanerBuilder setTransactionReadTimeout(long transactionReadTimeout) {
Expand Down Expand Up @@ -140,7 +145,8 @@ private Scrubber buildScrubber(Supplier<Long> unreadableTimestampSupplier,
Suppliers.ofInstance(backgroundScrubBatchSize),
backgroundScrubThreads,
backgroundScrubReadThreads,
followerList);
followerList,
metricsManager);
}

public Cleaner buildCleaner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.google.common.primitives.UnsignedBytes;
import com.google.common.util.concurrent.Futures;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.AtlasDbMetricNames;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
Expand All @@ -66,6 +67,7 @@
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.BatchingVisitable;
import com.palantir.common.base.Throwables;
import com.palantir.common.collect.Maps2;
Expand Down Expand Up @@ -102,6 +104,7 @@ public class Scrubber {
private final Supplier<Long> unreadableTimestampSupplier;
private final TransactionService transactionService;
private final Collection<Follower> followers;
private final MetricsManager metricsManager;
private final boolean aggressiveScrub;
private final Supplier<Integer> batchSizeSupplier;
private final int threadCount;
Expand Down Expand Up @@ -131,7 +134,8 @@ public static Scrubber create(KeyValueService keyValueService,
Supplier<Integer> batchSizeSupplier,
int threadCount,
int readThreadCount,
Collection<Follower> followers) {
Collection<Follower> followers,
MetricsManager metricsManager) {
Scrubber scrubber = new Scrubber(
keyValueService,
scrubberStore,
Expand All @@ -144,7 +148,8 @@ public static Scrubber create(KeyValueService keyValueService,
batchSizeSupplier,
threadCount,
readThreadCount,
followers);
followers,
metricsManager);
return scrubber;
}

Expand All @@ -159,7 +164,8 @@ private Scrubber(KeyValueService keyValueService,
Supplier<Integer> batchSizeSupplier,
int threadCount,
int readThreadCount,
Collection<Follower> followers) {
Collection<Follower> followers,
MetricsManager metricsManager) {
this.keyValueService = keyValueService;
this.scrubberStore = scrubberStore;
this.backgroundScrubFrequencyMillisSupplier = backgroundScrubFrequencyMillisSupplier;
Expand All @@ -172,6 +178,8 @@ private Scrubber(KeyValueService keyValueService,
this.threadCount = threadCount;
this.readThreadCount = readThreadCount;
this.followers = followers;
this.metricsManager = metricsManager;

NamedThreadFactory threadFactory = new NamedThreadFactory(SCRUBBER_THREAD_PREFIX, true);
this.readerExec = PTExecutors.newFixedThreadPool(readThreadCount, threadFactory);
this.exec = PTExecutors.newFixedThreadPool(threadCount, threadFactory);
Expand Down Expand Up @@ -209,6 +217,7 @@ private synchronized void launchBackgroundScrubTask(final TransactionManager txM
log.error("Encountered the following error during background scrub task,"
+ " but continuing anyway", t);
numberOfAttempts++;
lazyWriteMetric(AtlasDbMetricNames.SCRUB_RETRIES, 1);
try {
Thread.sleep(RETRY_SLEEP_INTERVAL_IN_MILLIS);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -319,11 +328,9 @@ void runBackgroundScrubTask(final TransactionManager txManager) {

final Callable<Void> c = () -> {
log.debug("Scrubbing {} cells immediately.", batchMultimap.size());

// Here we don't need to check scrub timestamps because we guarantee that scrubImmediately is called
// AFTER the transaction commits
scrubCells(txManager, batchMultimap, scrubTimestamp, TransactionType.AGGRESSIVE_HARD_DELETE);

log.debug("Completed scrub immediately.");
return null;
};
Expand Down Expand Up @@ -361,6 +368,7 @@ void runBackgroundScrubTask(final TransactionManager txManager) {
return;
}
scrubberStore.queueCellsForScrubbing(cellToTableRefs, scrubTimestamp, batchSizeSupplier.get());
lazyWriteMetric(AtlasDbMetricNames.ENQUEUED_CELLS, cellToTableRefs.size());
}

private long getCommitTimestampRollBackIfNecessary(long startTimestamp,
Expand Down Expand Up @@ -516,6 +524,7 @@ private void scrubCells(TransactionManager txManager,
log.debug("Immediately scrubbed {} cells from table {}", entry.getValue().size(), tableRef);
}
scrubberStore.markCellsAsScrubbed(allCellsToMarkScrubbed, batchSizeSupplier.get());
lazyWriteMetric(AtlasDbMetricNames.SCRUBBED_CELLS, allCellsToMarkScrubbed.size());
}

private void deleteCellsAtTimestamps(TransactionManager txManager,
Expand All @@ -531,10 +540,15 @@ private void deleteCellsAtTimestamps(TransactionManager txManager,
Builder<Cell, Long> builder = ImmutableMultimap.builder();
batch.stream().forEach(e -> builder.put(e));
keyValueService.delete(tableRef, builder.build());
lazyWriteMetric(AtlasDbMetricNames.DELETED_CELLS, batch.size());
}
}
}

/**
 * Marks {@code value} on the meter named {@code name} that {@code metricsManager} associates with
 * {@code Scrubber.class}.
 *
 * The meter is fetched through {@code registerOrGetMeter} on every call instead of being cached in a
 * field, so no scrub meter is requested until the first event is recorded.
 * NOTE(review): presumably {@code registerOrGetMeter} creates the meter on first use and returns the
 * same instance afterwards -- confirm against MetricsManager.
 *
 * @param name metric name; callers in this class pass {@code AtlasDbMetricNames} constants
 * @param value amount to mark on the meter (for example a cell count, or 1 for a single retry)
 */
private void lazyWriteMetric(String name, long value) {
metricsManager.registerOrGetMeter(Scrubber.class, name).mark(value);
}

/**
 * Returns the value currently produced by {@code unreadableTimestampSupplier}.
 *
 * The supplier is a {@code Supplier<Long>} and the result is unboxed to {@code long} on return, so a
 * {@code null} from the supplier would surface here as a {@link NullPointerException}.
 *
 * @return the supplier's current value
 */
public long getUnreadableTimestamp() {
return unreadableTimestampSupplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.palantir.atlasdb.transaction.impl.TransactionTables;
import com.palantir.atlasdb.transaction.service.SimpleTransactionService;
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.common.base.BatchingVisitables;

@RunWith(Parameterized.class)
Expand Down Expand Up @@ -156,6 +157,7 @@ private Scrubber getScrubber(KeyValueService keyValueService, ScrubberStore scru
() -> 100, // batch size
1, // thread count
1, // read thread count
ImmutableList.of()); // followers
ImmutableList.of(), // followers
MetricsManagers.createForTests());
}
}
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-4398.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: feature
feature:
description: The scrubber has been instrumented with metrics.
links:
- https://github.com/palantir/atlasdb/pull/4398

0 comments on commit 970313d

Please sign in to comment.