diff --git a/atlasdb-dropwizard-bundle/src/main/java/com/palantir/atlasdb/dropwizard/AtlasDbConfigurationProvider.java b/atlasdb-dropwizard-bundle/src/main/java/com/palantir/atlasdb/dropwizard/AtlasDbConfigurationProvider.java index 14de47696c1..3020c18668b 100644 --- a/atlasdb-dropwizard-bundle/src/main/java/com/palantir/atlasdb/dropwizard/AtlasDbConfigurationProvider.java +++ b/atlasdb-dropwizard-bundle/src/main/java/com/palantir/atlasdb/dropwizard/AtlasDbConfigurationProvider.java @@ -15,8 +15,13 @@ */ package com.palantir.atlasdb.dropwizard; +import java.util.Optional; + import com.palantir.atlasdb.config.AtlasDbConfig; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; public interface AtlasDbConfigurationProvider { AtlasDbConfig getAtlasDbConfig(); + + Optional getAtlasDbRuntimeConfig(); } diff --git a/atlasdb-dropwizard-bundle/src/test/java/com/palantir/atlasdb/dropwizard/commands/AtlasDbConsoleCommandTest.java b/atlasdb-dropwizard-bundle/src/test/java/com/palantir/atlasdb/dropwizard/commands/AtlasDbConsoleCommandTest.java index 635a642c3dc..2063efe73d5 100644 --- a/atlasdb-dropwizard-bundle/src/test/java/com/palantir/atlasdb/dropwizard/commands/AtlasDbConsoleCommandTest.java +++ b/atlasdb-dropwizard-bundle/src/test/java/com/palantir/atlasdb/dropwizard/commands/AtlasDbConsoleCommandTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -30,6 +31,7 @@ import com.google.common.base.Throwables; import com.palantir.atlasdb.config.AtlasDbConfig; import com.palantir.atlasdb.config.AtlasDbConfigs; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; import com.palantir.atlasdb.config.ImmutableAtlasDbConfig; import com.palantir.atlasdb.config.ImmutableLeaderConfig; import com.palantir.atlasdb.dropwizard.AtlasDbConfigurationProvider; @@ -100,5 +102,10 @@ private class AtlasDbDropwizardConfig extends Configuration implements AtlasDbCo public AtlasDbConfig getAtlasDbConfig() { return atlasDbConfig; } + + @Override + public Optional getAtlasDbRuntimeConfig() { + return Optional.empty(); + } } } diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteConfiguration.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteConfiguration.java index d35e8acebbb..a3a137580a7 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteConfiguration.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteConfiguration.java @@ -15,21 +15,33 @@ */ package com.palantir.atlasdb; +import java.util.Optional; + import com.fasterxml.jackson.annotation.JsonProperty; import com.palantir.atlasdb.config.AtlasDbConfig; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; import com.palantir.atlasdb.dropwizard.AtlasDbConfigurationProvider; import io.dropwizard.Configuration; public class AtlasDbEteConfiguration extends Configuration implements AtlasDbConfigurationProvider { private final AtlasDbConfig atlasdb; + private final Optional atlasdbRuntime; - public AtlasDbEteConfiguration(@JsonProperty("atlasdb") AtlasDbConfig atlasdb) { + public AtlasDbEteConfiguration(@JsonProperty("atlasdb") AtlasDbConfig atlasdb, + @JsonProperty("atlasDbRuntime") Optional atlasDbRuntimeConfig) { this.atlasdb = atlasdb; + this.atlasdbRuntime = atlasDbRuntimeConfig; } @Override public AtlasDbConfig getAtlasDbConfig() { return atlasdb; } + + @Override + public Optional getAtlasDbRuntimeConfig() { + return atlasdbRuntime; + } + } diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteServer.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteServer.java index fb01dc45baa..4917373614c 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteServer.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/AtlasDbEteServer.java @@ -15,6 +15,7 @@ */ package com.palantir.atlasdb; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ import com.palantir.atlasdb.cas.CheckAndSetSchema; import com.palantir.atlasdb.cas.SimpleCheckAndSetResource; import com.palantir.atlasdb.config.AtlasDbConfig; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; import com.palantir.atlasdb.dropwizard.AtlasDbBundle; import com.palantir.atlasdb.factory.TransactionManagers; import com.palantir.atlasdb.http.NotInitializedExceptionMapper; @@ -77,18 +79,22 @@ public void run(AtlasDbEteConfiguration config, final Environment environment) t private TransactionManager tryToCreateTransactionManager(AtlasDbEteConfiguration config, Environment environment) throws InterruptedException { if (config.getAtlasDbConfig().initializeAsync()) { - return createTransactionManager(config.getAtlasDbConfig(), environment); + return createTransactionManager(config.getAtlasDbConfig(), config.getAtlasDbRuntimeConfig(), environment); } else { - return createTransactionManagerWithRetry(config.getAtlasDbConfig(), environment); + return createTransactionManagerWithRetry(config.getAtlasDbConfig(), + config.getAtlasDbRuntimeConfig(), + environment); } } - private TransactionManager createTransactionManagerWithRetry(AtlasDbConfig config, Environment environment) + private TransactionManager createTransactionManagerWithRetry(AtlasDbConfig config, + Optional atlasDbRuntimeConfig, + Environment environment) throws InterruptedException { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) < CREATE_TRANSACTION_MANAGER_MAX_WAIT_TIME_SECS) { try { - return createTransactionManager(config, environment); + return createTransactionManager(config, atlasDbRuntimeConfig, environment); } catch (RuntimeException e) { log.warn("An error occurred while trying to create transaction manager. Retrying...", e); Thread.sleep(CREATE_TRANSACTION_MANAGER_POLL_INTERVAL_SECS); @@ -97,12 +103,14 @@ private TransactionManager createTransactionManagerWithRetry(AtlasDbConfig confi throw new IllegalStateException("Timed-out because we were unable to create transaction manager"); } - private TransactionManager createTransactionManager(AtlasDbConfig config, Environment environment) { + private TransactionManager createTransactionManager(AtlasDbConfig config, + Optional atlasDbRuntimeConfigOptional, Environment environment) { return TransactionManagers.builder() .config(config) .schemas(ETE_SCHEMAS) .registrar(environment.jersey()::register) .userAgent("ete test") + .runtimeConfigSupplier(() -> atlasDbRuntimeConfigOptional) .buildSerializable(); } diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraEteTestSetup.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraEteTestSetup.java new file mode 100644 index 00000000000..676f6599548 --- /dev/null +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraEteTestSetup.java @@ -0,0 +1,154 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.palantir.atlasdb.ete; + + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; + +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.ImmutableCassandraCredentialsConfig; +import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig; +import com.palantir.atlasdb.config.AtlasDbConfig; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; +import com.palantir.atlasdb.config.ImmutableAtlasDbConfig; +import com.palantir.atlasdb.config.ImmutableAtlasDbRuntimeConfig; +import com.palantir.atlasdb.config.ImmutableSweepConfig; +import com.palantir.atlasdb.factory.TransactionManagers; +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig; +import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig; +import com.palantir.atlasdb.table.description.ValueType; +import com.palantir.atlasdb.todo.TodoSchema; +import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; +import com.palantir.atlasdb.util.AtlasDbMetrics; +import com.palantir.docker.compose.DockerComposeRule; +import com.palantir.docker.compose.configuration.ShutdownStrategy; +import com.palantir.docker.compose.connection.Container; +import com.palantir.docker.compose.connection.DockerPort; +import com.palantir.docker.compose.logging.LogDirectory; +import com.palantir.remoting.api.config.service.HumanReadableDuration; + +public class QosCassandraEteTestSetup { + private static final Random random = new Random(); + protected static SerializableTransactionManager serializableTransactionManager; + protected static final int readBytesPerSecond = 10_000; + protected static final int writeBytesPerSecond = 10_000; + private static final int CASSANDRA_PORT_NUMBER = 9160; + protected static final int MAX_SOFT_LIMITING_SLEEP_MILLIS = 2000; + + @ClassRule + public static DockerComposeRule docker = DockerComposeRule.builder() + .file("src/test/resources/cassandra-docker-compose.yml") + .waitingForService("cassandra", Container::areAllPortsOpen) + .saveLogsTo(LogDirectory.circleAwareLogDirectory(QosCassandraReadEteTest.class)) + .shutdownStrategy(ShutdownStrategy.AGGRESSIVE_WITH_NETWORK_CLEANUP) + .build(); + + @Before + public void setup() { + AtlasDbMetrics.setMetricRegistry(new MetricRegistry()); + ensureTransactionManagerIsCreated(); + } + + protected static void ensureTransactionManagerIsCreated() { + serializableTransactionManager = TransactionManagers.builder() + .config(getAtlasDbConfig()) + .runtimeConfigSupplier(QosCassandraEteTestSetup::getAtlasDbRuntimeConfig) + .schemas(ImmutableList.of(TodoSchema.getSchema())) + .userAgent("qos-test") + .buildSerializable(); + + Awaitility.await() + .atMost(Duration.ONE_MINUTE) + .pollInterval(Duration.ONE_SECOND) + .until(serializableTransactionManager::isInitialized); + } + + protected static void writeNTodosOfSize(int numTodos, int size) { + serializableTransactionManager.runTaskWithRetry((transaction) -> { + Map write = new HashMap<>(); + for (int i = 0; i < numTodos; i++) { + Cell thisCell = Cell.create(ValueType.FIXED_LONG.convertFromJava(random.nextLong()), + TodoSchema.todoTextColumn()); + write.put(thisCell, ValueType.STRING.convertFromJava(getTodoOfSize(size))); + } + transaction.put(TodoSchema.todoTable(), write); + return null; + }); + } + + private static AtlasDbConfig getAtlasDbConfig() { + DockerPort cassandraPort = docker.containers() + .container("cassandra") + .port(CASSANDRA_PORT_NUMBER); + + InetSocketAddress cassandraAddress = new InetSocketAddress(cassandraPort.getIp(), + cassandraPort.getExternalPort()); + + CassandraKeyValueServiceConfig kvsConfig = ImmutableCassandraKeyValueServiceConfig.builder() + .servers(ImmutableList.of(cassandraAddress)) + .credentials(ImmutableCassandraCredentialsConfig.builder() + .username("cassandra") + .password("cassandra") + .build()) + .ssl(false) + .replicationFactor(1) + .autoRefreshNodes(false) + .build(); + + return ImmutableAtlasDbConfig.builder() + .namespace("qosete") + .keyValueService(kvsConfig) + .initializeAsync(true) + .build(); + } + + private static Optional getAtlasDbRuntimeConfig() { + return Optional.of(ImmutableAtlasDbRuntimeConfig.builder() + .sweep(ImmutableSweepConfig.builder().enabled(false).build()) + .qos(ImmutableQosClientConfig.builder() + .limits(ImmutableQosLimitsConfig.builder() + .readBytesPerSecond(readBytesPerSecond) + .writeBytesPerSecond(writeBytesPerSecond) + .build()) + .maxBackoffSleepTime(HumanReadableDuration.milliseconds(MAX_SOFT_LIMITING_SLEEP_MILLIS)) + .build()) + .build()); + } + + private static String getTodoOfSize(int size) { + // Note that the size of the cell for 1000 length text is actually 1050. + return String.join("", Collections.nCopies(size, "a")); + } + + @After + public void after() { + serializableTransactionManager.close(); + } +} diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadEteTest.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadEteTest.java new file mode 100644 index 00000000000..b49402e19ca --- /dev/null +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadEteTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.palantir.atlasdb.ete; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.palantir.atlasdb.keyvalue.api.RangeRequest; +import com.palantir.atlasdb.keyvalue.api.RowResult; +import com.palantir.atlasdb.qos.ratelimit.RateLimitExceededException; +import com.palantir.atlasdb.table.description.ValueType; +import com.palantir.atlasdb.todo.ImmutableTodo; +import com.palantir.atlasdb.todo.Todo; +import com.palantir.atlasdb.todo.TodoSchema; +import com.palantir.common.base.BatchingVisitable; + +public class QosCassandraReadEteTest extends QosCassandraEteTestSetup { + private static final int ONE_TODO_SIZE_IN_BYTES = 1050; + + @BeforeClass + public static void createTransactionManagerAndWriteData() { + ensureTransactionManagerIsCreated(); + writeNTodosOfSize(200, 1_000); + serializableTransactionManager.close(); + } + + @Test + public void shouldBeAbleToReadSmallAmountOfBytesIfDoesNotExceedLimit() { + assertThat(readOneBatchOfSize(1)).hasSize(1); + } + + @Test + public void shouldBeAbleToReadSmallAmountOfBytesSeriallyIfDoesNotExceedLimit() { + IntStream.range(0, 50).forEach(i -> assertThat(readOneBatchOfSize(1)).hasSize(1)); + } + + @Test + public void shouldBeAbleToReadLargeAmountsExceedingTheLimitFirstTime() { + assertThat(readOneBatchOfSize(12)).hasSize(12); + } + + @Test + public void shouldBeAbleToReadLargeAmountsExceedingTheLimitSecondTimeWithSoftLimiting() { + assertThat(readOneBatchOfSize(12)).hasSize(12); + // The second read might actually be faster as the transaction/metadata is cached + assertThat(readOneBatchOfSize(12)).hasSize(12); + } + + @Test + public void shouldNotBeAbleToReadLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() { + assertThatThrownBy(() -> readOneBatchOfSize(200)) + .isInstanceOf(RateLimitExceededException.class) + .hasMessage("Rate limited. Available capacity has been exhausted."); + + // TODO(hsaraogi): This should not happen. + assertThatThrownBy(() -> readOneBatchOfSize(1)) + .isInstanceOf(RateLimitExceededException.class) + .hasMessage("Rate limited. Available capacity has been exhausted."); + } + + @Test + public void readRateLimitShouldBeRespectedByConcurrentReadingThreads() throws InterruptedException { + int numThreads = 5; + int numReadsPerThread = 10; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List>> futures = new ArrayList<>(numThreads); + + long start = System.nanoTime(); + IntStream.range(0, numThreads).forEach(i -> + futures.add(executorService.submit(() -> { + List results = new ArrayList<>(numReadsPerThread); + IntStream.range(0, numReadsPerThread) + .forEach(j -> results.addAll(readOneBatchOfSize(1))); + return results; + }))); + executorService.shutdown(); + Preconditions.checkState(executorService.awaitTermination(30L, TimeUnit.SECONDS), + "Read tasks did not finish in 30s"); + long readTime = System.nanoTime() - start; + + assertThatAllReadsWereSuccessful(futures, numReadsPerThread); + double actualBytesRead = numThreads * numReadsPerThread * ONE_TODO_SIZE_IN_BYTES; + double maxReadBytesLimit = readBytesPerSecond * ((double) readTime / TimeUnit.SECONDS.toNanos(1) + + 5 /* to allow for rate-limiter burst */); + + assertThat(actualBytesRead).isLessThan(maxReadBytesLimit); + } + + private void assertThatAllReadsWereSuccessful(List>> futures, int numReadsPerThread) { + AtomicInteger exceptionCounter = new AtomicInteger(0); + futures.forEach(future -> { + try { + assertThat(future.get()).hasSize(numReadsPerThread); + } catch (ExecutionException e) { + if (e.getCause() instanceof RateLimitExceededException) { + exceptionCounter.getAndIncrement(); + } + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + }); + assertThat(exceptionCounter.get()).isEqualTo(0); + } + + private List readOneBatchOfSize(int batchSize) { + ImmutableList> results = serializableTransactionManager.runTaskWithRetry((transaction) -> { + BatchingVisitable> rowResultBatchingVisitable = transaction.getRange( + TodoSchema.todoTable(), RangeRequest.all()); + ImmutableList.Builder> rowResults = ImmutableList.builder(); + + rowResultBatchingVisitable.batchAccept(batchSize, items -> { + rowResults.addAll(items); + return false; + }); + + return rowResults.build(); + }); + + return results.stream() + .map(RowResult::getOnlyColumnValue) + .map(ValueType.STRING::convertToString) + .map(ImmutableTodo::of) + .collect(Collectors.toList()); + } +} diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteEteTest.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteEteTest.java new file mode 100644 index 00000000000..cf965c249a1 --- /dev/null +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteEteTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2017 Palantir Technologies, Inc. All rights reserved. + * + * Licensed under the BSD-3 License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.ete; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Test; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.palantir.atlasdb.qos.ratelimit.RateLimitExceededException; + +public class QosCassandraWriteEteTest extends QosCassandraEteTestSetup { + + @Test + public void shouldBeAbleToWriteSmallAmountOfBytesIfDoesNotExceedLimit() { + writeNTodosOfSize(1, 100); + } + + @Test + public void shouldBeAbleToWriteSmallAmountOfBytesSeriallyIfDoesNotExceedLimit() { + IntStream.range(0, 50).forEach(i -> writeNTodosOfSize(1, 100)); + } + + @Test + public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitFirstTime() { + writeNTodosOfSize(12, 1_000); + } + + @Test + public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitSecondTimeWithSoftLimiting() { + Stopwatch stopwatch = Stopwatch.createStarted(); + writeNTodosOfSize(1, 20_000); + long firstWriteTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + stopwatch = Stopwatch.createStarted(); + writeNTodosOfSize(200, 1_000); + long secondWriteTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertThat(secondWriteTime).isGreaterThan(firstWriteTime); + assertThat(secondWriteTime - firstWriteTime).isLessThan(MAX_SOFT_LIMITING_SLEEP_MILLIS); + } + + @Test + public void shouldNotBeAbleToWriteLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() { + // Have one quick limit-exceeding write, as the rate-limiter + // will let anything pass through until the limit is exceeded. + writeNTodosOfSize(1, 100_000); + + assertThatThrownBy(() -> writeNTodosOfSize(1, 100_000)) + .isInstanceOf(RateLimitExceededException.class) + .hasMessage("Rate limited. Available capacity has been exhausted."); + + // One write smaller than the rate limit should also be rate limited. + assertThatThrownBy(() -> writeNTodosOfSize(5, 10)) + .isInstanceOf(RateLimitExceededException.class) + .hasMessage("Rate limited. Available capacity has been exhausted."); + } + + @Test + public void writeRateLimitShouldBeRespectedByConcurrentWritingThreads() throws InterruptedException { + int oneTodoSizeInBytes = 167; + + int numThreads = 5; + int numWritesPerThread = 10; + + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List futures = new ArrayList<>(numThreads); + + long start = System.nanoTime(); + IntStream.range(0, numThreads).forEach(i -> + futures.add(executorService.submit(() -> { + IntStream.range(0, numWritesPerThread).forEach(j -> writeNTodosOfSize(1, 100)); + return null; + }))); + executorService.shutdown(); + Preconditions.checkState(executorService.awaitTermination(30L, TimeUnit.SECONDS), + "Read tasks did not finish in 30s"); + long writeTime = System.nanoTime() - start; + + assertThatAllWritesWereSuccessful(futures); + double actualBytesWritten = numThreads * numWritesPerThread * oneTodoSizeInBytes; + double maxReadBytesLimit = readBytesPerSecond * ((double) writeTime / TimeUnit.SECONDS.toNanos(1) + + 5 /* to allow for rate-limiter burst */); + assertThat(actualBytesWritten).isLessThan(maxReadBytesLimit); + } + + private void assertThatAllWritesWereSuccessful(List futures) { + AtomicInteger exceptionCounter = new AtomicInteger(0); + futures.forEach(future -> { + try { + future.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RateLimitExceededException) { + exceptionCounter.getAndIncrement(); + } + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + }); + assertThat(exceptionCounter.get()).isEqualTo(0); + } +} diff --git a/atlasdb-ete-tests/src/test/resources/cassandra-docker-compose.yml b/atlasdb-ete-tests/src/test/resources/cassandra-docker-compose.yml new file mode 100644 index 00000000000..be21170b800 --- /dev/null +++ b/atlasdb-ete-tests/src/test/resources/cassandra-docker-compose.yml @@ -0,0 +1,14 @@ +version: '2' + +services: + cassandra: + image: palantirtechnologies/docker-cassandra-atlasdb:2.2.8 + ports: + - "9160" + - "9042" + - "7199" + environment: + - MAX_HEAP_SIZE=512m + - HEAP_NEWSIZE=64m + - LOCAL_JMX=no + - CASSANDRA_VERSION=2.2.8 diff --git a/qos-service-impl/build.gradle b/qos-service-impl/build.gradle index 75c1f1db3d6..24b402edf35 100644 --- a/qos-service-impl/build.gradle +++ b/qos-service-impl/build.gradle @@ -20,6 +20,10 @@ dependencies { exclude group: 'com.google.guava' } compile (project(":qos-service-api")); + compile (project(":atlasdb-client")) { + exclude group: 'com.squareup.okhttp3' + exclude group: 'com.google.guava' + } processor project(":atlasdb-processors") processor group: 'org.immutables', name: 'value'