Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[TS | LIP-164000] Reset Sweep Progress #5277

Merged
merged 9 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ private long increaseValueFromToAtLeast(ShardAndStrategy shardAndStrategy, long
return currentValue;
}

public void resetProgressForShard(ShardAndStrategy shardAndStrategy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, not a fan of the code duplication, but it's different enough to make it awkward to reuse. We could just delete the entry to make this much simpler but I assume we want HA -- not that sweep will work if we don't have delete consistency anyway...

// TODO (jkong): This is a bit crappy. Really we want this to be INITIAL_TIMESTAMP, but that doesn't
// serialise because we require an unsigned integer.
byte[] colValZero = createColumnValue(SweepQueueUtils.RESET_TIMESTAMP);

long currentValue = getLastSweptTimestamp(shardAndStrategy);
while (currentValue > SweepQueueUtils.RESET_TIMESTAMP) {
CheckAndSetRequest casRequest = createRequest(shardAndStrategy, currentValue, colValZero);
try {
log.info(
"Attempting to reset targeted sweep progress for a shard.",
SafeArg.of("shardAndStrategy", shardAndStrategy));
kvs.checkAndSet(casRequest);
log.info(
"Reset targeted sweep progress for a shard.", SafeArg.of("shardAndStrategy", shardAndStrategy));
} catch (CheckAndSetException e) {
log.info(
"Failed to reset targeted sweep progress for a shard; trying again if someone changed it "
+ "(unless they also reset it).",
SafeArg.of("shardAndStrategy", shardAndStrategy),
e);
currentValue = rethrowIfUnchanged(shardAndStrategy, currentValue, e);
}
}
}

static byte[] createColumnValue(long newVal) {
return SweepShardProgressTable.Value.of(newVal).persistValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ public long sweepNextBatch(ShardAndStrategy shardStrategy, long sweepTs) {
return sweepBatch.entriesRead();
}

public void resetSweepProgress() {
int shards = getNumShards();
log.info("Now attempting to reset sweep progress for both strategies...", SafeArg.of("numShards", shards));
for (int shard = 0; shard < shards; shard++) {
progress.resetProgressForShard(ShardAndStrategy.conservative(shard));
progress.resetProgressForShard(ShardAndStrategy.thorough(shard));
}
log.info("Sweep progress was reset for shards for both strategies.", SafeArg.of("numShards", shards));
}

/**
* Returns the most recently known number of shards.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class SweepQueueUtils {
public static final int BATCH_SIZE_KVS = 1000;
public static final long READ_TS = Long.MAX_VALUE;
public static final long INITIAL_TIMESTAMP = -1L;
public static final long RESET_TIMESTAMP = 0L;
public static final ColumnRangeSelection ALL_COLUMNS = allPossibleColumns();
public static final int MINIMUM_WRITE_INDEX = -TargetedSweepMetadata.MAX_DEDICATED_ROWS;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
public class TargetedSweeper implements MultiTableSweepQueueWriter, BackgroundSweeper {
private static final Logger log = LoggerFactory.getLogger(TargetedSweeper.class);

private final boolean shouldResetAndStopSweep;
private final Supplier<TargetedSweepRuntimeConfig> runtime;
private final List<Follower> followers;
private final MetricsManager metricsManager;
Expand All @@ -76,6 +77,7 @@ private TargetedSweeper(
this.conservativeScheduler =
new BackgroundSweepScheduler(install.conservativeThreads(), SweeperStrategy.CONSERVATIVE);
this.thoroughScheduler = new BackgroundSweepScheduler(install.thoroughThreads(), SweeperStrategy.THOROUGH);
this.shouldResetAndStopSweep = install.resetTargetedSweepQueueProgressAndStopSweep();
this.followers = followers;
this.metricsConfiguration = install.metricsConfiguration();
}
Expand Down Expand Up @@ -175,8 +177,15 @@ public void initializeWithoutRunning(
@Override
public void runInBackground() {
assertInitialized();
conservativeScheduler.scheduleBackgroundThreads();
thoroughScheduler.scheduleBackgroundThreads();
if (shouldResetAndStopSweep) {
log.warn("This AtlasDB node is operating in a mode where it is attempting to reset the progress of "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really what we want? We could just allow it to start sweeping. Also the nodes on old versions will still be attempting to sweep anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline: this centers around the behaviour of targeted sweep where nodes CAS the bound from (thing I read -> my progress) repeatedly. We need to wait for all nodes to report that they're done with this.

+ "targeted sweep. While in this mode, your data is not getting swept: please restart your node "
+ "once it is confirmed that sweep progress has been reset.");
queue.resetSweepProgress();
} else {
conservativeScheduler.scheduleBackgroundThreads();
thoroughScheduler.scheduleBackgroundThreads();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ public TargetedSweepMetrics.MetricsConfiguration metricsConfiguration() {
return TargetedSweepMetricsConfigurations.DEFAULT;
}

/**
* Specifies whether on startup we should reset progress in the targeted sweep queue. This may be useful to deal
* with circumstances where entries are written to the targeted sweep queue after the sweep timestamp has
* progressed past it - while the transaction in question will necessarily fail, there may still be cruft in the
* targeted sweep queue.
*
* If set to true, resets progress to zero for each shard and strategy on startup. This configuration can also only
* be safely used if nodes are not actively sweeping, and so if configured to be true will prevent targeted sweep
* from running.
*/
@Value.Default
public boolean resetTargetedSweepQueueProgressAndStopSweep() {
return false;
}

public static TargetedSweepInstallConfig defaultTargetedSweepConfig() {
return ImmutableTargetedSweepInstallConfig.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.palantir.atlasdb.sweep.queue;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
Expand All @@ -37,6 +38,7 @@

public class ShardProgressTest {
private static final long INITIAL_TIMESTAMP = SweepQueueUtils.INITIAL_TIMESTAMP;
private static final long RESET_TIMESTAMP = SweepQueueUtils.RESET_TIMESTAMP;

private ShardProgress progress;
private KeyValueService kvs;
Expand Down Expand Up @@ -186,6 +188,46 @@ public void repeatedlyFailingCasThrows() {
.isInstanceOf(CheckAndSetException.class);
}

@Test
public void canResetProgressForSpecificShards() {
progress.updateLastSweptTimestamp(CONSERVATIVE_TEN, 8888L);
progress.updateLastSweptTimestamp(CONSERVATIVE_TWENTY, 8888L);
assertThat(progress.getLastSweptTimestamp(CONSERVATIVE_TEN)).isEqualTo(8888L);
assertThat(progress.getLastSweptTimestamp(CONSERVATIVE_TWENTY)).isEqualTo(8888L);

progress.resetProgressForShard(CONSERVATIVE_TEN);
assertThat(progress.getLastSweptTimestamp(CONSERVATIVE_TEN)).isEqualTo(RESET_TIMESTAMP);
assertThat(progress.getLastSweptTimestamp(CONSERVATIVE_TWENTY)).isEqualTo(8888L);
}

@Test
public void stopsTryingToResetIfSomeoneElseDid() {
KeyValueService mockKvs = mock(KeyValueService.class);
when(mockKvs.get(any(), anyMap()))
.thenReturn(ImmutableMap.of(DUMMY, createValue(8L)))
.thenReturn(ImmutableMap.of(DUMMY, createValue(4L)))
.thenReturn(ImmutableMap.of(DUMMY, createValue(RESET_TIMESTAMP)));
doThrow(new CheckAndSetException("sadness")).when(mockKvs).checkAndSet(any());
ShardProgress instrumentedProgress = new ShardProgress(mockKvs);

assertThatCode(() -> instrumentedProgress.resetProgressForShard(CONSERVATIVE_TEN))
.doesNotThrowAnyException();
}

@Test
public void repeatedlyFailingCasThrowsForReset() {
KeyValueService mockKvs = mock(KeyValueService.class);
when(mockKvs.get(any(), anyMap()))
.thenReturn(ImmutableMap.of(DUMMY, createValue(8L)))
.thenReturn(ImmutableMap.of(DUMMY, createValue(9L)))
.thenReturn(ImmutableMap.of(DUMMY, createValue(10L)));
doThrow(new CheckAndSetException("sadness")).when(mockKvs).checkAndSet(any());
ShardProgress instrumentedProgress = new ShardProgress(mockKvs);

assertThatCode(() -> instrumentedProgress.resetProgressForShard(CONSERVATIVE_TEN))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to verify that it only throws once the value actually repeats

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think this is tested by stopsTryingToResetIfSomeoneElseDid() above - in practice you could check the CheckAndSetException's actual values but we don't get that here because of mocks, so I don't see another way of easily doing this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH I meant just verify that the method was called 4 times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added!

.isInstanceOf(CheckAndSetException.class);
}

private Value createValue(long num) {
SweepShardProgressTable.Value value = SweepShardProgressTable.Value.of(num);
return Value.create(value.persistValue(), 0L);
Expand Down
8 changes: 8 additions & 0 deletions changelog/@unreleased/pr-5277.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: improvement
improvement:
description: |-
Targeted sweep progress may be reset with the `resetTargetedSweepQueueProgressAndStopSweep` flag in targeted sweep install configuration. This may be useful in cleaning up cruft in the targeted sweep queue that may have been written by failed transactions.

As the name suggests, this will prevent sweep from cleaning up old cells, so users should not run with this configuration in the steady state. If running your service in HA, once the last node rolls and reports that it has successfully reset the sweep progress table, we can be certain that progress has been reset to zero.
links:
- https://github.com/palantir/atlasdb/pull/5277