From 2318f6930e4e52dab96e9991012011ada45de0e9 Mon Sep 17 00:00:00 2001 From: Nathan Ziebart Date: Wed, 15 Nov 2017 21:30:07 +0000 Subject: [PATCH 1/3] qos config --- .../atlasdb/config/AtlasDbRuntimeConfig.java | 7 +- .../atlasdb/factory/TransactionManagers.java | 30 ++------ .../factory/TransactionManagersTest.java | 3 +- .../qos/QosServiceIntegrationTest.java | 4 +- .../atlasdb/qos/client/AtlasDbQosClient.java | 18 ++--- .../atlasdb/qos/config/QosClientConfig.java | 47 +++++++++++++ .../atlasdb/qos/config/QosLimitsConfig.java | 41 +++++++++++ .../atlasdb/qos/ratelimit/QosRateLimiter.java | 2 +- .../qos/ratelimit/QosRateLimiters.java | 43 ++++++++++++ .../QosClientConfigDeserializationTest.java | 68 +++++++++++++++++++ .../QosRuntimeConfigDeserializationTest.java | 2 +- .../{ => client}/AtlasDbQosClientTest.java | 24 ++++--- .../src/test/resources/qos-client.yml | 10 +++ .../resources/{qos.yml => qos-server.yml} | 0 14 files changed, 250 insertions(+), 49 deletions(-) create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java create mode 100644 qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java create mode 100644 qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java rename qos-service-impl/src/test/java/com/palantir/atlasdb/qos/{ => client}/AtlasDbQosClientTest.java (79%) create mode 100644 qos-service-impl/src/test/resources/qos-client.yml rename qos-service-impl/src/test/resources/{qos.yml => qos-server.yml} (100%) diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java index 30ca96ce1cc..df5fbf01319 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/config/AtlasDbRuntimeConfig.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.palantir.atlasdb.AtlasDbConstants; -import com.palantir.remoting.api.config.service.ServiceConfiguration; +import com.palantir.atlasdb.qos.config.QosClientConfig; @JsonDeserialize(as = ImmutableAtlasDbRuntimeConfig.class) @JsonSerialize(as = ImmutableAtlasDbRuntimeConfig.class) @@ -61,7 +61,10 @@ public long getTimestampCacheSize() { return AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE; } - public abstract Optional getQosServiceConfiguration(); + @Value.Default + public QosClientConfig qos() { + return QosClientConfig.DEFAULT; + } /** * Runtime live-reloadable parameters for communicating with TimeLock. diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index e57ee08c3d9..fd662a8fae3 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -18,8 +18,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -30,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.InstrumentedScheduledExecutorService; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.annotations.VisibleForTesting; @@ -72,11 +69,10 @@ import com.palantir.atlasdb.persistentlock.KvsBackedPersistentLockService; import com.palantir.atlasdb.persistentlock.NoOpPersistentLockService; import com.palantir.atlasdb.persistentlock.PersistentLockService; -import com.palantir.atlasdb.qos.FakeQosClient; import com.palantir.atlasdb.qos.QosClient; -import com.palantir.atlasdb.qos.QosService; import com.palantir.atlasdb.qos.client.AtlasDbQosClient; -import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.config.QosClientConfig; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; import com.palantir.atlasdb.schema.generated.SweepTableFactory; import com.palantir.atlasdb.spi.AtlasDbFactory; import com.palantir.atlasdb.spi.KeyValueServiceConfig; @@ -117,9 +113,6 @@ import com.palantir.lock.impl.LockServiceImpl; import com.palantir.lock.v2.TimelockService; import com.palantir.logsafe.UnsafeArg; -import com.palantir.remoting.api.config.service.ServiceConfiguration; -import com.palantir.remoting3.clients.ClientConfigurations; -import com.palantir.remoting3.jaxrs.JaxRsClient; import com.palantir.timestamp.TimestampService; import com.palantir.timestamp.TimestampStoreInvalidator; import com.palantir.util.OptionalResolver; @@ -316,8 +309,7 @@ SerializableTransactionManager serializable() { java.util.function.Supplier runtimeConfigSupplier = () -> runtimeConfigSupplier().get().orElse(defaultRuntime); - - QosClient qosClient = getQosClient(runtimeConfigSupplier.get().getQosServiceConfiguration()); + QosClient qosClient = getQosClient(runtimeConfigSupplier.get().qos()); ServiceDiscoveringAtlasSupplier atlasFactory = new ServiceDiscoveringAtlasSupplier( @@ -411,20 +403,10 @@ SerializableTransactionManager serializable() { return transactionManager; } - private QosClient getQosClient(Optional serviceConfiguration) { - return serviceConfiguration.map(this::createAtlasDbQosClient).orElse(FakeQosClient.INSTANCE); - } - - private QosClient createAtlasDbQosClient(ServiceConfiguration serviceConfiguration) { - QosService qosService = JaxRsClient.create(QosService.class, - userAgent(), - ClientConfigurations.of(serviceConfiguration)); + private QosClient getQosClient(QosClientConfig config) { // TODO(nziebart): create a RefreshingRateLimiter - ScheduledExecutorService scheduler = new InstrumentedScheduledExecutorService( - Executors.newSingleThreadScheduledExecutor(), - AtlasDbMetrics.getMetricRegistry(), - "qos-client-executor"); - return AtlasDbQosClient.create(QosRateLimiter.create()); + QosRateLimiters rateLimiters = QosRateLimiters.create(config.limits()); + return AtlasDbQosClient.create(rateLimiters); } private static boolean areTransactionManagerInitializationPrerequisitesSatisfied( diff --git a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java index 729b4f0e69f..5a6258b43bd 100644 --- a/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java +++ b/atlasdb-config/src/test/java/com/palantir/atlasdb/factory/TransactionManagersTest.java @@ -73,6 +73,7 @@ import com.palantir.atlasdb.config.TimeLockClientConfig; import com.palantir.atlasdb.factory.startup.TimeLockMigrator; import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig; +import com.palantir.atlasdb.qos.config.QosClientConfig; import com.palantir.atlasdb.table.description.GenericTestSchema; import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.util.MetricsRule; @@ -193,7 +194,7 @@ public void setup() throws JsonProcessingException { runtimeConfig = mock(AtlasDbRuntimeConfig.class); when(runtimeConfig.timestampClient()).thenReturn(ImmutableTimestampClientConfig.of(false)); - when(runtimeConfig.getQosServiceConfiguration()).thenReturn(Optional.empty()); + when(runtimeConfig.qos()).thenReturn(QosClientConfig.DEFAULT); when(runtimeConfig.timelockRuntime()).thenReturn(Optional.empty()); environment = mock(Consumer.class); diff --git a/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java b/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java index 51642b0c450..4d355fa2978 100644 --- a/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java +++ b/qos-service-impl/src/integTest/java/com/palantir/atlasdb/qos/QosServiceIntegrationTest.java @@ -50,8 +50,8 @@ public class QosServiceIntegrationTest { @Test public void returnsConfiguredLimits() { - assertThat(service.getLimit("test")).isEqualTo(10L); - assertThat(service.getLimit("test2")).isEqualTo(20L); + assertThat(service.getLimit("test")).isEqualTo(10); + assertThat(service.getLimit("test2")).isEqualTo(20); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java index 997c75cce36..09dc7ca64fc 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java @@ -26,24 +26,24 @@ import com.google.common.base.Ticker; import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.qos.QosMetrics; -import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; public class AtlasDbQosClient implements QosClient { private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class); - private final QosRateLimiter rateLimiter; + private final QosRateLimiters rateLimiters; private final QosMetrics metrics; private final Ticker ticker; - public static AtlasDbQosClient create(QosRateLimiter rateLimiter) { - return new AtlasDbQosClient(rateLimiter, new QosMetrics(), Ticker.systemTicker()); + public static AtlasDbQosClient create(QosRateLimiters rateLimiters) { + return new AtlasDbQosClient(rateLimiters, new QosMetrics(), Ticker.systemTicker()); } @VisibleForTesting - AtlasDbQosClient(QosRateLimiter rateLimiter, QosMetrics metrics, Ticker ticker) { + AtlasDbQosClient(QosRateLimiters rateLimiters, QosMetrics metrics, Ticker ticker) { this.metrics = metrics; - this.rateLimiter = rateLimiter; + this.rateLimiters = rateLimiters; this.ticker = ticker; } @@ -53,7 +53,7 @@ public T executeRead( ReadQuery query, Function weigher) throws E { int estimatedWeight = getWeight(estimatedWeigher, 1); - rateLimiter.consumeWithBackoff(estimatedWeight); + rateLimiters.read().consumeWithBackoff(estimatedWeight); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); @@ -64,7 +64,7 @@ public T executeRead( metrics.updateReadCount(); metrics.updateBytesRead(actualWeight); metrics.updateReadTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos)); - rateLimiter.recordAdjustment(actualWeight - estimatedWeight); + rateLimiters.read().recordAdjustment(actualWeight - estimatedWeight); return result; } @@ -72,7 +72,7 @@ public T executeRead( @Override public void executeWrite(Supplier weigher, WriteQuery query) throws E { int weight = getWeight(weigher, 1); - rateLimiter.consumeWithBackoff(weight); + rateLimiters.write().consumeWithBackoff(weight); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java new file mode 100644 index 00000000000..313a02f3d7d --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosClientConfig.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.config; + +import java.util.Optional; + +import org.immutables.value.Value; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.palantir.remoting.api.config.service.HumanReadableDuration; +import com.palantir.remoting.api.config.service.ServiceConfiguration; + +@Value.Immutable +@JsonDeserialize(as = ImmutableQosClientConfig.class) +@JsonSerialize(as = ImmutableQosClientConfig.class) +public abstract class QosClientConfig { + + public static final QosClientConfig DEFAULT = ImmutableQosClientConfig.builder().build(); + + public abstract Optional qosService(); + + @Value.Default + public HumanReadableDuration maxBackoffSleepTime() { + return HumanReadableDuration.seconds(10); + } + + @Value.Default + public QosLimitsConfig limits() { + return QosLimitsConfig.DEFAULT_NO_LIMITS; + } + +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java new file mode 100644 index 00000000000..e932632ddaa --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosLimitsConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.config; + +import org.immutables.value.Value; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +@Value.Immutable +@JsonDeserialize(as = ImmutableQosLimitsConfig.class) +@JsonSerialize(as = ImmutableQosLimitsConfig.class) +public abstract class QosLimitsConfig { + + public static final QosLimitsConfig DEFAULT_NO_LIMITS = ImmutableQosLimitsConfig.builder().build(); + + @Value.Default + public int readBytesPerSecond() { + return Integer.MAX_VALUE; + } + + @Value.Default + public int writeBytesPerSecond() { + return Integer.MAX_VALUE; + } + +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index 4112e03d9e1..4ea8fc5e227 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -80,7 +80,7 @@ public Duration consumeWithBackoff(int estimatedNumUnits) { /** * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff(int)}. This * should be called after a query returns, when the exact number of units consumed is known. This value may be - * positive (if the original estimate was too small) or negative (if the original estimate was too large. + * positive (if the original estimate was too small) or negative (if the original estimate was too large). */ public void recordAdjustment(int adjustmentUnits) { if (adjustmentUnits > 0) { diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java new file mode 100644 index 00000000000..503b5b79d39 --- /dev/null +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos.ratelimit; + +import org.immutables.value.Value; + +import com.palantir.atlasdb.qos.config.QosLimitsConfig; + +@Value.Immutable +public interface QosRateLimiters { + + static QosRateLimiters create(QosLimitsConfig config) { + QosRateLimiter readLimiter = QosRateLimiter.create(); + readLimiter.updateRate(config.readBytesPerSecond()); + + QosRateLimiter writeLimiter = QosRateLimiter.create(); + writeLimiter.updateRate(config.writeBytesPerSecond()); + + return ImmutableQosRateLimiters.builder() + .read(readLimiter) + .write(writeLimiter) + .build(); + } + + QosRateLimiter read(); + + QosRateLimiter write(); + +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java new file mode 100644 index 00000000000..1edfe1e9b16 --- /dev/null +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosClientConfigDeserializationTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; + +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.guava.GuavaModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig; +import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig; +import com.palantir.atlasdb.qos.config.QosClientConfig; +import com.palantir.atlasdb.qos.config.QosServiceRuntimeConfig; +import com.palantir.remoting.api.config.service.HumanReadableDuration; +import com.palantir.remoting.api.config.service.ServiceConfiguration; +import com.palantir.remoting.api.config.ssl.SslConfiguration; +import com.palantir.remoting3.ext.jackson.ShimJdk7Module; + +public class QosClientConfigDeserializationTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()) + .registerModule(new GuavaModule()) + .registerModule(new ShimJdk7Module()) + .registerModule(new Jdk8Module()); + + @Test + public void canDeserializeFromYaml() throws IOException { + QosClientConfig expected = ImmutableQosClientConfig.builder() + .qosService( + ServiceConfiguration.builder() + .addUris("http://localhost:8080") + .security(SslConfiguration.of(Paths.get("trustStore.jks"))) + .build()) + .maxBackoffSleepTime(HumanReadableDuration.seconds(20)) + .limits(ImmutableQosLimitsConfig.builder() + .readBytesPerSecond(123) + .writeBytesPerSecond(456) + .build()) + .build(); + + File configFile = new File(QosServiceRuntimeConfig.class.getResource("/qos-client.yml").getPath()); + QosClientConfig config = OBJECT_MAPPER.readValue(configFile, QosClientConfig.class); + + assertThat(config).isEqualTo(expected); + } + +} diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java index 2b7bb0449ae..55a662bb1cb 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java @@ -37,7 +37,7 @@ public class QosRuntimeConfigDeserializationTest { @Test public void canDeserializeQosServerConfiguration() throws IOException { - File testConfigFile = new File(QosServiceRuntimeConfig.class.getResource("/qos.yml").getPath()); + File testConfigFile = new File(QosServiceRuntimeConfig.class.getResource("/qos-server.yml").getPath()); QosServiceRuntimeConfig configuration = OBJECT_MAPPER.readValue(testConfigFile, QosServiceRuntimeConfig.class); assertThat(configuration).isEqualTo(ImmutableQosServiceRuntimeConfig.builder() diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java similarity index 79% rename from qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java rename to qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java index f6f1ff61c74..27a8ddbda4a 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/AtlasDbQosClientTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.palantir.atlasdb.qos; +package com.palantir.atlasdb.qos.client; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -26,8 +26,11 @@ import org.junit.Test; import com.google.common.base.Ticker; -import com.palantir.atlasdb.qos.client.AtlasDbQosClient; +import com.palantir.atlasdb.qos.QosMetrics; +import com.palantir.atlasdb.qos.QosService; +import com.palantir.atlasdb.qos.ratelimit.ImmutableQosRateLimiters; import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; +import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; public class AtlasDbQosClientTest { @@ -38,11 +41,14 @@ public class AtlasDbQosClientTest { private static final long TOTAL_TIME_MICROS = 4; private QosService qosService = mock(QosService.class); - private QosRateLimiter rateLimiter = mock(QosRateLimiter.class); + private QosRateLimiter readLimiter = mock(QosRateLimiter.class); + private QosRateLimiter writeLimiter = mock(QosRateLimiter.class); + private QosRateLimiters rateLimiters = ImmutableQosRateLimiters.builder() + .read(readLimiter).write(writeLimiter).build(); private QosMetrics metrics = mock(QosMetrics.class); private Ticker ticker = mock(Ticker.class); - private AtlasDbQosClient qosClient = new AtlasDbQosClient(rateLimiter, metrics, ticker); + private AtlasDbQosClient qosClient = new AtlasDbQosClient(rateLimiters, metrics, ticker); @Before public void setUp() { @@ -55,9 +61,9 @@ public void setUp() { public void consumesSpecifiedNumUnitsForReads() { qosClient.executeRead(() -> ESTIMATED_BYTES, () -> "foo", ignored -> ACTUAL_BYTES); - verify(rateLimiter).consumeWithBackoff(ESTIMATED_BYTES); - verify(rateLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); - verifyNoMoreInteractions(rateLimiter); + verify(readLimiter).consumeWithBackoff(ESTIMATED_BYTES); + verify(readLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); + verifyNoMoreInteractions(readLimiter, writeLimiter); } @Test @@ -74,8 +80,8 @@ public void recordsReadMetrics() throws TestCheckedException { public void consumesSpecifiedNumUnitsForWrites() { qosClient.executeWrite(() -> ACTUAL_BYTES, () -> { }); - verify(rateLimiter).consumeWithBackoff(ACTUAL_BYTES); - verifyNoMoreInteractions(rateLimiter); + verify(writeLimiter).consumeWithBackoff(ACTUAL_BYTES); + verifyNoMoreInteractions(readLimiter, writeLimiter); } @Test diff --git a/qos-service-impl/src/test/resources/qos-client.yml b/qos-service-impl/src/test/resources/qos-client.yml new file mode 100644 index 00000000000..163da250c04 --- /dev/null +++ b/qos-service-impl/src/test/resources/qos-client.yml @@ -0,0 +1,10 @@ +qosService: + uris: + - http://localhost:8080 + security: + trustStorePath: trustStore.jks +maxBackoffSleepTime: 20 seconds +limits: + readBytesPerSecond: 123 + writeBytesPerSecond: 456 + diff --git a/qos-service-impl/src/test/resources/qos.yml b/qos-service-impl/src/test/resources/qos-server.yml similarity index 100% rename from qos-service-impl/src/test/resources/qos.yml rename to qos-service-impl/src/test/resources/qos-server.yml From c7dff29470a9247a61a1ed952f153c7f84dae3e4 Mon Sep 17 00:00:00 2001 From: Nathan Ziebart Date: Wed, 15 Nov 2017 21:38:02 +0000 Subject: [PATCH 2/3] respect max backoff itme --- .../atlasdb/factory/TransactionManagers.java | 4 +++- .../atlasdb/qos/ratelimit/QosRateLimiter.java | 13 +++++++------ .../atlasdb/qos/ratelimit/QosRateLimiters.java | 6 +++--- .../atlasdb/qos/ratelimit/QosRateLimiterTest.java | 12 +++++++++++- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java index fd662a8fae3..cf1450fb8eb 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java @@ -405,7 +405,9 @@ SerializableTransactionManager serializable() { private QosClient getQosClient(QosClientConfig config) { // TODO(nziebart): create a RefreshingRateLimiter - QosRateLimiters rateLimiters = QosRateLimiters.create(config.limits()); + QosRateLimiters rateLimiters = QosRateLimiters.create( + config.limits(), + config.maxBackoffSleepTime().toMilliseconds()); return AtlasDbQosClient.create(rateLimiters); } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index 4ea8fc5e227..7bb5773349b 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -34,21 +34,22 @@ public class QosRateLimiter { private static final double MAX_BURST_SECONDS = 5; private static final double UNLIMITED_RATE = Double.MAX_VALUE; - private static final int MAX_WAIT_TIME_SECONDS = 10; + private final long maxBackoffTimeMillis; private RateLimiter rateLimiter; - public static QosRateLimiter create() { - return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer()); + public static QosRateLimiter create(long maxBackoffTimeMillis) { + return new QosRateLimiter(RateLimiter.SleepingStopwatch.createFromSystemTimer(), maxBackoffTimeMillis); } @VisibleForTesting - QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch) { + QosRateLimiter(RateLimiter.SleepingStopwatch stopwatch, long maxBackoffTimeMillis) { rateLimiter = new SmoothRateLimiter.SmoothBursty( stopwatch, MAX_BURST_SECONDS); rateLimiter.setRate(UNLIMITED_RATE); + this.maxBackoffTimeMillis = maxBackoffTimeMillis; } /** @@ -67,8 +68,8 @@ public void updateRate(int unitsPerSecond) { public Duration consumeWithBackoff(int estimatedNumUnits) { Optional waitTime = rateLimiter.tryAcquire( estimatedNumUnits, - MAX_WAIT_TIME_SECONDS, - TimeUnit.SECONDS); + maxBackoffTimeMillis, + TimeUnit.MILLISECONDS); if (!waitTime.isPresent()) { throw new RuntimeException("rate limited"); diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java index 503b5b79d39..a60696dc649 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiters.java @@ -23,11 +23,11 @@ @Value.Immutable public interface QosRateLimiters { - static QosRateLimiters create(QosLimitsConfig config) { - QosRateLimiter readLimiter = QosRateLimiter.create(); + static QosRateLimiters create(QosLimitsConfig config, long maxBackoffSleepTimeMillis) { + QosRateLimiter readLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); readLimiter.updateRate(config.readBytesPerSecond()); - QosRateLimiter writeLimiter = QosRateLimiter.create(); + QosRateLimiter writeLimiter = QosRateLimiter.create(maxBackoffSleepTimeMillis); writeLimiter.updateRate(config.writeBytesPerSecond()); return ImmutableQosRateLimiters.builder() diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java index 8d88ad58b21..c68278e43d3 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiterTest.java @@ -30,9 +30,10 @@ public class QosRateLimiterTest { private static final long START_TIME_MICROS = 0L; + private static final long MAX_BACKOFF_TIME_MILLIS = 10_000; RateLimiter.SleepingStopwatch stopwatch = mock(RateLimiter.SleepingStopwatch.class); - QosRateLimiter limiter = new QosRateLimiter(stopwatch); + QosRateLimiter limiter = new QosRateLimiter(stopwatch, MAX_BACKOFF_TIME_MILLIS); @Before public void before() { @@ -63,6 +64,15 @@ public void limitsByThrowingIfSleepTimeIsTooGreat() { .hasMessageContaining("rate limited"); } + @Test + public void doesNotThrowIfMaxBackoffTimeIsVeryLarge() { + QosRateLimiter limiterWithLargeBackoffLimit = new QosRateLimiter(stopwatch, Long.MAX_VALUE); + limiterWithLargeBackoffLimit.updateRate(10); + + limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); + limiterWithLargeBackoffLimit.consumeWithBackoff(1_000_000_000); + } + @Test public void consumingAdditionalUnitsPenalizesFutureCallers() { limiter.updateRate(10); From bde9a88eb94463b64c4165064248a65f00ffad52 Mon Sep 17 00:00:00 2001 From: nziebart Date: Fri, 17 Nov 2017 16:55:56 +0000 Subject: [PATCH 3/3] [QoS] [Refactor] Query Weights (#2697) * query weights * extra tests * [QoS] Number of rows per query (#2698) * num rows * checkstyle * fix tests * no int casting * fix numRows calculation on batch_mutate --- .../cassandra/CassandraClientFactory.java | 1 + .../{ => qos}/QosCassandraClient.java | 48 ++----- .../{ => qos}/ThriftObjectSizeUtils.java | 79 ++++++----- .../cassandra/qos/ThriftQueryWeighers.java | 123 ++++++++++++++++++ .../atlasdb/ThriftObjectSizeUtilsTest.java | 64 ++++++++- .../cassandra/QosCassandraClientTest.java | 7 +- .../qos/ThriftQueryWeighersTest.java | 94 +++++++++++++ qos-service-api/build.gradle | 3 + .../palantir/atlasdb/qos/FakeQosClient.java | 11 +- .../com/palantir/atlasdb/qos/QosClient.java | 15 ++- .../com/palantir/atlasdb/qos/QosService.java | 2 +- .../com/palantir/atlasdb/qos/QueryWeight.java | 37 ++++++ .../com/palantir/atlasdb/qos/QosResource.java | 4 +- .../atlasdb/qos/client/AtlasDbQosClient.java | 48 +++---- .../qos/config/QosServiceRuntimeConfig.java | 2 +- .../atlasdb/qos/{ => metrics}/QosMetrics.java | 34 +++-- .../atlasdb/qos/ratelimit/QosRateLimiter.java | 13 +- .../QosRuntimeConfigDeserializationTest.java | 2 +- .../qos/QosServiceRuntimeConfigTest.java | 4 +- .../palantir/atlasdb/qos/QosServiceTest.java | 12 +- .../qos/client/AtlasDbQosClientTest.java | 66 ++++++---- 21 files changed, 489 insertions(+), 180 deletions(-) rename atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/{ => qos}/QosCassandraClient.java (70%) rename atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/{ => qos}/ThriftObjectSizeUtils.java (66%) create mode 100644 atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java create mode 100644 atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java create mode 100644 qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java rename qos-service-impl/src/main/java/com/palantir/atlasdb/qos/{ => metrics}/QosMetrics.java (69%) diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java index 3d8a3a5e7d8..a29facdd13a 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraClientFactory.java @@ -46,6 +46,7 @@ import com.google.common.collect.Maps; import com.palantir.atlasdb.cassandra.CassandraCredentialsConfig; import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient; import com.palantir.atlasdb.qos.QosClient; import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.common.exception.AtlasDbDependencyException; diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java similarity index 70% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java index 18d60ca8f13..be0a912be7f 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClient.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/QosCassandraClient.java @@ -14,13 +14,11 @@ * limitations under the License. */ -package com.palantir.atlasdb.keyvalue.cassandra; +package com.palantir.atlasdb.keyvalue.cassandra.qos; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.function.Function; import org.apache.cassandra.thrift.CASResult; import org.apache.cassandra.thrift.Cassandra; @@ -43,13 +41,13 @@ import org.slf4j.LoggerFactory; import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient; +import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery; import com.palantir.atlasdb.qos.QosClient; @SuppressWarnings({"all"}) // thrift variable names. public class QosCassandraClient implements CassandraClient { - private static final int DEFAULT_ESTIMATED_READ_BYTES = 100; - private static final Logger log = LoggerFactory.getLogger(CassandraClient.class); private final CassandraClient client; @@ -70,17 +68,8 @@ public Map> multiget_slice(String kvsMetho List keys, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, - () -> client.multiget_slice(kvsMethodName, tableRef, keys, - predicate, consistency_level), - this::getApproximateReadByteCount); - } - - private int getApproximateReadByteCount(Map> result) { - return getCollectionSize(result.entrySet(), - rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey()) - + getCollectionSize(rowResult.getValue(), - ThriftObjectSizeUtils::getColumnOrSuperColumnSize)); + () -> client.multiget_slice(kvsMethodName, tableRef, keys, predicate, consistency_level), + ThriftQueryWeighers.MULTIGET_SLICE); } @Override @@ -88,9 +77,8 @@ public List get_range_slices(String kvsMethodName, TableReference tabl KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.get_range_slices(kvsMethodName, tableRef, predicate, range, consistency_level), - result -> getCollectionSize(result, ThriftObjectSizeUtils::getKeySliceSize)); + ThriftQueryWeighers.GET_RANGE_SLICES); } @Override @@ -98,17 +86,8 @@ public void batch_mutate(String kvsMethodName, Map getApproximateWriteByteCount(mutation_map), - () -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level)); - } - - private int getApproximateWriteByteCount(Map>> batchMutateMap) { - int approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize); - int approxBytesForValues = getCollectionSize(batchMutateMap.values(), currentMap -> - getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize) - + getCollectionSize(currentMap.values(), - mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize))); - return approxBytesForKeys + approxBytesForValues; + () -> client.batch_mutate(kvsMethodName, mutation_map, consistency_level), + ThriftQueryWeighers.batchMutate(mutation_map)); } @Override @@ -116,9 +95,8 @@ public ColumnOrSuperColumn get(TableReference tableReference, ByteBuffer key, by ConsistencyLevel consistency_level) throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException { return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.get(tableReference, key, column, consistency_level), - ThriftObjectSizeUtils::getColumnOrSuperColumnSize); + ThriftQueryWeighers.GET); } @Override @@ -134,14 +112,10 @@ public CASResult cas(TableReference tableReference, ByteBuffer key, List public CqlResult execute_cql3_query(CqlQuery cqlQuery, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException { - return qosClient.executeRead( - () -> DEFAULT_ESTIMATED_READ_BYTES, () -> client.execute_cql3_query(cqlQuery, compression, consistency), - ThriftObjectSizeUtils::getCqlResultSize); + ThriftQueryWeighers.EXECUTE_CQL3_QUERY); } - private int getCollectionSize(Collection collection, Function singleObjectSizeFunction) { - return ThriftObjectSizeUtils.getCollectionSize(collection, singleObjectSizeFunction); - } + } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java similarity index 66% rename from atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java rename to atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java index f5f95e0a2a0..42afbd151a6 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/ThriftObjectSizeUtils.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftObjectSizeUtils.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package com.palantir.atlasdb.keyvalue.cassandra; +package com.palantir.atlasdb.keyvalue.cassandra.qos; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -37,13 +38,33 @@ public final class ThriftObjectSizeUtils { - private static final int ONE_BYTE = 1; + private static final long ONE_BYTE = 1; private ThriftObjectSizeUtils() { // utility class } - public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) { + public static long getApproximateWriteByteCount(Map>> batchMutateMap) { + long approxBytesForKeys = getCollectionSize(batchMutateMap.keySet(), ThriftObjectSizeUtils::getByteBufferSize); + long approxBytesForValues = getCollectionSize(batchMutateMap.values(), + currentMap -> getCollectionSize(currentMap.keySet(), ThriftObjectSizeUtils::getStringSize) + + getCollectionSize(currentMap.values(), + mutations -> getCollectionSize(mutations, ThriftObjectSizeUtils::getMutationSize))); + return approxBytesForKeys + approxBytesForValues; + } + + public static long getApproximateReadByteCount(Map> result) { + return getCollectionSize(result.entrySet(), + rowResult -> ThriftObjectSizeUtils.getByteBufferSize(rowResult.getKey()) + + getCollectionSize(rowResult.getValue(), + ThriftObjectSizeUtils::getColumnOrSuperColumnSize)); + } + + public static long getApproximateReadByteCount(List slices) { + return getCollectionSize(slices, ThriftObjectSizeUtils::getKeySliceSize); + } + + public static long getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperColumn) { if (columnOrSuperColumn == null) { return getNullSize(); } @@ -53,14 +74,14 @@ public static int getColumnOrSuperColumnSize(ColumnOrSuperColumn columnOrSuperCo + getCounterSuperColumnSize(columnOrSuperColumn.getCounter_super_column()); } - public static int getByteBufferSize(ByteBuffer byteBuffer) { + public static long getByteBufferSize(ByteBuffer byteBuffer) { if (byteBuffer == null) { return getNullSize(); } return byteBuffer.remaining(); } - public static int getMutationSize(Mutation mutation) { + public static long getMutationSize(Mutation mutation) { if (mutation == null) { return getNullSize(); } @@ -69,7 +90,7 @@ public static int getMutationSize(Mutation mutation) { mutation.getDeletion()); } - public static int getCqlResultSize(CqlResult cqlResult) { + public static long getCqlResultSize(CqlResult cqlResult) { if (cqlResult == null) { return getNullSize(); } @@ -79,7 +100,7 @@ public static int getCqlResultSize(CqlResult cqlResult) { + getCqlMetadataSize(cqlResult.getSchema()); } - public static int getKeySliceSize(KeySlice keySlice) { + public static long getKeySliceSize(KeySlice keySlice) { if (keySlice == null) { return getNullSize(); } @@ -88,15 +109,15 @@ public static int getKeySliceSize(KeySlice keySlice) { + getCollectionSize(keySlice.getColumns(), ThriftObjectSizeUtils::getColumnOrSuperColumnSize); } - public static int getStringSize(String string) { + public static long getStringSize(String string) { if (string == null) { return getNullSize(); } - return string.length() * Character.SIZE; + return string.length(); } - public static int getColumnSize(Column column) { + public static long getColumnSize(Column column) { if (column == null) { return getNullSize(); } @@ -107,7 +128,7 @@ public static int getColumnSize(Column column) { + getTimestampSize(); } - private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) { + private static long getCounterSuperColumnSize(CounterSuperColumn counterSuperColumn) { if (counterSuperColumn == null) { return getNullSize(); } @@ -116,7 +137,7 @@ private static int getCounterSuperColumnSize(CounterSuperColumn counterSuperColu + getCollectionSize(counterSuperColumn.getColumns(), ThriftObjectSizeUtils::getCounterColumnSize); } - private static int getCounterColumnSize(CounterColumn counterColumn) { + private static long getCounterColumnSize(CounterColumn counterColumn) { if (counterColumn == null) { return getNullSize(); } @@ -124,7 +145,7 @@ private static int getCounterColumnSize(CounterColumn counterColumn) { return getByteArraySize(counterColumn.getName()) + getCounterValueSize(); } - private static int getSuperColumnSize(SuperColumn superColumn) { + private static long getSuperColumnSize(SuperColumn superColumn) { if (superColumn == null) { return getNullSize(); } @@ -133,7 +154,7 @@ private static int getSuperColumnSize(SuperColumn superColumn) { + getCollectionSize(superColumn.getColumns(), ThriftObjectSizeUtils::getColumnSize); } - private static int getDeletionSize(Deletion deletion) { + private static long getDeletionSize(Deletion deletion) { if (deletion == null) { return getNullSize(); } @@ -143,7 +164,7 @@ private static int getDeletionSize(Deletion deletion) { + getSlicePredicateSize(deletion.getPredicate()); } - private static int getSlicePredicateSize(SlicePredicate predicate) { + private static long getSlicePredicateSize(SlicePredicate predicate) { if (predicate == null) { return getNullSize(); } @@ -152,7 +173,7 @@ private static int getSlicePredicateSize(SlicePredicate predicate) { + getSliceRangeSize(predicate.getSlice_range()); } - private static int getSliceRangeSize(SliceRange sliceRange) { + private static long getSliceRangeSize(SliceRange sliceRange) { if (sliceRange == null) { return getNullSize(); } @@ -163,7 +184,7 @@ private static int getSliceRangeSize(SliceRange sliceRange) { + getSliceRangeCountSize(); } - private static int getCqlMetadataSize(CqlMetadata schema) { + private static long getCqlMetadataSize(CqlMetadata schema) { if (schema == null) { return getNullSize(); } @@ -174,13 +195,13 @@ private static int getCqlMetadataSize(CqlMetadata schema) { + getStringSize(schema.getDefault_value_type()); } - private static int getByteBufferStringMapSize(Map nameTypes) { + private static long getByteBufferStringMapSize(Map nameTypes) { return getCollectionSize(nameTypes.entrySet(), entry -> ThriftObjectSizeUtils.getByteBufferSize(entry.getKey()) + ThriftObjectSizeUtils.getStringSize(entry.getValue())); } - private static int getCqlRowSize(CqlRow cqlRow) { + private static long getCqlRowSize(CqlRow cqlRow) { if (cqlRow == null) { return getNullSize(); } @@ -188,47 +209,47 @@ private static int getCqlRowSize(CqlRow cqlRow) { + getCollectionSize(cqlRow.getColumns(), ThriftObjectSizeUtils::getColumnSize); } - private static int getThriftEnumSize() { + private static long getThriftEnumSize() { return Integer.BYTES; } - private static int getByteArraySize(byte[] byteArray) { + private static long getByteArraySize(byte[] byteArray) { if (byteArray == null) { return getNullSize(); } return byteArray.length; } - private static int getTimestampSize() { + private static long getTimestampSize() { return Long.BYTES; } - private static int getTtlSize() { + private static long getTtlSize() { return Integer.BYTES; } - private static int getCounterValueSize() { + private static long getCounterValueSize() { return Long.BYTES; } - private static int getReversedBooleanSize() { + private static long getReversedBooleanSize() { return ONE_BYTE; } - private static int getSliceRangeCountSize() { + private static long getSliceRangeCountSize() { return Integer.BYTES; } - private static int getNullSize() { + private static long getNullSize() { return Integer.BYTES; } - public static int getCollectionSize(Collection collection, Function sizeFunction) { + public static long getCollectionSize(Collection collection, Function sizeFunction) { if (collection == null) { return getNullSize(); } - int sum = 0; + long sum = 0; for (T item : collection) { sum += sizeFunction.apply(item); } diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java new file mode 100644 index 00000000000..c9a018d9e69 --- /dev/null +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighers.java @@ -0,0 +1,123 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra.qos; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Suppliers; +import com.palantir.atlasdb.keyvalue.cassandra.CassandraClient; +import com.palantir.atlasdb.qos.ImmutableQueryWeight; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QueryWeight; + +public final class ThriftQueryWeighers { + + private static final Logger log = LoggerFactory.getLogger(CassandraClient.class); + + public static final QueryWeight DEFAULT_ESTIMATED_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(100) + .numDistinctRows(1) + .timeTakenNanos(TimeUnit.MILLISECONDS.toNanos(2)) + .build(); + + private ThriftQueryWeighers() { } + + public static final QosClient.QueryWeigher>> MULTIGET_SLICE = + readWeigher(ThriftObjectSizeUtils::getApproximateReadByteCount, Map::size); + + public static final QosClient.QueryWeigher> GET_RANGE_SLICES = + readWeigher(ThriftObjectSizeUtils::getApproximateReadByteCount, List::size); + + public static final QosClient.QueryWeigher GET = + readWeigher(ThriftObjectSizeUtils::getColumnOrSuperColumnSize, ignored -> 1); + + public static final QosClient.QueryWeigher EXECUTE_CQL3_QUERY = + // TODO(nziebart): we need to inspect the schema to see how many rows there are - a CQL row is NOT a + // partition. rows here will depend on the type of query executed in CqlExecutor: either (column, ts) pairs, + // or (key, column, ts) triplets + readWeigher(ThriftObjectSizeUtils::getCqlResultSize, ignored -> 1); + + public static QosClient.QueryWeigher batchMutate( + Map>> mutationMap) { + long numRows = mutationMap.size(); + return writeWeigher(numRows, () -> ThriftObjectSizeUtils.getApproximateWriteByteCount(mutationMap)); + } + + public static QosClient.QueryWeigher readWeigher(Function bytesRead, Function numRows) { + return new QosClient.QueryWeigher() { + @Override + public QueryWeight estimate() { + return DEFAULT_ESTIMATED_WEIGHT; + } + + @Override + public QueryWeight weigh(T result, long timeTakenNanos) { + return ImmutableQueryWeight.builder() + .numBytes(safeGetNumBytesOrDefault(() -> bytesRead.apply(result))) + .timeTakenNanos(timeTakenNanos) + .numDistinctRows(numRows.apply(result)) + .build(); + } + }; + } + + public static QosClient.QueryWeigher writeWeigher(long numRows, Supplier bytesWritten) { + Supplier weight = Suppliers.memoize(() -> safeGetNumBytesOrDefault(bytesWritten))::get; + + return new QosClient.QueryWeigher() { + @Override + public QueryWeight estimate() { + return ImmutableQueryWeight.builder() + .from(DEFAULT_ESTIMATED_WEIGHT) + .numBytes(weight.get()) + .numDistinctRows(numRows) + .build(); + } + + @Override + public QueryWeight weigh(T result, long timeTakenNanos) { + return ImmutableQueryWeight.builder() + .from(estimate()) + .timeTakenNanos(timeTakenNanos) + .build(); + } + }; + } + + // TODO(nziebart): we really shouldn't be needing to catch exceptions here + private static long safeGetNumBytesOrDefault(Supplier numBytes) { + try { + return numBytes.get(); + } catch (Exception e) { + log.warn("Error calculating number of bytes", e); + return DEFAULT_ESTIMATED_WEIGHT.numBytes(); + } + } + +} diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java index a9f623361e0..a55b655be25 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/ThriftObjectSizeUtilsTest.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnOrSuperColumn; @@ -33,15 +35,18 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; -import com.palantir.atlasdb.keyvalue.cassandra.ThriftObjectSizeUtils; +import com.google.common.collect.ImmutableMap; +import com.palantir.atlasdb.keyvalue.cassandra.qos.ThriftObjectSizeUtils; public class ThriftObjectSizeUtilsTest { - private static final String TEST_MAME = "test"; - private static final Column TEST_COLUMN = new Column(ByteBuffer.wrap(TEST_MAME.getBytes())); + private static final String TEST_MAME = "foo"; + private static final ByteBuffer TEST_NAME_BYTES = ByteBuffer.wrap(TEST_MAME.getBytes()); + private static final Column TEST_COLUMN = new Column(TEST_NAME_BYTES); - - private static final long TEST_COLUMN_SIZE = 4L + TEST_MAME.getBytes().length + 4L + 8L; + private static final long TEST_NAME_SIZE = 3L; + private static final long TEST_NAME_BYTES_SIZE = TEST_NAME_BYTES.remaining(); + private static final long TEST_COLUMN_SIZE = TEST_NAME_BYTES_SIZE + 4L + 4L + 8L; private static final ColumnOrSuperColumn EMPTY_COLUMN_OR_SUPERCOLUMN = new ColumnOrSuperColumn(); private static final long EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE = Integer.BYTES * 4; @@ -74,7 +79,7 @@ public void getSizeForColumnOrSuperColumnWithANonEmptyColumnAndSuperColumn() { .setColumn(TEST_COLUMN) .setSuper_column(new SuperColumn(ByteBuffer.wrap(TEST_MAME.getBytes()), ImmutableList.of(TEST_COLUMN))))) - .isEqualTo(Integer.BYTES * 2 + TEST_COLUMN_SIZE + TEST_MAME.getBytes().length + TEST_COLUMN_SIZE); + .isEqualTo(Integer.BYTES * 2 + TEST_COLUMN_SIZE + TEST_NAME_BYTES_SIZE + TEST_COLUMN_SIZE); } @Test @@ -198,4 +203,51 @@ public void getSizeForKeySliceWithKeyAndColumns() { .setColumns(ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN)))) .isEqualTo(TEST_MAME.getBytes().length + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE); } + + @Test + public void getSizeForBatchMutate() { + Map>> batchMutateMap = ImmutableMap.of( + TEST_NAME_BYTES, + ImmutableMap.of( + TEST_MAME, + ImmutableList.of(new Mutation().setColumn_or_supercolumn(EMPTY_COLUMN_OR_SUPERCOLUMN)))); + + long expectedSize = TEST_NAME_BYTES_SIZE + + TEST_NAME_SIZE + + Integer.BYTES + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateWriteByteCount(batchMutateMap)).isEqualTo(expectedSize); + } + + @Test + public void getStringSize() { + assertThat(ThriftObjectSizeUtils.getStringSize(TEST_MAME)).isEqualTo(TEST_NAME_SIZE); + } + + @Test + public void getMultigetResultSize() { + Map> result = ImmutableMap.of( + TEST_NAME_BYTES, ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN)); + + long expectedSize = TEST_NAME_BYTES_SIZE + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(result)).isEqualTo(expectedSize); + } + + @Test + public void getKeySlicesSize() { + List slices = ImmutableList.of( + new KeySlice() + .setKey(TEST_NAME_BYTES) + .setColumns(ImmutableList.of(EMPTY_COLUMN_OR_SUPERCOLUMN))); + + long expectedSize = TEST_NAME_BYTES_SIZE + + EMPTY_COLUMN_OR_SUPERCOLUMN_SIZE; + + assertThat(ThriftObjectSizeUtils.getApproximateReadByteCount(slices)).isEqualTo(expectedSize); + + } + } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java index 38d762b3563..3283aab1bcc 100644 --- a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/QosCassandraClientTest.java @@ -38,6 +38,7 @@ import com.google.common.collect.ImmutableMap; import com.palantir.atlasdb.encoding.PtBytes; import com.palantir.atlasdb.keyvalue.api.TableReference; +import com.palantir.atlasdb.keyvalue.cassandra.qos.QosCassandraClient; import com.palantir.atlasdb.keyvalue.cassandra.thrift.SlicePredicates; import com.palantir.atlasdb.qos.QosClient; @@ -62,7 +63,7 @@ public void setUp() { public void multigetSliceChecksLimit() throws TException, LimitExceededException { client.multiget_slice("get", TEST_TABLE, ImmutableList.of(ROW_KEY), SLICE_PREDICATE, ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } @@ -79,7 +80,7 @@ public void executeCqlQueryChecksLimit() throws TException, LimitExceededExcepti CqlQuery query = new CqlQuery("SELECT * FROM test_table LIMIT 1"); client.execute_cql3_query(query, Compression.NONE, ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } @@ -87,7 +88,7 @@ public void executeCqlQueryChecksLimit() throws TException, LimitExceededExcepti public void getRangeSlicesChecksLimit() throws TException, LimitExceededException { client.get_range_slices("get", TEST_TABLE, SLICE_PREDICATE, new KeyRange(), ConsistencyLevel.ANY); - verify(qosClient, times(1)).executeRead(any(), any(), any()); + verify(qosClient, times(1)).executeRead(any(), any()); verifyNoMoreInteractions(qosClient); } } diff --git a/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java new file mode 100644 index 00000000000..c19d01e508d --- /dev/null +++ b/atlasdb-cassandra/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/qos/ThriftQueryWeighersTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.keyvalue.cassandra.qos; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class ThriftQueryWeighersTest { + + private static final ByteBuffer BYTES1 = ByteBuffer.allocate(3); + private static final ByteBuffer BYTES2 = ByteBuffer.allocate(7); + private static final ColumnOrSuperColumn COLUMN = new ColumnOrSuperColumn(); + private static final KeySlice KEY_SLICE = new KeySlice(); + private static final Mutation MUTATION = new Mutation(); + + private static final long UNIMPORTANT_ARG = 123L; + + @Test + public void multigetSliceWeigherReturnsCorrectNumRows() { + Map> result = ImmutableMap.of( + BYTES1, ImmutableList.of(COLUMN, COLUMN), + BYTES2, ImmutableList.of(COLUMN)); + + long actualNumRows = ThriftQueryWeighers.MULTIGET_SLICE.weigh(result, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(2); + } + + @Test + public void rangeSlicesWeigherReturnsCorrectNumRows() { + List result = ImmutableList.of(KEY_SLICE, KEY_SLICE, KEY_SLICE); + + long actualNumRows = ThriftQueryWeighers.GET_RANGE_SLICES.weigh(result, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(3); + } + + @Test + public void getWeigherReturnsCorrectNumRows() { + long actualNumRows = ThriftQueryWeighers.GET.weigh(COLUMN, UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(1); + } + + @Test + public void executeCql3QueryWeigherReturnsOneRowAlways() { + long actualNumRows = ThriftQueryWeighers.EXECUTE_CQL3_QUERY.weigh(new CqlResult(), + UNIMPORTANT_ARG).numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(1); + } + + @Test + public void batchMutateWeigherReturnsCorrectNumRows() { + Map>> mutations = ImmutableMap.of( + BYTES1, ImmutableMap.of( + "table1", ImmutableList.of(MUTATION, MUTATION), + "table2", ImmutableList.of(MUTATION)), + BYTES2, ImmutableMap.of( + "table1", ImmutableList.of(MUTATION))); + + long actualNumRows = ThriftQueryWeighers.batchMutate(mutations).weigh(null, UNIMPORTANT_ARG) + .numDistinctRows(); + + assertThat(actualNumRows).isEqualTo(2); + } + +} diff --git a/qos-service-api/build.gradle b/qos-service-api/build.gradle index 60b3ef8dbf4..421a6862cd3 100644 --- a/qos-service-api/build.gradle +++ b/qos-service-api/build.gradle @@ -17,5 +17,8 @@ dependencies { exclude (module:'okhttp') exclude (module:'jsr305') } + + processor group: 'org.immutables', name: 'value' + } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java index 4b24852a266..7aaa05293e0 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/FakeQosClient.java @@ -16,22 +16,19 @@ package com.palantir.atlasdb.qos; -import java.util.function.Function; -import java.util.function.Supplier; - public class FakeQosClient implements QosClient { public static final FakeQosClient INSTANCE = new FakeQosClient(); @Override - public T executeRead(Supplier estimatedWeight, ReadQuery query, - Function weigher) throws E { + public T executeRead(ReadQuery query, QueryWeigher weigher) + throws E { return query.execute(); } @Override - public void executeWrite(Supplier weight, WriteQuery query) - throws E { + public void executeWrite(WriteQuery query, + QueryWeigher weigher) throws E { query.execute(); } } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java index 26d85e9d4c0..656be55b66b 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosClient.java @@ -16,9 +16,6 @@ package com.palantir.atlasdb.qos; -import java.util.function.Function; -import java.util.function.Supplier; - public interface QosClient { interface ReadQuery { @@ -29,12 +26,16 @@ interface WriteQuery { void execute() throws E; } + interface QueryWeigher { + QueryWeight estimate(); + QueryWeight weigh(T result, long timeTakenNanos); + } + T executeRead( - Supplier estimatedWeight, ReadQuery query, - Function weigher) throws E; + QueryWeigher weigher) throws E; void executeWrite( - Supplier weight, - WriteQuery query) throws E; + WriteQuery query, + QueryWeigher weigher) throws E; } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java index 469caed1427..1dced841626 100644 --- a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QosService.java @@ -30,5 +30,5 @@ public interface QosService { @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - int getLimit(@Safe @PathParam("client") String client); + long getLimit(@Safe @PathParam("client") String client); } diff --git a/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java new file mode 100644 index 00000000000..6e36a73c4dc --- /dev/null +++ b/qos-service-api/src/main/java/com/palantir/atlasdb/qos/QueryWeight.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.qos; + +import java.util.concurrent.TimeUnit; + +import org.immutables.value.Value; + +@Value.Immutable +public interface QueryWeight { + + long numBytes(); + + long numDistinctRows(); + + // TODO(nziebart): need to standardize everyhting to longs, and handle casting to int in QosRateLimiter + long timeTakenNanos(); + + default long timeTakenMicros() { + return TimeUnit.NANOSECONDS.toMicros(timeTakenNanos()); + } + +} diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java index b7443421cfb..ed199b69e4b 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosResource.java @@ -29,7 +29,7 @@ public QosResource(Supplier config) { } @Override - public int getLimit(String client) { - return config.get().clientLimits().getOrDefault(client, Integer.MAX_VALUE); + public long getLimit(String client) { + return config.get().clientLimits().getOrDefault(client, Long.MAX_VALUE); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java index 09dc7ca64fc..f1832ba2199 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/client/AtlasDbQosClient.java @@ -15,23 +15,22 @@ */ package com.palantir.atlasdb.qos.client; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; import com.palantir.atlasdb.qos.QosClient; -import com.palantir.atlasdb.qos.QosMetrics; +import com.palantir.atlasdb.qos.QueryWeight; +import com.palantir.atlasdb.qos.metrics.QosMetrics; import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; public class AtlasDbQosClient implements QosClient { private static final Logger log = LoggerFactory.getLogger(AtlasDbQosClient.class); + private static final Void NO_RESULT = null; + private final QosRateLimiters rateLimiters; private final QosMetrics metrics; private final Ticker ticker; @@ -48,50 +47,35 @@ public static AtlasDbQosClient create(QosRateLimiters rateLimiters) { } @Override - public T executeRead( - Supplier estimatedWeigher, - ReadQuery query, - Function weigher) throws E { - int estimatedWeight = getWeight(estimatedWeigher, 1); - rateLimiters.read().consumeWithBackoff(estimatedWeight); + public T executeRead(ReadQuery query, QueryWeigher weigher) throws E { + long estimatedNumBytes = weigher.estimate().numBytes(); + rateLimiters.read().consumeWithBackoff(estimatedNumBytes); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); T result = query.execute(); long totalTimeNanos = ticker.read() - startTimeNanos; - int actualWeight = getWeight(() -> weigher.apply(result), estimatedWeight); - metrics.updateReadCount(); - metrics.updateBytesRead(actualWeight); - metrics.updateReadTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos)); - rateLimiters.read().recordAdjustment(actualWeight - estimatedWeight); + QueryWeight actualWeight = weigher.weigh(result, totalTimeNanos); + metrics.recordRead(actualWeight); + rateLimiters.read().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); return result; } @Override - public void executeWrite(Supplier weigher, WriteQuery query) throws E { - int weight = getWeight(weigher, 1); - rateLimiters.write().consumeWithBackoff(weight); + public void executeWrite(WriteQuery query, QueryWeigher weigher) throws E { + long estimatedNumBytes = weigher.estimate().numBytes(); + rateLimiters.write().consumeWithBackoff(estimatedNumBytes); // TODO(nziebart): decide what to do if we encounter a timeout exception long startTimeNanos = ticker.read(); query.execute(); long totalTimeNanos = ticker.read() - startTimeNanos; - metrics.updateWriteCount(); - metrics.updateWriteTimeMicros(TimeUnit.NANOSECONDS.toMicros(totalTimeNanos)); - metrics.updateBytesWritten(weight); - } - - // TODO(nziebart): error handling in the weight calculation should be responsibility of the caller - private Integer getWeight(Supplier weigher, int fallback) { - try { - return weigher.get(); - } catch (Exception e) { - log.warn("Exception while calculating response weight", e); - return fallback; - } + QueryWeight actualWeight = weigher.weigh(NO_RESULT, totalTimeNanos); + metrics.recordWrite(actualWeight); + rateLimiters.write().recordAdjustment(actualWeight.numBytes() - estimatedNumBytes); } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java index 0faa8222f34..8ce8305589d 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/config/QosServiceRuntimeConfig.java @@ -27,5 +27,5 @@ @JsonSerialize(as = ImmutableQosServiceRuntimeConfig.class) @Value.Immutable public abstract class QosServiceRuntimeConfig { - public abstract Map clientLimits(); + public abstract Map clientLimits(); } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java similarity index 69% rename from qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java rename to qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java index d3d8f2f0573..d8ee536efb2 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/QosMetrics.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/metrics/QosMetrics.java @@ -14,53 +14,51 @@ * limitations under the License. */ -package com.palantir.atlasdb.qos; +package com.palantir.atlasdb.qos.metrics; import com.codahale.metrics.Meter; +import com.palantir.atlasdb.qos.QueryWeight; import com.palantir.atlasdb.util.MetricsManager; +// TODO(nziebart): needs tests public class QosMetrics { private final MetricsManager metricsManager = new MetricsManager(); private final Meter readRequestCount; private final Meter bytesRead; private final Meter readTime; + private final Meter rowsRead; private final Meter writeRequestCount; private final Meter bytesWritten; private final Meter writeTime; + private final Meter rowsWritten; + public QosMetrics() { readRequestCount = metricsManager.registerMeter(QosMetrics.class, "numReadRequests"); bytesRead = metricsManager.registerMeter(QosMetrics.class, "bytesRead"); readTime = metricsManager.registerMeter(QosMetrics.class, "readTime"); + rowsRead = metricsManager.registerMeter(QosMetrics.class, "rowsRead"); writeRequestCount = metricsManager.registerMeter(QosMetrics.class, "numWriteRequests"); bytesWritten = metricsManager.registerMeter(QosMetrics.class, "bytesWritten"); writeTime = metricsManager.registerMeter(QosMetrics.class, "writeTime"); + rowsWritten = metricsManager.registerMeter(QosMetrics.class, "rowsWritten"); } - public void updateReadCount() { + public void recordRead(QueryWeight weight) { readRequestCount.mark(); + bytesRead.mark(weight.numBytes()); + readTime.mark(weight.timeTakenMicros()); + rowsRead.mark(weight.numDistinctRows()); } - public void updateWriteCount() { + public void recordWrite(QueryWeight weight) { writeRequestCount.mark(); + bytesWritten.mark(weight.numBytes()); + writeTime.mark(weight.timeTakenMicros()); + rowsWritten.mark(weight.numDistinctRows()); } - public void updateBytesRead(long numBytes) { - bytesRead.mark(numBytes); - } - - public void updateBytesWritten(long numBytes) { - bytesWritten.mark(numBytes); - } - - public void updateReadTimeMicros(long readTimeMicros) { - readTime.mark(readTimeMicros); - } - - public void updateWriteTimeMicros(long writeTimeMicros) { - readTime.mark(writeTimeMicros); - } } diff --git a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java index 7bb5773349b..299a417f371 100644 --- a/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java +++ b/qos-service-impl/src/main/java/com/palantir/atlasdb/qos/ratelimit/QosRateLimiter.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; /** * A rate limiter for database queries, based on "units" of expense. This limiter strives to maintain an upper limit on @@ -55,7 +56,7 @@ public static QosRateLimiter create(long maxBackoffTimeMillis) { /** * Update the allowed rate, in units per second. */ - public void updateRate(int unitsPerSecond) { + public void updateRate(double unitsPerSecond) { rateLimiter.setRate(unitsPerSecond); } @@ -65,9 +66,9 @@ public void updateRate(int unitsPerSecond) { * * @return the amount of time slept for, if any */ - public Duration consumeWithBackoff(int estimatedNumUnits) { + public Duration consumeWithBackoff(long estimatedNumUnits) { Optional waitTime = rateLimiter.tryAcquire( - estimatedNumUnits, + Ints.saturatedCast(estimatedNumUnits), // TODO(nziebart): deal with longs maxBackoffTimeMillis, TimeUnit.MILLISECONDS); @@ -79,13 +80,13 @@ public Duration consumeWithBackoff(int estimatedNumUnits) { } /** - * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff(int)}. This + * Records an adjustment to the original estimate of units consumed passed to {@link #consumeWithBackoff}. This * should be called after a query returns, when the exact number of units consumed is known. This value may be * positive (if the original estimate was too small) or negative (if the original estimate was too large). */ - public void recordAdjustment(int adjustmentUnits) { + public void recordAdjustment(long adjustmentUnits) { if (adjustmentUnits > 0) { - rateLimiter.steal(adjustmentUnits); + rateLimiter.steal(Ints.saturatedCast(adjustmentUnits)); // TODO(nziebart): deal with longs } // TODO(nziebart): handle negative case } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java index 55a662bb1cb..9edcedafa2b 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosRuntimeConfigDeserializationTest.java @@ -41,7 +41,7 @@ public void canDeserializeQosServerConfiguration() throws IOException { QosServiceRuntimeConfig configuration = OBJECT_MAPPER.readValue(testConfigFile, QosServiceRuntimeConfig.class); assertThat(configuration).isEqualTo(ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test", 10, "test2", 20)) + .clientLimits(ImmutableMap.of("test", 10L, "test2", 20L)) .build()); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java index 129ac04db62..c5c81c11e23 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceRuntimeConfigTest.java @@ -30,14 +30,14 @@ public void canBuildFromEmptyClientLimits() { @Test public void canBuildFromSingleClientLimit() { ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test_client", 10)) + .clientLimits(ImmutableMap.of("test_client", 10L)) .build(); } @Test public void canBuildFromMultipleClientLimits() { ImmutableQosServiceRuntimeConfig.builder() - .clientLimits(ImmutableMap.of("test_client", 10, "test_client2", 100)) + .clientLimits(ImmutableMap.of("test_client", 10L, "test_client2", 100L)) .build(); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java index 15fdeb1b64b..006cafef984 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/QosServiceTest.java @@ -39,20 +39,20 @@ public class QosServiceTest { public void defaultsToNoLimit() { when(config.get()).thenReturn(configWithLimits(ImmutableMap.of())); - assertThat(resource.getLimit("foo")).isEqualTo(Integer.MAX_VALUE); + assertThat(resource.getLimit("foo")).isEqualTo(Long.MAX_VALUE); } @Test public void canLiveReloadLimits() { when(config.get()) - .thenReturn(configWithLimits(ImmutableMap.of("foo", 10))) - .thenReturn(configWithLimits(ImmutableMap.of("foo", 20))); + .thenReturn(configWithLimits(ImmutableMap.of("foo", 10L))) + .thenReturn(configWithLimits(ImmutableMap.of("foo", 20L))); - assertEquals(10, resource.getLimit("foo")); - assertEquals(20, resource.getLimit("foo")); + assertEquals(10L, resource.getLimit("foo")); + assertEquals(20L, resource.getLimit("foo")); } - private QosServiceRuntimeConfig configWithLimits(Map limits) { + private QosServiceRuntimeConfig configWithLimits(Map limits) { return ImmutableQosServiceRuntimeConfig.builder().clientLimits(limits).build(); } } diff --git a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java index 27a8ddbda4a..b59df46e598 100644 --- a/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java +++ b/qos-service-impl/src/test/java/com/palantir/atlasdb/qos/client/AtlasDbQosClientTest.java @@ -17,6 +17,8 @@ package com.palantir.atlasdb.qos.client; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -26,8 +28,10 @@ import org.junit.Test; import com.google.common.base.Ticker; -import com.palantir.atlasdb.qos.QosMetrics; -import com.palantir.atlasdb.qos.QosService; +import com.palantir.atlasdb.qos.ImmutableQueryWeight; +import com.palantir.atlasdb.qos.QosClient; +import com.palantir.atlasdb.qos.QueryWeight; +import com.palantir.atlasdb.qos.metrics.QosMetrics; import com.palantir.atlasdb.qos.ratelimit.ImmutableQosRateLimiters; import com.palantir.atlasdb.qos.ratelimit.QosRateLimiter; import com.palantir.atlasdb.qos.ratelimit.QosRateLimiters; @@ -38,9 +42,22 @@ public class AtlasDbQosClientTest { private static final int ACTUAL_BYTES = 51; private static final long START_NANOS = 1100L; private static final long END_NANOS = 5500L; - private static final long TOTAL_TIME_MICROS = 4; + private static final long TOTAL_NANOS = END_NANOS - START_NANOS; + + private static final QueryWeight ESTIMATED_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(ESTIMATED_BYTES) + .numDistinctRows(1) + .timeTakenNanos((int) TOTAL_NANOS) + .build(); + + private static final QueryWeight ACTUAL_WEIGHT = ImmutableQueryWeight.builder() + .numBytes(ACTUAL_BYTES) + .numDistinctRows(10) + .timeTakenNanos((int) TOTAL_NANOS) + .build(); + + private QosClient.QueryWeigher weigher = mock(QosClient.QueryWeigher.class); - private QosService qosService = mock(QosService.class); private QosRateLimiter readLimiter = mock(QosRateLimiter.class); private QosRateLimiter writeLimiter = mock(QosRateLimiter.class); private QosRateLimiters rateLimiters = ImmutableQosRateLimiters.builder() @@ -52,14 +69,15 @@ public class AtlasDbQosClientTest { @Before public void setUp() { - when(qosService.getLimit("test-client")).thenReturn(100); - when(ticker.read()).thenReturn(START_NANOS).thenReturn(END_NANOS); + + when(weigher.estimate()).thenReturn(ESTIMATED_WEIGHT); + when(weigher.weigh(any(), anyLong())).thenReturn(ACTUAL_WEIGHT); } @Test public void consumesSpecifiedNumUnitsForReads() { - qosClient.executeRead(() -> ESTIMATED_BYTES, () -> "foo", ignored -> ACTUAL_BYTES); + qosClient.executeRead(() -> "foo", weigher); verify(readLimiter).consumeWithBackoff(ESTIMATED_BYTES); verify(readLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); @@ -68,42 +86,46 @@ public void consumesSpecifiedNumUnitsForReads() { @Test public void recordsReadMetrics() throws TestCheckedException { - qosClient.executeRead(() -> ESTIMATED_BYTES, () -> "foo", ignored -> ACTUAL_BYTES); + qosClient.executeRead(() -> "foo", weigher); - verify(metrics).updateReadCount(); - verify(metrics).updateBytesRead(ACTUAL_BYTES); - verify(metrics).updateReadTimeMicros(TOTAL_TIME_MICROS); + verify(metrics).recordRead(ACTUAL_WEIGHT); verifyNoMoreInteractions(metrics); } + @Test + public void passesResultAndTimeToReadWeigher() throws TestCheckedException { + qosClient.executeRead(() -> "foo", weigher); + + verify(weigher).weigh("foo", TOTAL_NANOS); + } + @Test public void consumesSpecifiedNumUnitsForWrites() { - qosClient.executeWrite(() -> ACTUAL_BYTES, () -> { }); + qosClient.executeWrite(() -> { }, weigher); - verify(writeLimiter).consumeWithBackoff(ACTUAL_BYTES); + verify(writeLimiter).consumeWithBackoff(ESTIMATED_BYTES); + verify(writeLimiter).recordAdjustment(ACTUAL_BYTES - ESTIMATED_BYTES); verifyNoMoreInteractions(readLimiter, writeLimiter); } @Test public void recordsWriteMetrics() throws TestCheckedException { - qosClient.executeWrite(() -> ACTUAL_BYTES, () -> { }); + qosClient.executeWrite(() -> { }, weigher); - verify(metrics).updateWriteCount(); - verify(metrics).updateBytesWritten(ACTUAL_BYTES); - verify(metrics).updateWriteTimeMicros(TOTAL_TIME_MICROS); + verify(metrics).recordWrite(ACTUAL_WEIGHT); verifyNoMoreInteractions(metrics); } @Test public void propagatesCheckedExceptions() throws TestCheckedException { - assertThatThrownBy(() -> qosClient.executeRead(() -> 1, () -> { + assertThatThrownBy(() -> qosClient.executeRead(() -> { throw new TestCheckedException(); - }, ignored -> 1)).isInstanceOf(TestCheckedException.class); + }, weigher)).isInstanceOf(TestCheckedException.class); - assertThatThrownBy(() -> qosClient.executeWrite(() -> 1, () -> { + assertThatThrownBy(() -> qosClient.executeWrite(() -> { throw new TestCheckedException(); - })).isInstanceOf(TestCheckedException.class); + }, weigher)).isInstanceOf(TestCheckedException.class); } - static class TestCheckedException extends Exception { } + static class TestCheckedException extends Exception {} }