Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[QoS] live reload (#2710)
Browse files Browse the repository at this point in the history
* live reload and logging

* millis

* checkpoint

* fix tests

* comments

* checkstyle
  • Loading branch information
nziebart authored Nov 20, 2017
1 parent fdbdd78 commit cfee25a
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.transaction.service.TransactionServices;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.JavaSuppliers;
import com.palantir.leader.LeaderElectionService;
import com.palantir.leader.PingableLeader;
import com.palantir.leader.proxy.AwaitingLeadershipProxy;
Expand Down Expand Up @@ -309,7 +310,7 @@ SerializableTransactionManager serializable() {
java.util.function.Supplier<AtlasDbRuntimeConfig> runtimeConfigSupplier =
() -> runtimeConfigSupplier().get().orElse(defaultRuntime);

QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos());
QosClient qosClient = getQosClient(JavaSuppliers.compose(conf -> conf.qos(), runtimeConfigSupplier));

ServiceDiscoveringAtlasSupplier atlasFactory =
new ServiceDiscoveringAtlasSupplier(
Expand Down Expand Up @@ -403,11 +404,10 @@ SerializableTransactionManager serializable() {
return transactionManager;
}

private QosClient getQosClient(QosClientConfig config) {
// TODO(nziebart): create a RefreshingRateLimiter
private QosClient getQosClient(Supplier<QosClientConfig> config) {
QosRateLimiters rateLimiters = QosRateLimiters.create(
config.limits(),
config.maxBackoffSleepTime().toMilliseconds());
JavaSuppliers.compose(conf -> conf.maxBackoffSleepTime().toMilliseconds(), config),
JavaSuppliers.compose(QosClientConfig::limits, config));
return AtlasDbQosClient.create(rateLimiters);
}

Expand Down
3 changes: 2 additions & 1 deletion qos-service-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ jacocoTestReport {

check.dependsOn integTest
dependencies {
compile (project(":qos-service-api"));
compile (project(":atlasdb-client")) {
exclude group: 'com.squareup.okhttp3'
exclude group: 'com.google.guava'
}
compile (project(":qos-service-api"));

processor project(":atlasdb-processors")
processor group: 'org.immutables', name: 'value'

testCompile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.palantir.atlasdb.qos.client;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand Down Expand Up @@ -65,7 +66,13 @@ private <T, E extends Exception> T execute(
QosRateLimiter rateLimiter,
Consumer<QueryWeight> weightMetric) throws E {
long estimatedNumBytes = weigher.estimate().numBytes();
rateLimiter.consumeWithBackoff(estimatedNumBytes);
try {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedNumBytes);
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
} catch (RuntimeException ex) { // TODO(nziebart): use rate limited exception here
metrics.recordRateLimitedException();
throw ex;
}

Stopwatch timer = Stopwatch.createStarted(ticker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public abstract class QosLimitsConfig {
public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build();

@Value.Default
public int readBytesPerSecond() {
return Integer.MAX_VALUE;
public long readBytesPerSecond() {
return Long.MAX_VALUE;
}

@Value.Default
public int writeBytesPerSecond() {
return Integer.MAX_VALUE;
public long writeBytesPerSecond() {
return Long.MAX_VALUE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package com.palantir.atlasdb.qos.metrics;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.palantir.atlasdb.qos.QueryWeight;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.logsafe.SafeArg;

// TODO(nziebart): needs tests
public class QosMetrics {

private static final Logger log = LoggerFactory.getLogger(QosMetrics.class);

private final MetricsManager metricsManager = new MetricsManager();

private final Meter readRequestCount;
Expand All @@ -34,6 +41,8 @@ public class QosMetrics {
private final Meter writeTime;
private final Meter rowsWritten;

private final Meter backoffTime;
private final Meter rateLimitedExceptions;

public QosMetrics() {
readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests");
Expand All @@ -45,6 +54,9 @@ public QosMetrics() {
bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten");
writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime");
rowsWritten = metricsManager.registerMeter(QosMetrics.class, "rowsWritten");

backoffTime = metricsManager.registerMeter(QosMetrics.class, "backoffTime");
rateLimitedExceptions = metricsManager.registerMeter(QosMetrics.class, "rateLimitedExceptions");
}

public void recordRead(QueryWeight weight) {
Expand All @@ -61,4 +73,17 @@ public void recordWrite(QueryWeight weight) {
rowsWritten.mark(weight.numDistinctRows());
}

/**
 * Records time spent backing off before a request was allowed through.
 * Logs the duration at info level and marks the {@code backoffTime} meter with the same value.
 * Values that are zero or negative are ignored entirely (no log line, no meter update).
 *
 * @param backoffTimeMicros the backoff duration, in microseconds
 */
public void recordBackoffMicros(long backoffTimeMicros) {
    boolean nothingToRecord = backoffTimeMicros <= 0;
    if (nothingToRecord) {
        return;
    }

    log.info("Backing off for {} micros", SafeArg.of("backoffTimeMicros", backoffTimeMicros));
    backoffTime.mark(backoffTimeMicros);
}

/**
 * Records a single rate-limited request: logs an info-level message and marks the
 * {@code rateLimitedExceptions} meter once.
 */
public void recordRateLimitedException() {
    String message = "Rate limit exceeded and backoff time would be more than "
            + "the configured maximum. Throwing a throttling exception";
    log.info(message);
    rateLimitedExceptions.mark();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter;
import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter;
import com.palantir.logsafe.SafeArg;

/**
* A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on
Expand All @@ -35,31 +40,30 @@
*/
public class QosRateLimiter {

private static final double MAX_BURST_SECONDS = 5;
private static final double UNLIMITED_RATE = Double.MAX_VALUE;
private static final Logger log = LoggerFactory.getLogger(QosRateLimiter.class);

private static final long MAX_BURST_SECONDS = 5;

private final Supplier<Long> maxBackoffTimeMillis;
private final Supplier<Long> unitsPerSecond;
private final RateLimiter.SleepingStopwatch stopwatch;

private final long maxBackoffTimeMillis;
private RateLimiter rateLimiter;
private volatile RateLimiter rateLimiter;
private volatile long currentRate;

public static QosRateLimiter create(long maxBackoffTimeMillis) {
return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis);
public static QosRateLimiter create(Supplier<Long> maxBackoffTimeMillis, Supplier<Long> unitsPerSecond) {
return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis,
unitsPerSecond);
}

@VisibleForTesting
QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, long maxBackoffTimeMillis) {
rateLimiter = new SmoothRateLimiter.SmoothBursty(
stopwatch,
MAX_BURST_SECONDS);

rateLimiter.setRate(UNLIMITED_RATE);
QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, Supplier<Long> maxBackoffTimeMillis,
Supplier<Long> unitsPerSecond) {
this.stopwatch = stopwatch;
this.unitsPerSecond = unitsPerSecond;
this.maxBackoffTimeMillis = maxBackoffTimeMillis;
}

/**
* Update the allowed rate, in units per second.
*/
public void updateRate(double unitsPerSecond) {
rateLimiter.setRate(unitsPerSecond);
createRateLimiterAtomically();
}

/**
Expand All @@ -69,9 +73,11 @@ public void updateRate(double unitsPerSecond) {
* @return the amount of time slept for, if any
*/
public Duration consumeWithBackoff(long estimatedNumUnits) {
updateRateIfNeeded();

Optional<Duration> waitTime = rateLimiter.tryAcquire(
Ints.saturatedCast(estimatedNumUnits), // TODO(nziebart): deal with longs
maxBackoffTimeMillis,
maxBackoffTimeMillis.get(),
TimeUnit.MILLISECONDS);

if (!waitTime.isPresent()) {
Expand All @@ -81,6 +87,31 @@ public Duration consumeWithBackoff(long estimatedNumUnits) {
return waitTime.get();
}

/**
 * Rebuilds the rate limiter when the configured units-per-second no longer matches the rate we last applied.
 *
 * We track the applied rate in our own field instead of asking the RateLimiter for it: reading the RateLimiter's
 * rate requires a lock acquisition and returns a double, so keeping a local copy avoids both the overhead and
 * a floating point comparison.
 */
private void updateRateIfNeeded() {
    boolean rateUnchanged = currentRate == unitsPerSecond.get();
    if (rateUnchanged) {
        return;
    }
    createRateLimiterAtomically();
}

/**
 * Replaces the current rate limiter with a fresh one configured at the rate currently supplied by
 * {@code unitsPerSecond}.
 *
 * Guava's RateLimiter has strange behavior around updating the rate. Namely, if you set the rate very small and ask
 * for a large number of permits, you will end up having to wait until that small rate is satisfied before acquiring
 * more, even if you update the rate to something very large. So, we just create a new rate limiter if the rate
 * changes.
 *
 * Callers check the rate without holding the lock, so several threads may arrive here for the same configuration
 * change. The rate is therefore re-checked under the lock, and only the first thread rebuilds the limiter.
 */
private synchronized void createRateLimiterAtomically() {
    long newRate = unitsPerSecond.get();

    // Another thread may already have applied this rate while we were waiting for the lock. The null check
    // lets the very first call (from the constructor) through regardless of the initial field value.
    if (rateLimiter != null && currentRate == newRate) {
        return;
    }

    // Fully configure the limiter before publishing it, so that concurrent readers of the volatile field can
    // never observe a limiter whose rate has not been set yet.
    RateLimiter newRateLimiter = new SmoothRateLimiter.SmoothBursty(stopwatch, MAX_BURST_SECONDS);
    newRateLimiter.setRate(newRate);
    rateLimiter = newRateLimiter;

    // Publish the rate last: a reader that sees the new rate is then guaranteed to also see the new limiter.
    currentRate = newRate;

    // TODO(nziebart): distinguish between read/write rate limiters
    log.info("Units per second set to {}", SafeArg.of("unitsPerSecond", newRate));
}

/**
* Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff}. This
* should be called after a query returns, when the exact number of units consumed is known. This value may be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

package com.palantir.atlasdb.qos.ratelimit;

import java.util.function.Supplier;

import org.immutables.value.Value;

import com.palantir.atlasdb.qos.config.QosLimitsConfig;

@Value.Immutable
public interface QosRateLimiters {

static QosRateLimiters create(QosLimitsConfig config, long maxBackoffSleepTimeMillis) {
QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis);
readLimiter.updateRate(config.readBytesPerSecond());
static QosRateLimiters create(Supplier<Long> maxBackoffSleepTimeMillis, Supplier<QosLimitsConfig> config) {
QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis,
() -> config.get().readBytesPerSecond());

QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis);
writeLimiter.updateRate(config.writeBytesPerSecond());
QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis,
() -> config.get().writeBytesPerSecond());

return ImmutableQosRateLimiters.builder()
.read(readLimiter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.time.Duration;

import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -76,6 +78,8 @@ public void setUp() {
when(weigher.weighSuccess(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);
when(weigher.weighFailure(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);

when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO);
when(writeLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO);
}

@Test
Expand All @@ -92,7 +96,6 @@ public void recordsReadMetrics() throws TestCheckedException {
qosClient.executeRead(() -> "foo", weigher);

verify(metrics).recordRead(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -116,7 +119,6 @@ public void recordsWriteMetrics() throws TestCheckedException {
qosClient.executeWrite(() -> null, weigher);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -127,7 +129,6 @@ public void recordsReadMetricsOnFailure() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordRead(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -138,7 +139,6 @@ public void recordsWriteMetricsOnFailure() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -163,5 +163,22 @@ public void propagatesCheckedExceptions() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);
}

@Test
public void recordsBackoffTime() {
    // The read limiter reports a 1.1 second wait; the metrics should receive it in microseconds.
    Duration simulatedWait = Duration.ofMillis(1_100);
    when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(simulatedWait);

    qosClient.executeRead(() -> "foo", weigher);

    verify(metrics).recordBackoffMicros(1_100_000);
}

@Test
public void recordsBackoffExceptions() {
    // TODO(nziebart): use rate limited exception here
    RuntimeException rateLimited = new RuntimeException("rate limited");
    when(readLimiter.consumeWithBackoff(anyLong())).thenThrow(rateLimited);

    // The exception must still reach the caller, and the metric must be recorded on the way out.
    assertThatThrownBy(() -> qosClient.executeRead(() -> "foo", weigher))
            .isInstanceOf(RuntimeException.class);

    verify(metrics).recordRateLimitedException();
}

static class TestCheckedException extends Exception {}
}
Loading

0 comments on commit cfee25a

Please sign in to comment.