Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Fix SweepBatchConfig values to properly decrease to 1 with each failu…
Browse files Browse the repository at this point in the history
…re and increase with each success (#2630)

* Fix SweepBatchConfig values to properly decrease to 1 with each failure and increase with each success

* add logging when we stop reducing the batch size multiplier

* further improve the tests

* Allow sweep to recover faster after backing off.  Before we would increase by 1% for each successive success, if we had reduced a value to 1 it would be 70 iterations before we got 2 and 700 iterations before we got back to 1000.  Now we always 25 iterations with the lower batch size and then try increasing the rate by doubling each time.  This means that when sweep has to back off it should speed up again quickly.

* Use an AtomicInteger to handle concurrent updates
  • Loading branch information
tboam authored Nov 8, 2017
1 parent 6e939b2 commit 2e8c960
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.palantir.atlasdb.sweep;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +32,7 @@ public final class AdjustableSweepBatchConfigSource {
private final Supplier<SweepBatchConfig> rawSweepBatchConfig;

private static volatile double batchSizeMultiplier = 1.0;
private final AtomicInteger successiveIncreases = new AtomicInteger(0);

private AdjustableSweepBatchConfigSource(Supplier<SweepBatchConfig> rawSweepBatchConfig) {
this.rawSweepBatchConfig = rawSweepBatchConfig;
Expand Down Expand Up @@ -67,14 +71,21 @@ private static int adjust(int parameterValue, double multiplier) {
}

public void increaseMultiplier() {
batchSizeMultiplier = Math.min(1.0, batchSizeMultiplier * 1.01);
if (batchSizeMultiplier == 1.0) {
return;
}

if (successiveIncreases.incrementAndGet() > 25) {
batchSizeMultiplier = Math.min(1.0, batchSizeMultiplier * 2);
}
}

public void decreaseMultiplier() {
successiveIncreases.set(0);
SweepBatchConfig lastBatchConfig = getAdjustedSweepConfig();

// Cut batch size in half, always sweep at least one row (we round down).
batchSizeMultiplier = Math.max(batchSizeMultiplier / 2, 1.5 / lastBatchConfig.candidateBatchSize());
// Cut batch size in half, always sweep at least one row.
reduceBatchSizeMultiplier();

log.warn("Sweep failed unexpectedly with candidate batch size {},"
+ " delete batch size {},"
Expand All @@ -85,4 +96,25 @@ public void decreaseMultiplier() {
SafeArg.of("maxCellTsPairsToExamine", lastBatchConfig.maxCellTsPairsToExamine()),
SafeArg.of("batchSizeMultiplier", batchSizeMultiplier));
}

private void reduceBatchSizeMultiplier() {
SweepBatchConfig config = getRawSweepConfig();
double smallestSensibleBatchSizeMultiplier =
1.0 / NumberUtils.max(
config.maxCellTsPairsToExamine(), config.candidateBatchSize(), config.deleteBatchSize());

if (batchSizeMultiplier == smallestSensibleBatchSizeMultiplier) {
return;
}

double newBatchSizeMultiplier = batchSizeMultiplier / 2;
if (newBatchSizeMultiplier < smallestSensibleBatchSizeMultiplier) {
log.info("batchSizeMultiplier reached the smallest sensible value for the current sweep config ({}), "
+ "will not reduce further.",
SafeArg.of("batchSizeMultiplier", smallestSensibleBatchSizeMultiplier));
batchSizeMultiplier = smallestSensibleBatchSizeMultiplier;
} else {
batchSizeMultiplier = newBatchSizeMultiplier;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2017 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;

import java.util.function.Function;

import org.junit.Test;

/**
* Created by tboam on 03/11/2017.
*/
public class AdjustableSweepBatchConfigSourceTest {

private AdjustableSweepBatchConfigSource adjustableConfig;
private SweepBatchConfig previousConfig;
private double previousMultiplier;

@Test
public void batchSizeMultiplierDecreasesOnFailure() {
// Given
configWithValues(1000, 1000, 1000);

// When
adjustableConfig.decreaseMultiplier();

// Then
assertThat(adjustableConfig.getBatchSizeMultiplier(), is(lessThan(previousMultiplier)));
}

@Test
public void canDecreaseAndIncreaseConfigWithAllSmallValues() {
//Given
configWithValues(1, 1, 1);

whenDecreasingTheMultiplier_thenAdjustedConfigValuesDecrease();

whenIncreasingTheMultiplier_thenAdjustedConfigValuesIncrease();

assertThat(adjustableConfig.getBatchSizeMultiplier(), is(1.0));
}

@Test
public void canDecreaseAndIncreaseConfigWithAllLargeValues() {
//Given
configWithValues(1000, 1000, 1000);

whenDecreasingTheMultiplier_thenAdjustedConfigValuesDecrease();

whenIncreasingTheMultiplier_thenAdjustedConfigValuesIncrease();

assertThat(adjustableConfig.getBatchSizeMultiplier(), is(1.0));
}

@Test
public void canDecreaseAndIncreaseConfigWithMixOfValues() {
//Given
configWithValues(1000, 1, 100);

whenDecreasingTheMultiplier_thenAdjustedConfigValuesDecrease();

whenIncreasingTheMultiplier_thenAdjustedConfigValuesIncrease();

assertThat(adjustableConfig.getBatchSizeMultiplier(), is(1.0));
}

private void whenDecreasingTheMultiplier_thenAdjustedConfigValuesDecrease() {
for (int i = 0; i < 10_000; i++) {
// When
adjustableConfig.decreaseMultiplier();

// Then
batchSizeMultiplierDecreases();
maxCellTsPairsToExamineDecreasesToAMinimumOfOne();
candidateBatchSizeDecreasesToAMinimumOfOne();
deleteBatchSizeDecreasesToAMinimumOfOne();

updatePreviousValues();
}
}

private void whenIncreasingTheMultiplier_thenAdjustedConfigValuesIncrease() {
for (int i = 0; i < 1_000; i++) {
// When
adjustableConfig.increaseMultiplier();

// Then
batchSizeMultiplierIncreases();
batchSizeMultiplierDoesNotExceedOne();

maxCellTsPairsToExamineIncreasesBackUpToBaseConfig();
candidateBatchSizeIncreasesBackUpToBaseConfig();
deleteBatchSizeIncreasesBackUpToBaseConfig();

updatePreviousValues();
}
}

private void configWithValues(int maxCellTsPairsToExamine, int candidateBatchSize, int deleteBatchSize) {
adjustableConfig = AdjustableSweepBatchConfigSource.create(() ->
ImmutableSweepBatchConfig.builder()
.maxCellTsPairsToExamine(maxCellTsPairsToExamine)
.candidateBatchSize(candidateBatchSize)
.deleteBatchSize(deleteBatchSize)
.build()
);

updatePreviousValues();
}

private void batchSizeMultiplierDecreases() {
assertThat(adjustableConfig.getBatchSizeMultiplier(), is(lessThanOrEqualTo(previousMultiplier)));
}

private void decreasesToOne(Function<SweepBatchConfig, Integer> getValue) {
int newValue = getValue.apply(adjustableConfig.getAdjustedSweepConfig());
int previousValue = getValue.apply(previousConfig);

assertThat(newValue, is(anyOf(equalTo(1), lessThan(previousValue))));
}

private void maxCellTsPairsToExamineDecreasesToAMinimumOfOne() {
decreasesToOne(SweepBatchConfig::maxCellTsPairsToExamine);
}

private void candidateBatchSizeDecreasesToAMinimumOfOne() {
decreasesToOne(SweepBatchConfig::candidateBatchSize);
}

private void deleteBatchSizeDecreasesToAMinimumOfOne() {
decreasesToOne(SweepBatchConfig::deleteBatchSize);
}

private void batchSizeMultiplierIncreases() {
assertThat(adjustableConfig.getBatchSizeMultiplier(), is(greaterThanOrEqualTo(previousMultiplier)));
}

private void batchSizeMultiplierDoesNotExceedOne() {
assertThat(adjustableConfig.getBatchSizeMultiplier(), is(lessThanOrEqualTo(1.0)));
}

private void maxCellTsPairsToExamineIncreasesBackUpToBaseConfig() {
increasesBackUpToBaseConfig(SweepBatchConfig::maxCellTsPairsToExamine);
}

private void candidateBatchSizeIncreasesBackUpToBaseConfig() {
increasesBackUpToBaseConfig(SweepBatchConfig::candidateBatchSize);
}

private void deleteBatchSizeIncreasesBackUpToBaseConfig() {
increasesBackUpToBaseConfig(SweepBatchConfig::deleteBatchSize);
}

private void increasesBackUpToBaseConfig(Function<SweepBatchConfig, Integer> getValue) {
assertThat(getValue.apply(adjustableConfig.getAdjustedSweepConfig()),
is(anyOf(
greaterThan(getValue.apply(previousConfig)),
lessThanOrEqualTo(getValue.apply(adjustableConfig.getRawSweepConfig()))
)));
}

private void updatePreviousValues() {
previousMultiplier = adjustableConfig.getBatchSizeMultiplier();
previousConfig = adjustableConfig.getAdjustedSweepConfig();
}
}
6 changes: 6 additions & 0 deletions docs/source/release_notes/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ develop
Also, all Cassandra KVS exceptions like ``KeyAlreadyExists`` or ``TTransportException`` as well as ``NotInitializedException`` will get wrapped into ``AtlasDbDependencyException`` in the interest of consistent exceptions.
(`Pull Request <https://github.com/palantir/atlasdb/pull/2558>`__)

* - |fixed|
- ``SweepBatchConfig`` values are now decayed correctly when there's an error.
``SweepBatchConfig`` should be decreased until sweep succeeds, however the config actually oscillated between values, these were normally small but could be larger than the original config. This was caused by us fixing one of the values at 1.
``SweepBatchConfig`` values will now be halved with each failure until they reach 1 (previously they only went to about 30% due to another bug). This ensures we fully backoff and gives us the best possible chance of success. Values will slowly increase with each successful run until they are back to their default level.
(`Pull Request <https://github.com/palantir/atlasdb/pull/2630>`__)

* - |fixed|
- Reverted the Cassandra KVS executor PR (`Pull Request <https://github.com/palantir/atlasdb/pull/2534>`__) that caused a performance regression.
(`Pull Request <https://github.com/palantir/atlasdb/pull/2637>`__)
Expand Down

0 comments on commit 2e8c960

Please sign in to comment.