Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Write tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hsaraogi committed Nov 22, 2017
1 parent 3f080db commit 504f345
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private List<Todo> readOneBatchOfSize(int batchSize) {
.collect(Collectors.toList());
}

public static AtlasDbConfig getAtlasDbConfig() {
private static AtlasDbConfig getAtlasDbConfig() {
DockerPort cassandraPort = docker.containers()
.container("cassandra")
.port(CASSANDRA_PORT_NUMBER);
Expand All @@ -261,7 +261,7 @@ public static AtlasDbConfig getAtlasDbConfig() {
.build();
}

public static Optional<AtlasDbRuntimeConfig> getAtlasDbRuntimeConfig() {
private static Optional<AtlasDbRuntimeConfig> getAtlasDbRuntimeConfig() {
return Optional.of(ImmutableAtlasDbRuntimeConfig.builder()
.sweep(ImmutableSweepConfig.builder().enabled(false).build())
.qos(ImmutableQosClientConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,17 @@
*/
package com.palantir.atlasdb.ete;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import static com.palantir.atlasdb.ete.QosCassandraReadTestSuite.getAtlasDbConfig;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.After;
Expand All @@ -42,28 +34,38 @@
import org.junit.Test;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.atlasdb.cassandra.CassandraKeyValueServiceConfig;
import com.palantir.atlasdb.cassandra.ImmutableCassandraCredentialsConfig;
import com.palantir.atlasdb.cassandra.ImmutableCassandraKeyValueServiceConfig;
import com.palantir.atlasdb.config.AtlasDbConfig;
import com.palantir.atlasdb.config.AtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.ImmutableAtlasDbConfig;
import com.palantir.atlasdb.config.ImmutableAtlasDbRuntimeConfig;
import com.palantir.atlasdb.config.ImmutableSweepConfig;
import com.palantir.atlasdb.factory.TransactionManagers;
import com.palantir.atlasdb.http.errors.AtlasDbRemoteException;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.qos.config.ImmutableQosClientConfig;
import com.palantir.atlasdb.qos.config.ImmutableQosLimitsConfig;
import com.palantir.atlasdb.qos.ratelimit.RateLimitExceededException;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.atlasdb.todo.TodoResource;
import com.palantir.atlasdb.todo.TodoSchema;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.impl.SerializableTransactionManager;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.docker.compose.DockerComposeRule;
import com.palantir.docker.compose.configuration.ShutdownStrategy;
import com.palantir.docker.compose.connection.Container;
import com.palantir.docker.compose.connection.DockerPort;
import com.palantir.docker.compose.logging.LogDirectory;
import com.palantir.remoting.api.config.service.HumanReadableDuration;

public class QosCassandraWriteTestSuite {

private static final Random random = new Random();
private static SerializableTransactionManager serializableTransactionManager;
private static final int readBytesPerSecond = 10_000;
private static final int writeBytesPerSecond = 10_000;
private static final int CASSANDRA_PORT_NUMBER = 9160;

@ClassRule
public static DockerComposeRule docker = DockerComposeRule.builder()
Expand All @@ -79,7 +81,7 @@ public void setup() {

serializableTransactionManager = TransactionManagers.builder()
.config(getAtlasDbConfig())
.runtimeConfigSupplier(QosCassandraReadTestSuite::getAtlasDbRuntimeConfig)
.runtimeConfigSupplier(QosCassandraWriteTestSuite::getAtlasDbRuntimeConfig)
.schemas(ImmutableList.of(TodoSchema.getSchema()))
.userAgent("qos-test")
.buildSerializable();
Expand All @@ -88,95 +90,94 @@ public void setup() {
.atMost(Duration.ONE_MINUTE)
.pollInterval(Duration.ONE_SECOND)
.until(serializableTransactionManager::isInitialized);

}

@Test
public void shouldBeAbleToWriteBytesExcee() {
serializableTransactionManager.runTaskWithRetry((transaction) -> {
writeNTodosOfSize(transaction, 200, 1_000);
return null;
});
public void shouldBeAbleToWriteSmallAmountOfBytesIfDoesNotExceedLimit() {
writeNTodosOfSize(1, 100);
}

public static void writeNTodosOfSize(Transaction transaction, int numTodos, int size) {
Map<Cell, byte[]> write = new HashMap<>();
for (int i = 0; i < numTodos; i++) {
Cell thisCell = Cell.create(ValueType.FIXED_LONG.convertFromJava(random.nextLong()),
TodoSchema.todoTextColumn());
write.put(thisCell, ValueType.STRING.convertFromJava(getTodoOfSize(size)));
}
transaction.put(TodoSchema.todoTable(), write);
@Test
public void shouldBeAbleToWriteSmallAmountOfBytesSeriallyIfDoesNotExceedLimit() {
IntStream.range(0, 50)
.forEach(i -> writeNTodosOfSize(1, 100));
}

private void ensureOneWriteHasOccurred(TodoResource todoClient) {
try {
todoClient.addTodo(getTodoOfSize(100_000));
// okay as the first huge write is not rate limited.
} catch (Exception e) {
// okay as some other test might have written before
}
@Test
public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitFirstTime() {
writeNTodosOfSize(12, 1_000);
}

@Test
public void canNotWriteLargeNumberOfBytesConcurrentlyIfAllRequestsComeAtTheExactSameTime()
throws InterruptedException {
TodoResource todoClient = EteSetup.createClientToSingleNode(TodoResource.class);

CyclicBarrier barrier = new CyclicBarrier(100);
ForkJoinPool threadPool = new ForkJoinPool(100);
List<Future<?>> futures = Lists.newArrayList();

IntStream.range(0, 100).parallel().forEach(i ->
futures.add(threadPool.submit(
() -> {
barrier.await();
todoClient.addTodo(getTodoOfSize(1_000));
return null;
})));

AtomicInteger exceptionCounter = new AtomicInteger(90);
futures.forEach(future -> {
try {
future.get();
} catch (ExecutionException e) {
if (e.getCause().getClass().equals(AtlasDbRemoteException.class)) {
exceptionCounter.getAndIncrement();
}
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
});
assertThat(exceptionCounter.get()).isGreaterThan(90);
public void shouldBeAbleToWriteLargeAmountsExceedingTheLimitSecondTimeWithSoftLimiting() {
writeNTodosOfSize(12, 1_000);

writeNTodosOfSize(12, 1_000);
}

@Test
public void canNotWriteLargeNumberOfBytesConcurrently() throws InterruptedException {
TodoResource todoClient = EteSetup.createClientToSingleNode(TodoResource.class);

ForkJoinPool threadPool = new ForkJoinPool(100);
List<Future<?>> futures = Lists.newArrayList();

IntStream.range(0, 100).parallel()
.forEach(i -> futures.add(threadPool.submit(() -> todoClient.addTodo(getTodoOfSize(1_000)))));

threadPool.shutdown();
Preconditions.checkState(threadPool.awaitTermination(90, TimeUnit.SECONDS),
"Not all threads writing data finished in the expected time.");

AtomicInteger exceptionCounter = new AtomicInteger(0);
futures.forEach(future -> {
try {
future.get();
} catch (ExecutionException e) {
if (e.getCause().getClass().equals(AtlasDbRemoteException.class)) {
exceptionCounter.getAndIncrement();
}
} catch (InterruptedException e) {
throw Throwables.propagate(e);
public void shouldNotBeAbleToWriteLargeAmountsIfSoftLimitSleepWillBeMoreThanConfiguredBackoffTime() {
// Have one limit-exceeding write
// as the rate-limiter will let anything pass through until the limit is exceeded.
writeNTodosOfSize(12, 1_000);

assertThatThrownBy(() -> writeNTodosOfSize(200, 1_000))
.isInstanceOf(RateLimitExceededException.class)
.hasMessage("Rate limited. Available capacity has been exhausted.");
}


public static void writeNTodosOfSize(int numTodos, int size) {
serializableTransactionManager.runTaskWithRetry((transaction) -> {
Map<Cell, byte[]> write = new HashMap<>();
for (int i = 0; i < numTodos; i++) {
Cell thisCell = Cell.create(ValueType.FIXED_LONG.convertFromJava(random.nextLong()),
TodoSchema.todoTextColumn());
write.put(thisCell, ValueType.STRING.convertFromJava(getTodoOfSize(size)));
}
transaction.put(TodoSchema.todoTable(), write);
return null;
});
assertThat(exceptionCounter.get()).isGreaterThan(0);

}

private static AtlasDbConfig getAtlasDbConfig() {
DockerPort cassandraPort = docker.containers()
.container("cassandra")
.port(CASSANDRA_PORT_NUMBER);

InetSocketAddress cassandraAddress = new InetSocketAddress(cassandraPort.getIp(),
cassandraPort.getExternalPort());

CassandraKeyValueServiceConfig kvsConfig = ImmutableCassandraKeyValueServiceConfig.builder()
.servers(ImmutableList.of(cassandraAddress))
.credentials(ImmutableCassandraCredentialsConfig.builder()
.username("cassandra")
.password("cassandra")
.build())
.ssl(false)
.replicationFactor(1)
.autoRefreshNodes(false)
.build();

return ImmutableAtlasDbConfig.builder()
.namespace("qosete")
.keyValueService(kvsConfig)
.initializeAsync(true)
.build();
}

private static Optional<AtlasDbRuntimeConfig> getAtlasDbRuntimeConfig() {
return Optional.of(ImmutableAtlasDbRuntimeConfig.builder()
.sweep(ImmutableSweepConfig.builder().enabled(false).build())
.qos(ImmutableQosClientConfig.builder()
.limits(ImmutableQosLimitsConfig.builder()
.readBytesPerSecond(readBytesPerSecond)
.writeBytesPerSecond(writeBytesPerSecond)
.build())
.maxBackoffSleepTime(HumanReadableDuration.seconds(2))
.build())
.build());
}

@After
Expand Down

0 comments on commit 504f345

Please sign in to comment.