Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Async get on transactions #4301

Merged
merged 16 commits into from
Oct 16, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.BiFunction;
import java.util.stream.Stream;

import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection;
Expand Down Expand Up @@ -238,4 +239,7 @@ enum TransactionType {
default void disableReadWriteConflictChecking(TableReference tableRef) {
throw new UnsupportedOperationException();
}

@Idempotent
ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells);
OStevan marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
Expand All @@ -28,13 +31,18 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.Futures;
import com.palantir.atlasdb.containers.CassandraResource;
import com.palantir.atlasdb.keyvalue.api.KeyAlreadyExistsException;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.impl.AbstractTransactionTest;
import com.palantir.atlasdb.transaction.impl.GetAsyncDelegate;
import com.palantir.atlasdb.transaction.impl.GetSynchronousDelegate;
import com.palantir.atlasdb.transaction.impl.TransactionTables;
import com.palantir.atlasdb.transaction.service.SimpleTransactionService;
import com.palantir.atlasdb.transaction.service.TransactionService;
Expand All @@ -44,10 +52,22 @@
import com.palantir.timestamp.TimestampManagementService;

@ShouldRetry // The first test can fail with a TException: No host tried was able to create the keyspace requested.
@RunWith(Parameterized.class)
public class CassandraKeyValueServiceTransactionIntegrationTest extends AbstractTransactionTest {
private static final String SYNC = "sync";
OStevan marked this conversation as resolved.
Show resolved Hide resolved
private static final String ASYNC = "async";
private static final Supplier<KeyValueService> KVS_SUPPLIER =
Suppliers.memoize(CassandraKeyValueServiceTransactionIntegrationTest::createAndRegisterKeyValueService);

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{SYNC, (Function<Transaction, Transaction>) GetSynchronousDelegate::new},
OStevan marked this conversation as resolved.
Show resolved Hide resolved
{ASYNC, (Function<Transaction, Transaction>) GetAsyncDelegate::new}
};
return Arrays.asList(data);
}

@ClassRule
public static final CassandraResource CASSANDRA = new CassandraResource(KVS_SUPPLIER);

Expand All @@ -57,9 +77,15 @@ public class CassandraKeyValueServiceTransactionIntegrationTest extends Abstract

@Rule
public final TestRule flakeRetryingRule = new FlakeRetryingRule();
private final String name;
private final Function<Transaction, Transaction> transactionWrapper;

public CassandraKeyValueServiceTransactionIntegrationTest() {
public CassandraKeyValueServiceTransactionIntegrationTest(
String name,
Function<Transaction, Transaction> transactionWrapper) {
super(CASSANDRA, CASSANDRA);
this.name = name;
this.transactionWrapper = transactionWrapper;
}

@Before
Expand Down Expand Up @@ -106,4 +132,8 @@ protected boolean supportsReverse() {
return false;
}

@Override
protected Transaction startTransaction() {
return transactionWrapper.apply(super.startTransaction());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,50 @@

import static org.mockito.Mockito.mock;

import java.util.Arrays;
import java.util.Collection;
import java.util.function.Function;

import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.palantir.atlasdb.containers.CassandraResource;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.MultiTableSweepQueueWriter;
import com.palantir.atlasdb.sweep.queue.SweepQueue;
import com.palantir.atlasdb.sweep.queue.TargetedSweeper;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.impl.AbstractSerializableTransactionTest;
import com.palantir.atlasdb.transaction.impl.GetAsyncDelegate;
import com.palantir.atlasdb.transaction.impl.GetSynchronousDelegate;

@RunWith(Parameterized.class)
public class CassandraKvsSerializableTransactionTest extends AbstractSerializableTransactionTest {
@ClassRule
public static final CassandraResource CASSANDRA = new CassandraResource();

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{"sync", (Function<Transaction, Transaction>) GetSynchronousDelegate::new},
{"async", (Function<Transaction, Transaction>) GetAsyncDelegate::new}
};
OStevan marked this conversation as resolved.
Show resolved Hide resolved
return Arrays.asList(data);
}

private final Function<Transaction, Transaction> transactionWrapper;

public CassandraKvsSerializableTransactionTest() {
public CassandraKvsSerializableTransactionTest(
String name,
Function<Transaction, Transaction> transactionWrapper) {
super(CASSANDRA, CASSANDRA);
this.transactionWrapper = transactionWrapper;
}

@Override
protected Transaction startTransaction() {
return transactionWrapper.apply(super.startTransaction());
}

@Override
Expand All @@ -54,5 +82,4 @@ protected MultiTableSweepQueueWriter getSweepQueueWriterInitialized() {
protected boolean supportsReverse() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -158,7 +157,7 @@ public static <T, E extends Exception> ListenableFuture<T> maybeLogAsync(
slowLogPredicate);
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(@NullableDecl T result) {
OStevan marked this conversation as resolved.
Show resolved Hide resolved
public void onSuccess(T result) {
monitor.registerResult(result);
monitor.log();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRows(TableReference tableRef, Ite
@Override
public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
try {
return get(
return getWithLoader(
tableRef,
cells,
(tableReference, toRead) -> Futures.immediateFuture(super.get(tableReference, toRead))).get();
Expand All @@ -128,7 +128,15 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
}
}

private ListenableFuture<Map<Cell, byte[]>> get(TableReference tableRef, Set<Cell> cells, CellLoader cellLoader) {
@Override
public ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells) {
return getWithLoader(tableRef, cells, super::getAsync);
}

private ListenableFuture<Map<Cell, byte[]>> getWithLoader(
TableReference tableRef,
Set<Cell> cells,
CellLoader cellLoader) {
if (cells.isEmpty()) {
OStevan marked this conversation as resolved.
Show resolved Hide resolved
return Futures.immediateFuture(ImmutableMap.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Stream;

import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection;
Expand Down Expand Up @@ -175,4 +176,9 @@ public TransactionType getTransactionType() {
public void disableReadWriteConflictChecking(TableReference tableRef) {
delegate().disableReadWriteConflictChecking(tableRef);
}

@Override
public ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells) {
return delegate().getAsync(tableRef, cells);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class NoDuplicateWritesTransaction extends ForwardingTransaction {
new CacheLoader<TableReference, Map<Cell, byte[]>>() {
@Override
public Map<Cell, byte[]> load(TableReference input) {
return Collections.synchronizedMap(Maps.<Cell, byte[]>newHashMap());
return Collections.synchronizedMap(Maps.newHashMap());
OStevan marked this conversation as resolved.
Show resolved Hide resolved
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.BiFunction;
import java.util.stream.Stream;

import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ColumnRangeSelection;
Expand Down Expand Up @@ -142,4 +143,9 @@ public void delete(TableReference tableRef, Set<Cell> keys) {
throw new SafeIllegalArgumentException("This is a read only transaction.");
}

@Override
public ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells) {
checkTableName(tableRef);
return delegate().getAsync(tableRef, cells);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public Entry<Cell, byte[]> next() {
@Idempotent
public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
try {
return get(
return getWithLoader(
tableRef,
cells,
(tableReference, toRead) -> Futures.immediateFuture(super.get(tableRef, toRead))).get();
Expand All @@ -262,7 +262,15 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
}
}

public ListenableFuture<Map<Cell, byte[]>> get(TableReference tableRef, Set<Cell> cells, CellLoader cellLoader) {
@Override
@Idempotent
public ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells) {
return getWithLoader(tableRef, cells, super::getAsync);
}

private ListenableFuture<Map<Cell, byte[]>> getWithLoader(
TableReference tableRef, Set<Cell> cells,
CellLoader cellLoader) {
return Futures.transform(cellLoader.load(tableRef, cells),
loadedCells -> {
markCellsRead(tableRef, cells, loadedCells);
Expand Down
Loading