Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

[QoS] live reload #2710

Merged
merged 6 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import com.palantir.atlasdb.transaction.service.TransactionService;
import com.palantir.atlasdb.transaction.service.TransactionServices;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.JavaSuppliers;
import com.palantir.leader.LeaderElectionService;
import com.palantir.leader.PingableLeader;
import com.palantir.leader.proxy.AwaitingLeadershipProxy;
Expand Down Expand Up @@ -309,7 +310,7 @@ SerializableTransactionManager serializable() {
java.util.function.Supplier<AtlasDbRuntimeConfig> runtimeConfigSupplier =
() -> runtimeConfigSupplier().get().orElse(defaultRuntime);

QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos());
QosClient qosClient = getQosClient(JavaSuppliers.compose(conf -> conf.qos(), runtimeConfigSupplier));

ServiceDiscoveringAtlasSupplier atlasFactory =
new ServiceDiscoveringAtlasSupplier(
Expand Down Expand Up @@ -403,11 +404,10 @@ SerializableTransactionManager serializable() {
return transactionManager;
}

private QosClient getQosClient(QosClientConfig config) {
// TODO(nziebart): create a RefreshingRateLimiter
private QosClient getQosClient(Supplier<QosClientConfig> config) {
QosRateLimiters rateLimiters = QosRateLimiters.create(
config.limits(),
config.maxBackoffSleepTime().toMilliseconds());
JavaSuppliers.compose(conf -> conf.maxBackoffSleepTime().toMilliseconds(), config),
JavaSuppliers.compose(QosClientConfig::limits, config));
return AtlasDbQosClient.create(rateLimiters);
}

Expand Down
3 changes: 2 additions & 1 deletion qos-service-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ jacocoTestReport {

check.dependsOn integTest
dependencies {
compile (project(":qos-service-api"));
compile (project(":atlasdb-client")) {
exclude group: 'com.squareup.okhttp3'
exclude group: 'com.google.guava'
}
compile (project(":qos-service-api"));

processor project(":atlasdb-processors")
processor group: 'org.immutables', name: 'value'

testCompile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.palantir.atlasdb.qos.client;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand Down Expand Up @@ -65,7 +66,13 @@ private <T, E extends Exception> T execute(
QosRateLimiter rateLimiter,
Consumer<QueryWeight> weightMetric) throws E {
long estimatedNumBytes = weigher.estimate().numBytes();
rateLimiter.consumeWithBackoff(estimatedNumBytes);
try {
Duration waitTime = rateLimiter.consumeWithBackoff(estimatedNumBytes);
metrics.recordBackoffMicros(TimeUnit.NANOSECONDS.toMicros(waitTime.toNanos()));
} catch (RuntimeException ex) { // TODO(nziebart): use rate limited exception here
metrics.recordRateLimitedException();
throw ex;
}

Stopwatch timer = Stopwatch.createStarted(ticker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public abstract class QosLimitsConfig {
public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build();

@Value.Default
public int readBytesPerSecond() {
return Integer.MAX_VALUE;
public long readBytesPerSecond() {
return Long.MAX_VALUE;
}

@Value.Default
public int writeBytesPerSecond() {
return Integer.MAX_VALUE;
public long writeBytesPerSecond() {
return Long.MAX_VALUE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package com.palantir.atlasdb.qos.metrics;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.palantir.atlasdb.qos.QueryWeight;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.logsafe.SafeArg;

// TODO(nziebart): needs tests
public class QosMetrics {

private static final Logger log = LoggerFactory.getLogger(QosMetrics.class);

private final MetricsManager metricsManager = new MetricsManager();

private final Meter readRequestCount;
Expand All @@ -34,6 +41,8 @@ public class QosMetrics {
private final Meter writeTime;
private final Meter rowsWritten;

private final Meter backoffTime;
private final Meter rateLimitedExceptions;

public QosMetrics() {
readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests");
Expand All @@ -45,6 +54,9 @@ public QosMetrics() {
bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten");
writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime");
rowsWritten = metricsManager.registerMeter(QosMetrics.class, "rowsWritten");

backoffTime = metricsManager.registerMeter(QosMetrics.class, "backoffTime");
rateLimitedExceptions = metricsManager.registerMeter(QosMetrics.class, "rateLimitedExceptions");
}

public void recordRead(QueryWeight weight) {
Expand All @@ -61,4 +73,17 @@ public void recordWrite(QueryWeight weight) {
rowsWritten.mark(weight.numDistinctRows());
}

public void recordBackoffMicros(long backoffTimeMicros) {
if (backoffTimeMicros > 0) {
log.info("Backing off for {} micros", SafeArg.of("backoffTimeMicros", backoffTimeMicros));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be "Backed off" since this gets logged after sleeping

backoffTime.mark(backoffTimeMicros);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could make strange graphs if we only mark when backing off.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd look at the derivative or m1/m5 rates for this graph (it's basically an indicator of whether or not you're currently getting rate limited, and by how much)

}
}

/**
 * Logs that the rate limit was exceeded and marks the {@code rateLimitedExceptions} meter once.
 */
public void recordRateLimitedException() {
    String message = "Rate limit exceeded and backoff time would be more than the configured maximum. "
            + "Throwing a throttling exception";
    log.info(message);
    rateLimitedExceptions.mark();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.qos.ratelimit.guava.RateLimiter;
import com.palantir.atlasdb.qos.ratelimit.guava.SmoothRateLimiter;
import com.palantir.logsafe.SafeArg;

/**
* A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on
Expand All @@ -35,31 +40,30 @@
*/
public class QosRateLimiter {

private static final double MAX_BURST_SECONDS = 5;
private static final double UNLIMITED_RATE = Double.MAX_VALUE;
private static final Logger log = LoggerFactory.getLogger(QosRateLimiter.class);

private static final long MAX_BURST_SECONDS = 5;

private final Supplier<Long> maxBackoffTimeMillis;
private final Supplier<Long> unitsPerSecond;
private final RateLimiter.SleepingStopwatch stopwatch;

private final long maxBackoffTimeMillis;
private RateLimiter rateLimiter;
private volatile RateLimiter rateLimiter;
private volatile long currentRate;

public static QosRateLimiter create(long maxBackoffTimeMillis) {
return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis);
public static QosRateLimiter create(Supplier<Long> maxBackoffTimeMillis, Supplier<Long> unitsPerSecond) {
return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis,
unitsPerSecond);
}

@VisibleForTesting
QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, long maxBackoffTimeMillis) {
rateLimiter = new SmoothRateLimiter.SmoothBursty(
stopwatch,
MAX_BURST_SECONDS);

rateLimiter.setRate(UNLIMITED_RATE);
QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, Supplier<Long> maxBackoffTimeMillis,
Supplier<Long> unitsPerSecond) {
this.stopwatch = stopwatch;
this.unitsPerSecond = unitsPerSecond;
this.maxBackoffTimeMillis = maxBackoffTimeMillis;
}

/**
* Update the allowed rate, in units per second.
*/
public void updateRate(double unitsPerSecond) {
rateLimiter.setRate(unitsPerSecond);
createRateLimiterAtomically();
}

/**
Expand All @@ -69,9 +73,11 @@ public void updateRate(double unitsPerSecond) {
* @return the amount of time slept for, if any
*/
public Duration consumeWithBackoff(long estimatedNumUnits) {
updateRateIfNeeded();

Optional<Duration> waitTime = rateLimiter.tryAcquire(
Ints.saturatedCast(estimatedNumUnits), // TODO(nziebart): deal with longs
maxBackoffTimeMillis,
maxBackoffTimeMillis.get(),
TimeUnit.MILLISECONDS);

if (!waitTime.isPresent()) {
Expand All @@ -81,6 +87,31 @@ public Duration consumeWithBackoff(long estimatedNumUnits) {
return waitTime.get();
}

/**
 * Rebuilds the rate limiter when the configured rate no longer matches the rate that was last applied.
 *
 * <p>Reading the rate back from the RateLimiter requires a lock acquisition and yields a double, so the last
 * applied rate is tracked in {@code currentRate} instead; that avoids both the overhead and double comparisons.
 */
private void updateRateIfNeeded() {
    boolean rateUnchanged = currentRate == unitsPerSecond.get();
    if (rateUnchanged) {
        return;
    }
    createRateLimiterAtomically();
}

/**
 * Replaces the current rate limiter with a fresh one set to the rate currently reported by
 * {@code unitsPerSecond}.
 *
 * <p>Guava's RateLimiter has strange behavior around updating the rate. Namely, if you set the rate very small and
 * ask for a large number of permits, you will end up having to wait until that small rate is satisfied before
 * acquiring more, even if you update the rate to something very large. So, we just create a new rate limiter if
 * the rate changes.
 *
 * <p>{@code rateLimiter} and {@code currentRate} are volatile and are read without holding this lock, so the
 * replacement is fully configured before it is published, and {@code rateLimiter} is written before
 * {@code currentRate}. A thread that observes the new {@code currentRate} therefore also observes the limiter
 * that was configured for it.
 */
private synchronized void createRateLimiterAtomically() {
    long newRate = unitsPerSecond.get();

    // Configure the limiter through a local so other threads never see an instance with no rate set.
    RateLimiter newLimiter = new SmoothRateLimiter.SmoothBursty(stopwatch, MAX_BURST_SECONDS);
    newLimiter.setRate(newRate);

    // Publish the limiter first, then the rate that readers compare against.
    rateLimiter = newLimiter;
    currentRate = newRate;

    // TODO(nziebart): distinguish between read/write rate limiters
    log.info("Units per second set to {}", SafeArg.of("unitsPerSecond", newRate));
}

/**
* Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff}. This
* should be called after a query returns, when the exact number of units consumed is known. This value may be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

package com.palantir.atlasdb.qos.ratelimit;

import java.util.function.Supplier;

import org.immutables.value.Value;

import com.palantir.atlasdb.qos.config.QosLimitsConfig;

@Value.Immutable
public interface QosRateLimiters {

static QosRateLimiters create(QosLimitsConfig config, long maxBackoffSleepTimeMillis) {
QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis);
readLimiter.updateRate(config.readBytesPerSecond());
static QosRateLimiters create(Supplier<Long> maxBackoffSleepTimeMillis, Supplier<QosLimitsConfig> config) {
QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis,
() -> config.get().readBytesPerSecond());

QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis);
writeLimiter.updateRate(config.writeBytesPerSecond());
QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis,
() -> config.get().writeBytesPerSecond());

return ImmutableQosRateLimiters.builder()
.read(readLimiter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.time.Duration;

import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -76,6 +78,8 @@ public void setUp() {
when(weigher.weighSuccess(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);
when(weigher.weighFailure(any(), anyLong())).thenReturn(ACTUAL_WEIGHT);

when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO);
when(writeLimiter.consumeWithBackoff(anyLong())).thenReturn(Duration.ZERO);
}

@Test
Expand All @@ -92,7 +96,6 @@ public void recordsReadMetrics() throws TestCheckedException {
qosClient.executeRead(() -> "foo", weigher);

verify(metrics).recordRead(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -116,7 +119,6 @@ public void recordsWriteMetrics() throws TestCheckedException {
qosClient.executeWrite(() -> null, weigher);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -127,7 +129,6 @@ public void recordsReadMetricsOnFailure() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordRead(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -138,7 +139,6 @@ public void recordsWriteMetricsOnFailure() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);

verify(metrics).recordWrite(ACTUAL_WEIGHT);
verifyNoMoreInteractions(metrics);
}

@Test
Expand All @@ -163,5 +163,22 @@ public void propagatesCheckedExceptions() throws TestCheckedException {
}, weigher)).isInstanceOf(TestCheckedException.class);
}

// A 1,100 ms wait reported by the read limiter should be recorded as 1,100,000 microseconds.
@Test
public void recordsBackoffTime() {
    Duration simulatedWait = Duration.ofMillis(1_100);
    when(readLimiter.consumeWithBackoff(anyLong())).thenReturn(simulatedWait);

    qosClient.executeRead(() -> "foo", weigher);

    verify(metrics).recordBackoffMicros(1_100_000);
}

// A failure thrown by the read limiter should reach the caller and be recorded as a rate-limited exception.
@Test
public void recordsBackoffExceptions() {
    // TODO(nziebart): use rate limited exception here
    RuntimeException rateLimited = new RuntimeException("rate limited");
    when(readLimiter.consumeWithBackoff(anyLong())).thenThrow(rateLimited);

    assertThatThrownBy(() -> qosClient.executeRead(() -> "foo", weigher))
            .isInstanceOf(RuntimeException.class);

    verify(metrics).recordRateLimitedException();
}

static class TestCheckedException extends Exception {}
}
Loading