From 0b1374db77cabb51a4b3991c136f33583a752df4 Mon Sep 17 00:00:00 2001 From: Stevan Ognjanovic Date: Wed, 16 Oct 2019 10:42:59 +0100 Subject: [PATCH] Refactorings based on comments. --- .../atlasdb/transaction/api/Transaction.java | 14 +- ...ssandraKeyValueServiceIntegrationTest.java | 17 +- ...alueServiceTransactionIntegrationTest.java | 10 +- ...ssandraKvsSerializableTransactionTest.java | 12 +- .../atlasdb/logging/KvsProfilingLogger.java | 4 +- .../transaction/impl/GetAsyncDelegate.java | 4 +- .../impl/TestTransactionManager.java | 1 + .../impl/TestTransactionManagerImpl.java | 1 + .../impl/WrappingTestTransactionManager.java | 5 + .../com/palantir/atlasdb/AtlasDbTestCase.java | 11 +- .../impl/CachingTransactionTest.java | 21 ++- .../impl/SnapshotTransactionTest.java | 158 ++++++++---------- changelog/@unreleased/pr-4301.v2.yml | 2 +- 13 files changed, 138 insertions(+), 122 deletions(-) diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java index 8d1dea3cc59..14e0b8a577e 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java @@ -69,6 +69,17 @@ Map>> getRowsColumnRangeIterator( @Idempotent Map get(TableReference tableRef, Set cells); + /** + * Gets the values associated for each {@code cells} from table specified by {@code tableRef}. It is not guaranteed + * that the actual implementations are in fact asynchronous. + * + * @param tableRef the table from which to get the values + * @param cells the cells for which we want to get the values + * @return a {@link Map} from {@link Cell} to {@code byte[]} representing cell/value pairs + */ + @Idempotent + ListenableFuture> getAsync(TableReference tableRef, Set cells); + /** * Creates a visitable that scans the provided range. * @@ -239,7 +250,4 @@ enum TransactionType { default void disableReadWriteConflictChecking(TableReference tableRef) { throw new UnsupportedOperationException(); } - - @Idempotent - ListenableFuture> getAsync(TableReference tableRef, Set cells); } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceIntegrationTest.java index bb35aa4abc2..e4ad4040a6d 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceIntegrationTest.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -80,6 +81,7 @@ import com.palantir.atlasdb.transaction.api.ConflictHandler; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.atlasdb.util.MetricsManagers; +import com.palantir.common.base.Throwables; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; @@ -92,13 +94,15 @@ public class CassandraKeyValueServiceIntegrationTest extends AbstractKeyValueSer private static final int ONE_HOUR_IN_SECONDS = 60 * 60; private static final TableReference NEVER_SEEN = TableReference.createFromFullyQualifiedName("ns.never_seen"); private static final Cell CELL = Cell.create(PtBytes.toBytes("row"), PtBytes.toBytes("column")); + private static final String ASYNC = "async"; + private static final String SYNC = "sync"; private final String name; @Parameterized.Parameters(name = "{0}") public static Collection data() { Object[][] data = new Object[][] { - {"sync", (Function) SynchronousDelegate::new}, - {"async", (Function) AsyncDelegate::new} + {SYNC, (Function) SynchronousDelegate::new}, + {ASYNC, (Function) AsyncDelegate::new} }; return Arrays.asList(data); } @@ -465,8 +469,7 @@ private static IllegalArgumentException getUnrecognizedKeyValueServiceException( return new IllegalArgumentException("Can't run this cassandra-specific test against a non-cassandra KVS"); } - private static class SynchronousDelegate - implements AutoDelegate_CassandraKeyValueService { + private static class SynchronousDelegate implements AutoDelegate_CassandraKeyValueService { private final CassandraKeyValueServiceImpl delegate; SynchronousDelegate(CassandraKeyValueServiceImpl cassandraKeyValueService) { @@ -495,8 +498,10 @@ public CassandraKeyValueServiceImpl delegate() { public Map get(TableReference tableRef, Map timestampByCell) { try { return delegate.getAsync(tableRef, timestampByCell).get(); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (InterruptedException e) { + throw Throwables.rewrapAndThrowUncheckedException(e); + } catch (ExecutionException e) { + throw Throwables.rewrapAndThrowUncheckedException(e.getCause()); } } } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceTransactionIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceTransactionIntegrationTest.java index 4b8c2a557c2..9ae2ddc9246 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceTransactionIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKeyValueServiceTransactionIntegrationTest.java @@ -21,8 +21,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -62,8 +62,8 @@ public class CassandraKeyValueServiceTransactionIntegrationTest extends Abstract @Parameterized.Parameters(name = "{0}") public static Collection data() { Object[][] data = new Object[][] { - {SYNC, (Function) GetSynchronousDelegate::new}, - {ASYNC, (Function) GetAsyncDelegate::new} + {SYNC, (UnaryOperator) GetSynchronousDelegate::new}, + {ASYNC, (UnaryOperator) GetAsyncDelegate::new} }; return Arrays.asList(data); } @@ -78,11 +78,11 @@ public static Collection data() { @Rule public final TestRule flakeRetryingRule = new FlakeRetryingRule(); private final String name; - private final Function transactionWrapper; + private final UnaryOperator transactionWrapper; public CassandraKeyValueServiceTransactionIntegrationTest( String name, - Function transactionWrapper) { + UnaryOperator transactionWrapper) { super(CASSANDRA, CASSANDRA); this.name = name; this.transactionWrapper = transactionWrapper; diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKvsSerializableTransactionTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKvsSerializableTransactionTest.java index 95a46833e9c..2e89882b66a 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKvsSerializableTransactionTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/CassandraKvsSerializableTransactionTest.java @@ -19,7 +19,7 @@ import java.util.Arrays; import java.util.Collection; -import java.util.function.Function; +import java.util.function.UnaryOperator; import org.junit.ClassRule; import org.junit.runner.RunWith; @@ -39,21 +39,23 @@ public class CassandraKvsSerializableTransactionTest extends AbstractSerializableTransactionTest { @ClassRule public static final CassandraResource CASSANDRA = new CassandraResource(); + private static final String SYNC = "sync"; + private static final String ASYNC = "async"; @Parameterized.Parameters(name = "{0}") public static Collection data() { Object[][] data = new Object[][] { - {"sync", (Function) GetSynchronousDelegate::new}, - {"async", (Function) GetAsyncDelegate::new} + {SYNC, (UnaryOperator) GetSynchronousDelegate::new}, + {ASYNC, (UnaryOperator) GetAsyncDelegate::new} }; return Arrays.asList(data); } - private final Function transactionWrapper; + private final UnaryOperator transactionWrapper; public CassandraKvsSerializableTransactionTest( String name, - Function transactionWrapper) { + UnaryOperator transactionWrapper) { super(CASSANDRA, CASSANDRA); this.transactionWrapper = transactionWrapper; } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/logging/KvsProfilingLogger.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/logging/KvsProfilingLogger.java index 9a13be480e1..5388a5b0b1c 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/logging/KvsProfilingLogger.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/logging/KvsProfilingLogger.java @@ -23,6 +23,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,7 +159,7 @@ public static ListenableFuture maybeLogAsync( slowLogPredicate); Futures.addCallback(future, new FutureCallback() { @Override - public void onSuccess(T result) { + public void onSuccess(@Nullable T result) { monitor.registerResult(result); monitor.log(); } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetAsyncDelegate.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetAsyncDelegate.java index 69c955d375a..605c23c41a4 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetAsyncDelegate.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/GetAsyncDelegate.java @@ -42,7 +42,9 @@ public Transaction delegate() { public Map get(TableReference tableRef, Set cells) { try { return super.getAsync(tableRef, cells).get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + throw Throwables.rewrapAndThrowUncheckedException(e); + } catch (ExecutionException e) { throw Throwables.rewrapAndThrowUncheckedException(e.getCause()); } } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManager.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManager.java index 7052171b521..e3ba7a4dbeb 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManager.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManager.java @@ -24,4 +24,5 @@ public interface TestTransactionManager extends TransactionManager { Transaction commitAndStartNewTransaction(Transaction txn); Transaction createNewTransaction(); void overrideConflictHandlerForTable(TableReference table, ConflictHandler conflictHandler); + void setUnreadableTimestamp(long timestamp); } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java index 604295f5c78..f48146579f5 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java @@ -166,6 +166,7 @@ public long getUnreadableTimestamp() { return unreadableTs.orElseGet(super::getUnreadableTimestamp); } + @Override public void setUnreadableTimestamp(long timestamp) { unreadableTs = Optional.of(timestamp); } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/WrappingTestTransactionManager.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/WrappingTestTransactionManager.java index df20597d29f..25d37217a66 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/WrappingTestTransactionManager.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/WrappingTestTransactionManager.java @@ -42,4 +42,9 @@ public Transaction createNewTransaction() { public void overrideConflictHandlerForTable(TableReference table, ConflictHandler conflictHandler) { delegate.overrideConflictHandlerForTable(table, conflictHandler); } + + @Override + public void setUnreadableTimestamp(long timestamp) { + delegate.setUnreadableTimestamp(timestamp); + } } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java index ac59acadf02..d09c2867147 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/AtlasDbTestCase.java @@ -64,7 +64,7 @@ public class AtlasDbTestCase { protected InMemoryTimestampService timestampService; protected ConflictDetectionManager conflictDetectionManager; protected SweepStrategyManager sweepStrategyManager; - protected TestTransactionManagerImpl serializableTxManager; + protected TestTransactionManager serializableTxManager; protected TestTransactionManager txManager; protected TransactionService transactionService; protected TargetedSweeper sweepQueue; @@ -85,7 +85,13 @@ public void setUp() throws Exception { sweepStrategyManager = SweepStrategyManagers.createDefault(keyValueService); sweepQueue = spy(TargetedSweeper.createUninitializedForTest(() -> sweepQueueShards)); + setUpTransactionManagers(); + sweepQueue.initialize(serializableTxManager); + sweepTimestampSupplier = new SpecialTimestampsSupplier( + () -> txManager.getUnreadableTimestamp(), () -> txManager.getImmutableTimestamp()); + } + protected void setUpTransactionManagers() { serializableTxManager = new TestTransactionManagerImpl( metricsManager, keyValueService, @@ -99,10 +105,7 @@ public void setUp() throws Exception { sweepQueue, MoreExecutors.newDirectExecutorService()); - sweepQueue.initialize(serializableTxManager); txManager = new CachingTestTransactionManager(serializableTxManager); - sweepTimestampSupplier = new SpecialTimestampsSupplier( - () -> txManager.getUnreadableTimestamp(), () -> txManager.getImmutableTimestamp()); } protected KeyValueService getBaseKeyValueService() { diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CachingTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CachingTransactionTest.java index 31cee914369..9c65b245751 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CachingTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/CachingTransactionTest.java @@ -15,6 +15,9 @@ */ package com.palantir.atlasdb.transaction.impl; + +import static org.junit.Assert.assertEquals; + import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -22,10 +25,10 @@ import java.util.SortedMap; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.UnaryOperator; import org.jmock.Expectations; import org.jmock.Mockery; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,8 +57,8 @@ public class CachingTransactionTest { @Parameterized.Parameters(name = "{0}") public static Collection data() { Object[][] data = new Object[][] { - {SYNC, (Function) GetSynchronousDelegate::new}, - {ASYNC, (Function) GetAsyncDelegate::new} + {SYNC, (UnaryOperator) GetSynchronousDelegate::new}, + {ASYNC, (UnaryOperator) GetAsyncDelegate::new} }; return Arrays.asList(data); } @@ -95,8 +98,8 @@ public void testCacheEmptyGets() { } }); - Assert.assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn)); - Assert.assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn)); + assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn)); + assertEquals(emptyResults, cachingTransaction.getRows(table, oneRow, oneColumn)); mockery.assertIsSatisfied(); } @@ -127,8 +130,8 @@ public void testGetRows() { } }); - Assert.assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn)); - Assert.assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn)); + assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn)); + assertEquals(oneResult, cachingTransaction.getRows(table, oneRow, oneColumn)); mockery.assertIsSatisfied(); } @@ -157,8 +160,8 @@ private void testGetCellResults(Cell cell, Map cellValueMap) { final Set cellSet = ImmutableSet.of(cell); mockery.checking(expectationsMapping.get(name).apply(cellSet, cellValueMap)); - Assert.assertEquals(cellValueMap, cachingTransaction.get(table, cellSet)); - Assert.assertEquals(cellValueMap, cachingTransaction.get(table, cellSet)); + assertEquals(cellValueMap, cachingTransaction.get(table, cellSet)); + assertEquals(cellValueMap, cachingTransaction.get(table, cellSet)); mockery.assertIsSatisfied(); } diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java index adf658b18d4..bf941fcf86f 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java @@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import org.apache.commons.lang3.mutable.MutableInt; @@ -110,7 +111,6 @@ import com.palantir.atlasdb.transaction.ImmutableTransactionConfig; import com.palantir.atlasdb.transaction.TransactionConfig; import com.palantir.atlasdb.transaction.api.AtlasDbConstraintCheckingMode; -import com.palantir.atlasdb.transaction.api.AutoDelegate_TransactionManager; import com.palantir.atlasdb.transaction.api.ConflictHandler; import com.palantir.atlasdb.transaction.api.LockAwareTransactionTask; import com.palantir.atlasdb.transaction.api.PreCommitCondition; @@ -121,7 +121,6 @@ import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException; import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutException; import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutNonRetriableException; -import com.palantir.atlasdb.transaction.api.TransactionManager; import com.palantir.atlasdb.transaction.api.TransactionReadSentinelBehavior; import com.palantir.atlasdb.transaction.impl.metrics.TransactionOutcomeMetrics; import com.palantir.atlasdb.transaction.impl.metrics.TransactionOutcomeMetricsAssert; @@ -151,18 +150,37 @@ public class SnapshotTransactionTest extends AtlasDbTestCase { private static final String SYNC = "sync"; private static final String ASYNC = "async"; + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + Object[][] data = new Object[][] { + {SYNC, (UnaryOperator) GetSynchronousDelegate::new}, + {ASYNC, (UnaryOperator) GetAsyncDelegate::new} + }; + return Arrays.asList(data); + } private final String name; - private final Function transactionWrapper; - private final Map> + private final UnaryOperator transactionWrapper; + private final Map expectationsMapping = - ImmutableMap.>builder() + ImmutableMap.builder() .put(SYNC, SnapshotTransactionTest.this::syncGetExpectation) .put(ASYNC, SnapshotTransactionTest.this::asyncGetExpectation) .build(); + private TransactionConfig transactionConfig; + private final TimestampCache timestampCache = new DefaultTimestampCache( + metricsManager.getRegistry(), () -> AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); + private final ExecutorService getRangesExecutor = Executors.newFixedThreadPool(8); + private final int defaultGetRangesConcurrency = 2; + private final TransactionOutcomeMetrics transactionOutcomeMetrics + = TransactionOutcomeMetrics.create(metricsManager); @FunctionalInterface - interface ExpectationFactory { - Five apply(One one, Two two, Three three, Four four) throws Exception; + interface ExpectationFactory { + Expectations apply( + KeyValueService keyValueService, + Cell cell, + long transactionTs, + LockService lockService) throws InterruptedException; } private Expectations syncGetExpectation( @@ -170,49 +188,24 @@ private Expectations syncGetExpectation( Cell cell, long transactionTs, LockService lockMock) { - try { - return new Expectations() {{ - oneOf(kvMock).get(TABLE, ImmutableMap.of(cell, transactionTs)); - will(throwException(new RuntimeException())); - never(lockMock).lockWithFullLockResponse(with(LockClient.ANONYMOUS), with(any(LockRequest.class))); - }}; - } catch (Exception e) { - throw new RuntimeException(e.getCause()); - } + return new Expectations() {{ + oneOf(kvMock).get(TABLE, ImmutableMap.of(cell, transactionTs)); + will(throwException(new RuntimeException())); + }}; } private Expectations asyncGetExpectation( KeyValueService kvMock, Cell cell, long transactionTs, - LockService lockMock) throws Exception { + LockService lockMock) { return new Expectations() {{ oneOf(kvMock).getAsync(TABLE, ImmutableMap.of(cell, transactionTs)); will(returnValue(Futures.immediateFailedFuture(new RuntimeException()))); - never(lockMock).lockWithFullLockResponse(with(LockClient.ANONYMOUS), with(any(LockRequest.class))); }}; } - private TransactionConfig transactionConfig; - - private final TimestampCache timestampCache = new DefaultTimestampCache( - metricsManager.getRegistry(), () -> AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE); - private final ExecutorService getRangesExecutor = Executors.newFixedThreadPool(8); - private final int defaultGetRangesConcurrency = 2; - private final TransactionOutcomeMetrics transactionOutcomeMetrics - = TransactionOutcomeMetrics.create(metricsManager); - private WrappingTestTransactionManager wrappedSerializableTxManager; - - @Parameterized.Parameters(name = "{0}") - public static Collection data() { - Object[][] data = new Object[][] { - {SYNC, (Function) GetSynchronousDelegate::new}, - {ASYNC, (Function) GetAsyncDelegate::new} - }; - return Arrays.asList(data); - } - - public SnapshotTransactionTest(String name, Function transactionWrapper) { + public SnapshotTransactionTest(String name, UnaryOperator transactionWrapper) { this.name = name; this.transactionWrapper = transactionWrapper; } @@ -284,8 +277,6 @@ public void cleanup() {} public void setUp() throws Exception { super.setUp(); - txManager = new WrappingTestTransactionManager(txManager, transactionWrapper); - wrappedSerializableTxManager = new WrappingTestTransactionManager(serializableTxManager, transactionWrapper); transactionConfig = ImmutableTransactionConfig.builder().build(); keyValueService.createTable(TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA); @@ -299,6 +290,13 @@ public void setUp() throws Exception { getTableMetadataForSweepStrategy(SweepStrategy.CONSERVATIVE).persistToBytes()); } + @Override + protected void setUpTransactionManagers() { + super.setUpTransactionManagers(); + txManager = new WrappingTestTransactionManagerImpl(txManager, transactionWrapper); + serializableTxManager = new WrappingTestTransactionManagerImpl(serializableTxManager, transactionWrapper); + } + @Test public void testConcurrentWriteChangedConflicts() throws InterruptedException, ExecutionException { overrideConflictHandlerForTable(TABLE, ConflictHandler.RETRY_ON_VALUE_CHANGED); @@ -339,10 +337,10 @@ public void testImmutableTs() throws Exception { @Test public void testLockAfterGet() throws Exception { byte[] rowName = PtBytes.toBytes("1"); - Mockery m = new Mockery(); - m.setThreadingPolicy(new Synchroniser()); - final KeyValueService kvMock = m.mock(KeyValueService.class); - final LockService lockMock = m.mock(LockService.class); + Mockery mockery = new Mockery(); + mockery.setThreadingPolicy(new Synchroniser()); + final KeyValueService kvMock = mockery.mock(KeyValueService.class); + final LockService lockMock = mockery.mock(LockService.class); LockService lock = MultiDelegateProxy.newProxyInstance(LockService.class, lockService, lockMock); final Cell cell = Cell.create(rowName, rowName); @@ -351,7 +349,10 @@ public void testLockAfterGet() throws Exception { final long transactionTs = timestampService.getFreshTimestamp(); keyValueService.put(TABLE, ImmutableMap.of(cell, PtBytes.EMPTY_BYTE_ARRAY), startTs); - m.checking(expectationsMapping.get(name).apply(kvMock, cell, transactionTs, lockMock)); + mockery.checking(expectationsMapping.get(name).apply(kvMock, cell, transactionTs, lockMock)); + mockery.checking(new Expectations() {{ + never(lockMock).lockWithFullLockResponse(with(LockClient.ANONYMOUS), with(any(LockRequest.class))); + }}); Transaction snapshot = transactionWrapper.apply(new SnapshotTransaction(metricsManager, kvMock, @@ -382,7 +383,7 @@ public void testLockAfterGet() throws Exception { //expected } - m.assertIsSatisfied(); + mockery.assertIsSatisfied(); } @Ignore("Was ignored long ago, and now we need to fix the mocking logic.") @@ -457,7 +458,7 @@ public void testTransactionAtomicity() throws Exception { Random random = new Random(1); final UnstableKeyValueService unstableKvs = new UnstableKeyValueService(keyValueService, random); - final TestTransactionManager unstableTransactionManager = new WrappingTestTransactionManager( + final TestTransactionManager unstableTransactionManager = new WrappingTestTransactionManagerImpl( new TestTransactionManagerImpl( metricsManager, unstableKvs, @@ -600,7 +601,7 @@ public void testGetRowsLocalWritesWithColumnSelection() { byte[] row1Column1Value = BigInteger.valueOf(1).toByteArray(); byte[] row1Column2Value = BigInteger.valueOf(2).toByteArray(); - Transaction snapshotTx = wrappedSerializableTxManager.createNewTransaction(); + Transaction snapshotTx = serializableTxManager.createNewTransaction(); snapshotTx.put(TABLE, ImmutableMap.of( row1Column1, row1Column1Value, row1Column2, row1Column2Value)); @@ -900,7 +901,7 @@ public void noRetryOnExpiredLockTokens() throws InterruptedException { @Test public void commitIfPreCommitConditionSucceeds() { - wrappedSerializableTxManager.runTaskWithConditionThrowOnConflict( + serializableTxManager.runTaskWithConditionThrowOnConflict( PreCommitConditions.NO_OP, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); @@ -911,7 +912,7 @@ public void commitIfPreCommitConditionSucceeds() { @Test public void failToCommitIfPreCommitConditionFails() { try { - wrappedSerializableTxManager.runTaskWithConditionThrowOnConflict( + serializableTxManager.runTaskWithConditionThrowOnConflict( ALWAYS_FAILS_CONDITION, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); @@ -929,7 +930,7 @@ public void commitWithPreCommitConditionOnRetry() { when(conditionSupplier.get()).thenReturn(ALWAYS_FAILS_CONDITION) .thenReturn(PreCommitConditions.NO_OP); - wrappedSerializableTxManager.runTaskWithConditionWithRetry( + serializableTxManager.runTaskWithConditionWithRetry( conditionSupplier, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); @@ -985,7 +986,7 @@ public void cleanup() {} }; Supplier conditionSupplier = Suppliers.ofInstance(nonRetriableFailure); try { - wrappedSerializableTxManager.runTaskWithConditionWithRetry( + serializableTxManager.runTaskWithConditionWithRetry( conditionSupplier, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); @@ -999,7 +1000,7 @@ public void cleanup() {} @Test public void readTransactionSucceedsIfConditionSucceeds() { - wrappedSerializableTxManager.runTaskWithConditionReadOnly( + serializableTxManager.runTaskWithConditionReadOnly( PreCommitConditions.NO_OP, (tx, condition) -> tx.get(TABLE, ImmutableSet.of(TEST_CELL))); TransactionOutcomeMetricsAssert.assertThat(transactionOutcomeMetrics) @@ -1009,7 +1010,7 @@ public void readTransactionSucceedsIfConditionSucceeds() { @Test public void readTransactionFailsIfConditionFails() { try { - wrappedSerializableTxManager.runTaskWithConditionReadOnly( + serializableTxManager.runTaskWithConditionReadOnly( ALWAYS_FAILS_CONDITION, (tx, condition) -> tx.get(TABLE, ImmutableSet.of(TEST_CELL))); fail(); @@ -1033,7 +1034,7 @@ public void cleanup() { } }; - wrappedSerializableTxManager.runTaskWithConditionThrowOnConflict( + serializableTxManager.runTaskWithConditionThrowOnConflict( succeedsCondition, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); @@ -1041,7 +1042,7 @@ public void cleanup() { }); assertThat(counter.intValue(), is(1)); - wrappedSerializableTxManager.runTaskWithConditionReadOnly(succeedsCondition, + serializableTxManager.runTaskWithConditionReadOnly(succeedsCondition, (tx, condition) -> tx.get(TABLE, ImmutableSet.of(TEST_CELL))); assertThat(counter.intValue(), is(2)); } @@ -1062,7 +1063,7 @@ public void cleanup() { }; try { - wrappedSerializableTxManager.runTaskWithConditionThrowOnConflict(failsCondition, (tx, condition) -> { + serializableTxManager.runTaskWithConditionThrowOnConflict(failsCondition, (tx, condition) -> { tx.put(TABLE, ImmutableMap.of(TEST_CELL, PtBytes.toBytes("value"))); return null; }); @@ -1073,7 +1074,7 @@ public void cleanup() { assertThat(counter.intValue(), is(1)); try { - wrappedSerializableTxManager.runTaskWithConditionReadOnly(failsCondition, + serializableTxManager.runTaskWithConditionReadOnly(failsCondition, (tx, condition) -> tx.get(TABLE, ImmutableSet.of(TEST_CELL))); fail(); } catch (TransactionFailedRetriableException e) { @@ -1089,14 +1090,14 @@ public void getRowsColumnRangesReturnsInOrderInCaseOfAbortedTxns() { Cell secondCell = Cell.create(row, "b".getBytes()); byte[] value = new byte[1]; - wrappedSerializableTxManager.runTaskWithRetry(tx -> { + serializableTxManager.runTaskWithRetry(tx -> { tx.put(TABLE, ImmutableMap.of(firstCell, value, secondCell, value)); return null; }); // this will write into the DB, because the protocol demands we write before we get a commit timestamp RuntimeException conditionFailure = new RuntimeException(); - assertThatThrownBy(() -> wrappedSerializableTxManager.runTaskWithConditionWithRetry(() -> + assertThatThrownBy(() -> serializableTxManager.runTaskWithConditionWithRetry(() -> new PreCommitCondition() { @Override public void throwIfConditionInvalid(long timestamp) { @@ -1110,7 +1111,7 @@ public void cleanup() {} return null; })).isSameAs(conditionFailure); - List cells = wrappedSerializableTxManager.runTaskReadOnly(tx -> + List cells = serializableTxManager.runTaskReadOnly(tx -> BatchingVisitableView.of(tx.getRowsColumnRange( TABLE, ImmutableList.of(row), @@ -1460,42 +1461,25 @@ private long concurrentlyIncrementValueThousandTimesAndGet() throws InterruptedE * Hack to get reference to underlying {@link SnapshotTransaction}. See how transaction managers are composed at * {@link AtlasDbTestCase#setUp()}. */ - private static SnapshotTransaction unwrapSnapshotTransaction(Transaction cachingTransaction) { - Transaction unwrapped = ((CachingTransaction) cachingTransaction).delegate(); - return (SnapshotTransaction) unwrapped; + private static SnapshotTransaction unwrapSnapshotTransaction(Transaction transaction) { + if (transaction instanceof ForwardingTransaction) + return unwrapSnapshotTransaction(((ForwardingTransaction) transaction).delegate()); + return (SnapshotTransaction) transaction; } - private static class WrappingTestTransactionManager - implements TestTransactionManager, AutoDelegate_TransactionManager { - - private final TestTransactionManager testTransactionManager; + private static class WrappingTestTransactionManagerImpl extends WrappingTestTransactionManager { private final Function transactionWrapper; - WrappingTestTransactionManager( + WrappingTestTransactionManagerImpl( TestTransactionManager testTransactionManager, Function transactionWrapper) { - this.testTransactionManager = testTransactionManager; + super(testTransactionManager); this.transactionWrapper = transactionWrapper; } @Override - public TransactionManager delegate() { - return testTransactionManager; - } - - @Override - public Transaction commitAndStartNewTransaction(Transaction txn) { - return testTransactionManager.commitAndStartNewTransaction(txn); - } - - @Override - public Transaction createNewTransaction() { - return transactionWrapper.apply(testTransactionManager.createNewTransaction()); - } - - @Override - public void overrideConflictHandlerForTable(TableReference table, ConflictHandler conflictHandler) { - testTransactionManager.overrideConflictHandlerForTable(table, conflictHandler); + protected Transaction wrap(Transaction transaction) { + return transactionWrapper.apply(transaction); } } } diff --git a/changelog/@unreleased/pr-4301.v2.yml b/changelog/@unreleased/pr-4301.v2.yml index 948664eb2d1..796b1467108 100644 --- a/changelog/@unreleased/pr-4301.v2.yml +++ b/changelog/@unreleased/pr-4301.v2.yml @@ -1,6 +1,6 @@ type: feature feature: description: | - Transactions can now support asynchronous get operations. This has been implemented for all transaction types. Feature is designed for direct uses of transactions. + `Transaction` can now support asynchronous get operations. Feature is designed for direct users of the `Transaction` api. links: - https://github.com/palantir/atlasdb/pull/4301