Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Refactorings based on comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
OStevan committed Oct 16, 2019
1 parent 2e9c61b commit 0b1374d
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ Map<byte[], Iterator<Map.Entry<Cell, byte[]>>> getRowsColumnRangeIterator(
@Idempotent
Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells);

/**
* Gets the values associated for each {@code cells} from table specified by {@code tableRef}. It is not guaranteed
* that the actual implementations are in fact asynchronous.
*
* @param tableRef the table from which to get the values
* @param cells the cells for which we want to get the values
* @return a {@link Map} from {@link Cell} to {@code byte[]} representing cell/value pairs
*/
@Idempotent
ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells);

/**
* Creates a visitable that scans the provided range.
*
Expand Down Expand Up @@ -239,7 +250,4 @@ enum TransactionType {
default void disableReadWriteConflictChecking(TableReference tableRef) {
throw new UnsupportedOperationException();
}

@Idempotent
ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -80,6 +81,7 @@
import com.palantir.atlasdb.transaction.api.ConflictHandler;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.atlasdb.util.MetricsManagers;
import com.palantir.common.base.Throwables;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;

Expand All @@ -92,13 +94,15 @@ public class CassandraKeyValueServiceIntegrationTest extends AbstractKeyValueSer
private static final int ONE_HOUR_IN_SECONDS = 60 * 60;
private static final TableReference NEVER_SEEN = TableReference.createFromFullyQualifiedName("ns.never_seen");
private static final Cell CELL = Cell.create(PtBytes.toBytes("row"), PtBytes.toBytes("column"));
private static final String ASYNC = "async";
private static final String SYNC = "sync";
private final String name;

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{"sync", (Function<CassandraKeyValueServiceImpl, CassandraKeyValueService>) SynchronousDelegate::new},
{"async", (Function<CassandraKeyValueServiceImpl, CassandraKeyValueService>) AsyncDelegate::new}
{SYNC, (Function<CassandraKeyValueServiceImpl, CassandraKeyValueService>) SynchronousDelegate::new},
{ASYNC, (Function<CassandraKeyValueServiceImpl, CassandraKeyValueService>) AsyncDelegate::new}
};
return Arrays.asList(data);
}
Expand Down Expand Up @@ -465,8 +469,7 @@ private static IllegalArgumentException getUnrecognizedKeyValueServiceException(
return new IllegalArgumentException("Can't run this cassandra-specific test against a non-cassandra KVS");
}

private static class SynchronousDelegate
implements AutoDelegate_CassandraKeyValueService {
private static class SynchronousDelegate implements AutoDelegate_CassandraKeyValueService {
private final CassandraKeyValueServiceImpl delegate;

SynchronousDelegate(CassandraKeyValueServiceImpl cassandraKeyValueService) {
Expand Down Expand Up @@ -495,8 +498,10 @@ public CassandraKeyValueServiceImpl delegate() {
public Map<Cell, Value> get(TableReference tableRef, Map<Cell, Long> timestampByCell) {
try {
return delegate.getAsync(tableRef, timestampByCell).get();
} catch (Exception e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw Throwables.rewrapAndThrowUncheckedException(e);
} catch (ExecutionException e) {
throw Throwables.rewrapAndThrowUncheckedException(e.getCause());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -62,8 +62,8 @@ public class CassandraKeyValueServiceTransactionIntegrationTest extends Abstract
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{SYNC, (Function<Transaction, Transaction>) GetSynchronousDelegate::new},
{ASYNC, (Function<Transaction, Transaction>) GetAsyncDelegate::new}
{SYNC, (UnaryOperator<Transaction>) GetSynchronousDelegate::new},
{ASYNC, (UnaryOperator<Transaction>) GetAsyncDelegate::new}
};
return Arrays.asList(data);
}
Expand All @@ -78,11 +78,11 @@ public static Collection<Object[]> data() {
@Rule
public final TestRule flakeRetryingRule = new FlakeRetryingRule();
private final String name;
private final Function<Transaction, Transaction> transactionWrapper;
private final UnaryOperator<Transaction> transactionWrapper;

public CassandraKeyValueServiceTransactionIntegrationTest(
String name,
Function<Transaction, Transaction> transactionWrapper) {
UnaryOperator<Transaction> transactionWrapper) {
super(CASSANDRA, CASSANDRA);
this.name = name;
this.transactionWrapper = transactionWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import org.junit.ClassRule;
import org.junit.runner.RunWith;
Expand All @@ -39,21 +39,23 @@
public class CassandraKvsSerializableTransactionTest extends AbstractSerializableTransactionTest {
@ClassRule
public static final CassandraResource CASSANDRA = new CassandraResource();
private static final String SYNC = "sync";
private static final String ASYNC = "async";

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{"sync", (Function<Transaction, Transaction>) GetSynchronousDelegate::new},
{"async", (Function<Transaction, Transaction>) GetAsyncDelegate::new}
{SYNC, (UnaryOperator<Transaction>) GetSynchronousDelegate::new},
{ASYNC, (UnaryOperator<Transaction>) GetAsyncDelegate::new}
};
return Arrays.asList(data);
}

private final Function<Transaction, Transaction> transactionWrapper;
private final UnaryOperator<Transaction> transactionWrapper;

public CassandraKvsSerializableTransactionTest(
String name,
Function<Transaction, Transaction> transactionWrapper) {
UnaryOperator<Transaction> transactionWrapper) {
super(CASSANDRA, CASSANDRA);
this.transactionWrapper = transactionWrapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -157,7 +159,7 @@ public static <T, E extends Exception> ListenableFuture<T> maybeLogAsync(
slowLogPredicate);
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
public void onSuccess(@Nullable T result) {
monitor.registerResult(result);
monitor.log();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public Transaction delegate() {
public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
try {
return super.getAsync(tableRef, cells).get();
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException e) {
throw Throwables.rewrapAndThrowUncheckedException(e);
} catch (ExecutionException e) {
throw Throwables.rewrapAndThrowUncheckedException(e.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public interface TestTransactionManager extends TransactionManager {
Transaction commitAndStartNewTransaction(Transaction txn);
Transaction createNewTransaction();
void overrideConflictHandlerForTable(TableReference table, ConflictHandler conflictHandler);
void setUnreadableTimestamp(long timestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public long getUnreadableTimestamp() {
return unreadableTs.orElseGet(super::getUnreadableTimestamp);
}

@Override
public void setUnreadableTimestamp(long timestamp) {
unreadableTs = Optional.of(timestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public Transaction createNewTransaction() {
public void overrideConflictHandlerForTable(TableReference table, ConflictHandler conflictHandler) {
delegate.overrideConflictHandlerForTable(table, conflictHandler);
}

@Override
public void setUnreadableTimestamp(long timestamp) {
delegate.setUnreadableTimestamp(timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class AtlasDbTestCase {
protected InMemoryTimestampService timestampService;
protected ConflictDetectionManager conflictDetectionManager;
protected SweepStrategyManager sweepStrategyManager;
protected TestTransactionManagerImpl serializableTxManager;
protected TestTransactionManager serializableTxManager;
protected TestTransactionManager txManager;
protected TransactionService transactionService;
protected TargetedSweeper sweepQueue;
Expand All @@ -85,7 +85,13 @@ public void setUp() throws Exception {
sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService);

sweepQueue = spy(TargetedSweeper.createUninitializedForTest(() -> sweepQueueShards));
setUpTransactionManagers();
sweepQueue.initialize(serializableTxManager);
sweepTimestampSupplier = new SpecialTimestampsSupplier(
() -> txManager.getUnreadableTimestamp(), () -> txManager.getImmutableTimestamp());
}

protected void setUpTransactionManagers() {
serializableTxManager = new TestTransactionManagerImpl(
metricsManager,
keyValueService,
Expand All @@ -99,10 +105,7 @@ public void setUp() throws Exception {
sweepQueue,
MoreExecutors.newDirectExecutorService());

sweepQueue.initialize(serializableTxManager);
txManager = new CachingTestTransactionManager(serializableTxManager);
sweepTimestampSupplier = new SpecialTimestampsSupplier(
() -> txManager.getUnreadableTimestamp(), () -> txManager.getImmutableTimestamp());
}

protected KeyValueService getBaseKeyValueService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
*/
package com.palantir.atlasdb.transaction.impl;


import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import org.jmock.Expectations;
import org.jmock.Mockery;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -54,8 +57,8 @@ public class CachingTransactionTest {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{SYNC, (Function<Transaction, Transaction>) GetSynchronousDelegate::new},
{ASYNC, (Function<Transaction, Transaction>) GetAsyncDelegate::new}
{SYNC, (UnaryOperator<Transaction>) GetSynchronousDelegate::new},
{ASYNC, (UnaryOperator<Transaction>) GetAsyncDelegate::new}
};
return Arrays.asList(data);
}
Expand Down Expand Up @@ -95,8 +98,8 @@ public void testCacheEmptyGets() {
}
});

Assert.assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn));
Assert.assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn));
assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn));
assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn));

mockery.assertIsSatisfied();
}
Expand Down Expand Up @@ -127,8 +130,8 @@ public void testGetRows() {
}
});

Assert.assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn));
Assert.assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn));
assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn));
assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn));

mockery.assertIsSatisfied();
}
Expand Down Expand Up @@ -157,8 +160,8 @@ private void testGetCellResults(Cell cell, Map<Cell, byte[]> cellValueMap) {
final Set<Cell> cellSet = ImmutableSet.of(cell);
mockery.checking(expectationsMapping.get(name).apply(cellSet, cellValueMap));

Assert.assertEquals(cellValueMap, cachingTransaction.get(table, cellSet));
Assert.assertEquals(cellValueMap, cachingTransaction.get(table, cellSet));
assertEquals(cellValueMap, cachingTransaction.get(table, cellSet));
assertEquals(cellValueMap, cachingTransaction.get(table, cellSet));

mockery.assertIsSatisfied();
}
Expand Down
Loading

0 comments on commit 0b1374d

Please sign in to comment.