diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadTestSuite.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadTestSuite.java index f51d351a7a5..42e466eac0a 100644 --- a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadTestSuite.java +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraReadTestSuite.java @@ -235,7 +235,7 @@ private List readOneBatchOfSize(int batchSize) { .collect(Collectors.toList()); } - public static AtlasDbConfig getAtlasDbConfig() { + private static AtlasDbConfig getAtlasDbConfig() { DockerPort cassandraPort = docker.containers() .container("cassandra") .port(CASSANDRA_PORT_NUMBER); @@ -261,7 +261,7 @@ public static AtlasDbConfig getAtlasDbConfig() { .build(); } - public static Optional getAtlasDbRuntimeConfig() { + private static Optional getAtlasDbRuntimeConfig() { return Optional.of(ImmutableAtlasDbRuntimeConfig.builder() .sweep(ImmutableSweepConfig.builder().enabled(false).build()) .qos(ImmutableQosClientConfig.builder() diff --git a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteTestSuite.java b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteTestSuite.java index 09f40de5172..140581bfd7d 100644 --- a/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteTestSuite.java +++ b/atlasdb-ete-tests/src/test/java/com/palantir/atlasdb/ete/QosCassandraWriteTestSuite.java @@ -15,25 +15,17 @@ */ package com.palantir.atlasdb.ete; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static com.palantir.atlasdb.ete.QosCassandraReadTestSuite.getAtlasDbConfig; - +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; -import org.assertj.core.util.Lists; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.After; @@ -42,28 +34,38 @@ import org.junit.Test; import com.codahale.metrics.MetricRegistry; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; +import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig; +import com.palantir.atlasdb.cassandra.ImmutableCassandraCredentialsConfig; +import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig; +import com.palantir.atlasdb.config.AtlasDbConfig; +import com.palantir.atlasdb.config.AtlasDbRuntimeConfig; +import com.palantir.atlasdb.config.ImmutableAtlasDbConfig; +import com.palantir.atlasdb.config.ImmutableAtlasDbRuntimeConfig; +import com.palantir.atlasdb.config.ImmutableSweepConfig; import com.palantir.atlasdb.factory.TransactionManagers; -import com.palantir.atlasdb.http.errors.AtlasDbRemoteException; import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig; +import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig; +import com.palantir.atlasdb.qos.ratelimit.RateLimitExceededException; import com.palantir.atlasdb.table.description.ValueType; -import com.palantir.atlasdb.todo.TodoResource; import com.palantir.atlasdb.todo.TodoSchema; -import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager; import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.docker.compose.DockerComposeRule; import com.palantir.docker.compose.configuration.ShutdownStrategy; import com.palantir.docker.compose.connection.Container; +import com.palantir.docker.compose.connection.DockerPort; import com.palantir.docker.compose.logging.LogDirectory; +import com.palantir.remoting.api.config.service.HumanReadableDuration; public class QosCassandraWriteTestSuite { - private static final Random random = new Random(); private static SerializableTransactionManager serializableTransactionManager; + private static final int readBytesPerSecond = 10_000; + private static final int writeBytesPerSecond = 10_000; + private static final int CASSANDRA_PORT_NUMBER = 9160; @ClassRule public static DockerComposeRule docker = DockerComposeRule.builder() @@ -79,7 +81,7 @@ public void setup() { serializableTransactionManager = TransactionManagers.builder() .config(getAtlasDbConfig()) - .runtimeConfigSupplier(QosCassandraReadTestSuite::getAtlasDbRuntimeConfig) + .runtimeConfigSupplier(QosCassandraWriteTestSuite::getAtlasDbRuntimeConfig) .schemas(ImmutableList.of(TodoSchema.getSchema())) .userAgent("qos-test") .buildSerializable(); @@ -88,95 +90,94 @@ public void setup() { .atMost(Duration.ONE_MINUTE) .pollInterval(Duration.ONE_SECOND) .until(serializableTransactionManager::isInitialized); - } @Test - public void shouldBeAbleToWriteBytesExcee() { - serializableTransactionManager.runTaskWithRetry((transaction) -> { - writeNTodosOfSize(transaction, 200, 1_000); - return null; - }); + public void shouldBeAbleToWriteSmallAmountOfBytesIfDoesNotExceedLimit() { + writeNTodosOfSize(1, 100); } - public static void writeNTodosOfSize(Transaction transaction, int numTodos, int size) { - Map write = new HashMap<>(); - for (int i = 0; i < numTodos; i++) { - Cell thisCell = Cell.create(ValueType.FIXED_LONG.convertFromJava(random.nextLong()), - TodoSchema.todoTextColumn()); - write.put(thisCell, ValueType.STRING.convertFromJava(getTodoOfSize(size))); - } - transaction.put(TodoSchema.todoTable(), write); + @Test + public void shouldBeAbleToWriteSmallAmountOfBytesSeriallyIfDoesNotExceedLimit() { + IntStream.range(0, 50) + .forEach(i -> writeNTodosOfSize(1, 100)); } - private void ensureOneWriteHasOccurred(TodoResource todoClient) { - try { - todoClient.addTodo(getTodoOfSize(100_000)); - // okay as the first huge write is not rate limited. - } catch (Exception e) { - // okay as some other test might have written before - } + @Test + public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitFirstTime() { + writeNTodosOfSize(12, 1_000); } @Test - public void canNotWriteLargeNumberOfBytesConcurrentlyIfAllRequestsComeAtTheExactSameTime() - throws InterruptedException { - TodoResource todoClient = EteSetup.createClientToSingleNode(TodoResource.class); - - CyclicBarrier barrier = new CyclicBarrier(100); - ForkJoinPool threadPool = new ForkJoinPool(100); - List> futures = Lists.newArrayList(); - - IntStream.range(0, 100).parallel().forEach(i -> - futures.add(threadPool.submit( - () -> { - barrier.await(); - todoClient.addTodo(getTodoOfSize(1_000)); - return null; - }))); - - AtomicInteger exceptionCounter = new AtomicInteger(90); - futures.forEach(future -> { - try { - future.get(); - } catch (ExecutionException e) { - if (e.getCause().getClass().equals(AtlasDbRemoteException.class)) { - exceptionCounter.getAndIncrement(); - } - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - }); - assertThat(exceptionCounter.get()).isGreaterThan(90); + public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitSecondTimeWithSoftLimiting() { + writeNTodosOfSize(12, 1_000); + + writeNTodosOfSize(12, 1_000); } @Test - public void canNotWriteLargeNumberOfBytesConcurrently() throws InterruptedException { - TodoResource todoClient = EteSetup.createClientToSingleNode(TodoResource.class); - - ForkJoinPool threadPool = new ForkJoinPool(100); - List> futures = Lists.newArrayList(); - - IntStream.range(0, 100).parallel() - .forEach(i -> futures.add(threadPool.submit(() -> todoClient.addTodo(getTodoOfSize(1_000))))); - - threadPool.shutdown(); - Preconditions.checkState(threadPool.awaitTermination(90, TimeUnit.SECONDS), - "Not all threads writing data finished in the expected time."); - - AtomicInteger exceptionCounter = new AtomicInteger(0); - futures.forEach(future -> { - try { - future.get(); - } catch (ExecutionException e) { - if (e.getCause().getClass().equals(AtlasDbRemoteException.class)) { - exceptionCounter.getAndIncrement(); - } - } catch (InterruptedException e) { - throw Throwables.propagate(e); + public void shouldNotBeAbleToWriteLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() { + // Have one limit-exceeding write + // as the rate-limiter will let anything pass through until the limit is exceeded. + writeNTodosOfSize(12, 1_000); + + assertThatThrownBy(() -> writeNTodosOfSize(200, 1_000)) + .isInstanceOf(RateLimitExceededException.class) + .hasMessage("Rate limited. Available capacity has been exhausted."); + } + + + public static void writeNTodosOfSize(int numTodos, int size) { + serializableTransactionManager.runTaskWithRetry((transaction) -> { + Map write = new HashMap<>(); + for (int i = 0; i < numTodos; i++) { + Cell thisCell = Cell.create(ValueType.FIXED_LONG.convertFromJava(random.nextLong()), + TodoSchema.todoTextColumn()); + write.put(thisCell, ValueType.STRING.convertFromJava(getTodoOfSize(size))); } + transaction.put(TodoSchema.todoTable(), write); + return null; }); - assertThat(exceptionCounter.get()).isGreaterThan(0); + + } + + private static AtlasDbConfig getAtlasDbConfig() { + DockerPort cassandraPort = docker.containers() + .container("cassandra") + .port(CASSANDRA_PORT_NUMBER); + + InetSocketAddress cassandraAddress = new InetSocketAddress(cassandraPort.getIp(), + cassandraPort.getExternalPort()); + + CassandraKeyValueServiceConfig kvsConfig = ImmutableCassandraKeyValueServiceConfig.builder() + .servers(ImmutableList.of(cassandraAddress)) + .credentials(ImmutableCassandraCredentialsConfig.builder() + .username("cassandra") + .password("cassandra") + .build()) + .ssl(false) + .replicationFactor(1) + .autoRefreshNodes(false) + .build(); + + return ImmutableAtlasDbConfig.builder() + .namespace("qosete") + .keyValueService(kvsConfig) + .initializeAsync(true) + .build(); + } + + private static Optional getAtlasDbRuntimeConfig() { + return Optional.of(ImmutableAtlasDbRuntimeConfig.builder() + .sweep(ImmutableSweepConfig.builder().enabled(false).build()) + .qos(ImmutableQosClientConfig.builder() + .limits(ImmutableQosLimitsConfig.builder() + .readBytesPerSecond(readBytesPerSecond) + .writeBytesPerSecond(writeBytesPerSecond) + .build()) + .maxBackoffSleepTime(HumanReadableDuration.seconds(2)) + .build()) + .build()); } @After