diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java
index 250001cb493..d5612115752 100644
--- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java
+++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/TransactionManagers.java
@@ -180,6 +180,11 @@ boolean allowHiddenTableAccess() {
return false;
}
+ @Value.Default
+ boolean validateLocksOnReads() {
+ return true;
+ }
+
abstract String userAgent();
abstract MetricRegistry globalMetricsRegistry();
@@ -375,7 +380,8 @@ runtimeConfigSupplier, registrar(), () -> LockServiceImpl.create(lockServerOptio
new TimestampCache(metricsManager.getRegistry(),
() -> runtimeConfigSupplier.get().getTimestampCacheSize()),
targetedSweep,
- callbacks),
+ callbacks,
+ validateLocksOnReads()),
closeables);
TransactionManager instrumentedTransactionManager =
AtlasDbMetrics.instrument(metricsManager.getRegistry(), TransactionManager.class, transactionManager);
diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java
index c56b042ad28..0400a586c1f 100644
--- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java
+++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/TransactionManagerModule.java
@@ -112,7 +112,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
Executors.newSingleThreadExecutor(
- new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)));
+ new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)),
+ true);
}
}
diff --git a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java
index a841b19331a..9cca9f0ecf4 100644
--- a/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java
+++ b/atlasdb-dagger/src/main/java/com/palantir/atlasdb/services/test/TestTransactionManagerModule.java
@@ -115,7 +115,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(),
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
- PTExecutors.newSingleThreadExecutor(true));
+ PTExecutors.newSingleThreadExecutor(true),
+ true);
}
}
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTask.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTask.java
new file mode 100644
index 00000000000..7651e2fe4cf
--- /dev/null
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTask.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018 Palantir Technologies, Inc. All rights reserved.
+ *
+ * Licensed under the BSD-3 License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.palantir.atlasdb.transaction.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.palantir.atlasdb.transaction.api.Transaction;
+import com.palantir.atlasdb.transaction.api.TransactionFailedNonRetriableException;
+import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutException;
+import com.palantir.atlasdb.transaction.api.TransactionTask;
+import com.palantir.lock.v2.LockToken;
+import com.palantir.lock.v2.TimelockService;
+
+/**
+ * Best effort attempt to keep backwards compatibility while making immutableTs lock validation optional on reads.
+ *
+ * If a read is performed without validating immutableTs lock on a thoroughly swept table, there is no guarantee on
+ * consistency of data read; as sweep may remove data that is being read. This will not cause any correctness
+ * issues as immutableTs lock will be checked on commit time, and transaction will be aborted if lock is invalid.
+ * Although this prevents committing corrupted values, reading inconsistent data may cause tasks to throw
+ * non-retriable exceptions; causing a possible behaviour change.
+ *
+ * This wrapper task will convert the exception thrown by the underlying task into a retriable lock timeout exception
+ * if immutableTs lock is not valid anymore.
+ */
+public class LockCheckingTransactionTask implements TransactionTask {
+ private final TransactionTask delegate;
+ private final TimelockService timelockService;
+ private final LockToken immutableTsLock;
+
+ public LockCheckingTransactionTask(TransactionTask delegate,
+ TimelockService timelockService,
+ LockToken immutableTsLock) {
+ this.delegate = delegate;
+ this.timelockService = timelockService;
+ this.immutableTsLock = immutableTsLock;
+ }
+
+ public T execute(Transaction transaction) throws E {
+ try {
+ return delegate.execute(transaction);
+ } catch (Exception ex) {
+ if (shouldRethrowWithoutLockValidation(ex) || immutableTsLockIsValid()) {
+ throw ex;
+ }
+ throw new TransactionLockTimeoutException(
+ "The following immutable timestamp lock is no longer valid: " + immutableTsLock);
+ }
+ }
+
+ private boolean shouldRethrowWithoutLockValidation(Exception ex) {
+ return ex instanceof InterruptedException || ex instanceof TransactionFailedNonRetriableException;
+ }
+
+ private boolean immutableTsLockIsValid() {
+ return !timelockService.refreshLockLeases(ImmutableSet.of(immutableTsLock)).isEmpty();
+ }
+}
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java
index ef6616a0bca..e528e5fce7b 100644
--- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransaction.java
@@ -127,7 +127,8 @@ public SerializableTransaction(MetricsManager metricsManager,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
- CommitProfileProcessor commitProfileProcessor) {
+ CommitProfileProcessor commitProfileProcessor,
+ boolean validateLocksOnReads) {
super(metricsManager,
keyValueService,
timelockService,
@@ -149,7 +150,8 @@ public SerializableTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
sweepQueue,
deleteExecutor,
- commitProfileProcessor);
+ commitProfileProcessor,
+ validateLocksOnReads);
}
@Override
@@ -709,7 +711,8 @@ private Transaction getReadOnlyTransaction(final long commitTs) {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
deleteExecutor,
- commitProfileProcessor) {
+ commitProfileProcessor,
+ validateLocksOnReads) {
@Override
protected Map getCommitTimestamps(TableReference tableRef,
Iterable startTimestamps,
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java
index 723a155e95e..d9f187b452c 100644
--- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManager.java
@@ -198,7 +198,8 @@ public static TransactionManager create(MetricsManager metricsManager,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
- Callback callback) {
+ Callback callback,
+ boolean validateLocksOnReads) {
return create(metricsManager,
keyValueService,
@@ -219,7 +220,8 @@ public static TransactionManager create(MetricsManager metricsManager,
sweepQueueWriter,
callback,
PTExecutors.newSingleThreadScheduledExecutor(
- new NamedThreadFactory("AsyncInitializer-SerializableTransactionManager", true)));
+ new NamedThreadFactory("AsyncInitializer-SerializableTransactionManager", true)),
+ validateLocksOnReads);
}
public static TransactionManager create(MetricsManager metricsManager,
@@ -240,7 +242,8 @@ public static TransactionManager create(MetricsManager metricsManager,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback callback,
- ScheduledExecutorService initializer) {
+ ScheduledExecutorService initializer,
+ boolean validateLocksOnReads) {
TransactionManager transactionManager = new SerializableTransactionManager(
metricsManager,
keyValueService,
@@ -257,7 +260,8 @@ public static TransactionManager create(MetricsManager metricsManager,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueueWriter,
- PTExecutors.newSingleThreadExecutor(true));
+ PTExecutors.newSingleThreadExecutor(true),
+ validateLocksOnReads);
if (!initializeAsync) {
callback.runWithRetry(transactionManager);
@@ -297,7 +301,8 @@ public static SerializableTransactionManager createForTest(MetricsManager metric
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueue,
- PTExecutors.newSingleThreadExecutor(true));
+ PTExecutors.newSingleThreadExecutor(true),
+ true);
}
public SerializableTransactionManager(MetricsManager metricsManager,
@@ -315,7 +320,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueueWriter,
- ExecutorService deleteExecutor) {
+ ExecutorService deleteExecutor,
+ boolean validateLocksOnReads) {
super(
metricsManager,
keyValueService,
@@ -332,7 +338,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
defaultGetRangesConcurrency,
timestampCache,
sweepQueueWriter,
- deleteExecutor
+ deleteExecutor,
+ validateLocksOnReads
);
}
@@ -363,7 +370,8 @@ protected SnapshotTransaction createTransaction(long immutableTimestamp,
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
- commitProfileProcessor);
+ commitProfileProcessor,
+ validateLocksOnReads);
}
@VisibleForTesting
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java
index e8bf846b5b2..8f434d4d369 100644
--- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ShouldNotDeleteAndRollbackTransaction.java
@@ -100,7 +100,8 @@ public ShouldNotDeleteAndRollbackTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
IGNORING_EXECUTOR,
- CommitProfileProcessor.createNonLogging(metricsManager));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true);
}
@Override
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java
index df51a04ce17..769cab06925 100644
--- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java
@@ -218,6 +218,7 @@ private enum State {
private final Timer.Context transactionTimerContext;
protected final CommitProfileProcessor commitProfileProcessor;
protected final TransactionOutcomeMetrics transactionOutcomeMetrics;
+ protected final boolean validateLocksOnReads;
protected volatile boolean hasReads;
@@ -248,7 +249,8 @@ private enum State {
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
- CommitProfileProcessor commitProfileProcessor) {
+ CommitProfileProcessor commitProfileProcessor,
+ boolean validateLocksOnReads) {
this.metricsManager = metricsManager;
this.transactionTimerContext = getTimer("transactionMillis").time();
this.keyValueService = keyValueService;
@@ -274,6 +276,7 @@ private enum State {
this.hasReads = false;
this.commitProfileProcessor = commitProfileProcessor;
this.transactionOutcomeMetrics = TransactionOutcomeMetrics.create(metricsManager);
+ this.validateLocksOnReads = validateLocksOnReads;
}
@Override
@@ -333,7 +336,7 @@ public SortedMap> getRows(TableReference tableRef, Ite
perfLogger.debug("getRows({}, {} rows) found {} rows, took {} ms",
tableRef, Iterables.size(rows), results.size(), getRowsMillis);
}
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return results;
}
@@ -378,7 +381,7 @@ public Iterator> getRowsColumnRange(TableReference table
batchHint,
getStartTimestamp());
if (!rawResults.hasNext()) {
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
} // else the postFiltered iterator will check for each batch.
Iterator> rawResultsByRow = partitionByRow(rawResults);
@@ -431,7 +434,7 @@ protected Iterator> computeNext() {
rawBuilder.put(result);
}
Map raw = rawBuilder.build();
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (raw.isEmpty()) {
return endOfData();
}
@@ -508,7 +511,7 @@ public SortedMap> getRowsIgnoringLocalWrites(
ColumnSelection.all(),
getStartTimestamp()));
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return filterRowResults(tableRef, rawResults, ImmutableMap.builderWithExpectedSize(rawResults.size()));
}
@@ -572,7 +575,7 @@ public Map get(TableReference tableRef, Set cells) {
perfLogger.debug("get({}, {} cells) found {} cells (some possibly deleted), took {} ms",
tableRef, cells.size(), result.size(), getMillis);
}
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}
@@ -585,7 +588,7 @@ public Map getIgnoringLocalWrites(TableReference tableRef, Set result = getFromKeyValueService(tableRef, cells);
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}
@@ -631,7 +634,7 @@ public Iterable>> getRanges(final TableRefer
Timer.Context timer = getTimer("processedRangeMillis").time();
Map, byte[]>> firstPages =
keyValueService.getFirstBatchForRanges(tableRef, input, getStartTimestamp());
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
SortedMap postFiltered = postFilterPages(
tableRef,
@@ -747,13 +750,17 @@ protected void batchAcceptSizeHint(
};
}
- private void validateExternalAndCommitLocksIfNecessary(TableReference tableRef, long timestamp) {
- if (!isValidationNecessary(tableRef)) {
+ private void validatePreCommitRequirementsOnReadIfNecessary(TableReference tableRef, long timestamp) {
+ if (!isValidationNecessaryOnReads(tableRef)) {
return;
}
throwIfPreCommitRequirementsNotMet(null, timestamp);
}
+ private boolean isValidationNecessaryOnReads(TableReference tableRef) {
+ return validateLocksOnReads && isValidationNecessary(tableRef);
+ }
+
private boolean isValidationNecessary(TableReference tableRef) {
return sweepStrategyManager.get().get(tableRef) == SweepStrategy.THOROUGH;
}
@@ -902,7 +909,7 @@ protected ClosableIterator> postFilterIterator(
@Override
protected Iterator> computeNext() {
List> batch = results.getBatch();
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (batch.isEmpty()) {
return endOfData();
}
@@ -1132,7 +1139,7 @@ private Map getWithPostFilteringInternal(TableReference tableRe
if (!keysToReload.isEmpty()) {
Map nextRawResults = keyValueService.get(tableRef, keysToReload);
- validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
+ validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return getRemainingResults(nextRawResults, keysAddedToResults);
} else {
return ImmutableMap.of();
diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
index 4d6ac90d59e..996555883cc 100644
--- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
+++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java
@@ -76,6 +76,7 @@
final ExecutorService deleteExecutor;
final int defaultGetRangesConcurrency;
final MultiTableSweepQueueWriter sweepQueueWriter;
+ final boolean validateLocksOnReads;
final List closingCallbacks;
final AtomicBoolean isClosed;
@@ -98,7 +99,8 @@ protected SnapshotTransactionManager(
int defaultGetRangesConcurrency,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
- ExecutorService deleteExecutor) {
+ ExecutorService deleteExecutor,
+ boolean validateLocksOnReads) {
super(metricsManager, timestampCache);
TimestampTracker.instrumentTimestamps(metricsManager, timelockService, cleaner);
this.metricsManager = metricsManager;
@@ -119,6 +121,7 @@ protected SnapshotTransactionManager(
this.sweepQueueWriter = sweepQueueWriter;
this.deleteExecutor = deleteExecutor;
this.commitProfileProcessor = CommitProfileProcessor.createDefault(metricsManager);
+ this.validateLocksOnReads = validateLocksOnReads;
}
@Override
@@ -169,10 +172,12 @@ public T finishRunTaskWithLockThrowOnConflict(Transacti
Timer postTaskTimer = getTimer("finishTask");
Timer.Context postTaskContext;
+ TransactionTask wrappedTask = wrapTaskIfNecessary(task, txAndLock.immutableTsLock());
+
SnapshotTransaction tx = (SnapshotTransaction) txAndLock.transaction();
T result;
try {
- result = runTaskThrowOnConflict(task, tx);
+ result = runTaskThrowOnConflict(wrappedTask, tx);
} finally {
postTaskContext = postTaskTimer.time();
timelockService.tryUnlock(ImmutableSet.of(txAndLock.immutableTsLock()));
@@ -192,6 +197,18 @@ private void scrubForAggressiveHardDelete(SnapshotTransaction tx) {
}
}
+ private TransactionTask wrapTaskIfNecessary(
+ TransactionTask task, LockToken immutableTsLock) {
+ if (taskWrappingIsNecessary()) {
+ return new LockCheckingTransactionTask<>(task, timelockService, immutableTsLock);
+ }
+ return task;
+ }
+
+ private boolean taskWrappingIsNecessary() {
+ return !validateLocksOnReads;
+ }
+
protected SnapshotTransaction createTransaction(
long immutableTimestamp,
Supplier startTimestampSupplier,
@@ -219,7 +236,8 @@ protected SnapshotTransaction createTransaction(
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
- commitProfileProcessor);
+ commitProfileProcessor,
+ validateLocksOnReads);
}
@Override
@@ -249,7 +267,8 @@ public T runTaskWithCondi
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
- commitProfileProcessor);
+ commitProfileProcessor,
+ validateLocksOnReads);
try {
return runTaskThrowOnConflict(txn -> task.execute(txn, condition),
new ReadTransaction(transaction, sweepStrategyManager));
diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTaskTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTaskTest.java
new file mode 100644
index 00000000000..af449f82644
--- /dev/null
+++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/LockCheckingTransactionTaskTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2018 Palantir Technologies, Inc. All rights reserved.
+ *
+ * Licensed under the BSD-3 License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://opensource.org/licenses/BSD-3-Clause
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.palantir.atlasdb.transaction.impl;
+
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.palantir.atlasdb.transaction.api.Transaction;
+import com.palantir.atlasdb.transaction.api.TransactionFailedNonRetriableException;
+import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutException;
+import com.palantir.atlasdb.transaction.api.TransactionTask;
+import com.palantir.lock.v2.LockToken;
+import com.palantir.lock.v2.TimelockService;
+
+public class LockCheckingTransactionTaskTest {
+ private final TimelockService timelockService = mock(TimelockService.class);
+ private final LockToken lockToken = mock(LockToken.class);
+ private final Transaction transaction = mock(Transaction.class);
+ private final TransactionTask delegate = spy(new TransactionTask() {
+ @Override
+ public Object execute(Transaction txn) throws Exception {
+ return "result";
+ }
+ });
+ private final TransactionTask wrappingTask = new LockCheckingTransactionTask(delegate, timelockService, lockToken);
+
+ @Before
+ public void setUp() {
+ Set lockTokens = ImmutableSet.of(lockToken);
+ when(timelockService.refreshLockLeases(lockTokens)).thenReturn(lockTokens);
+ }
+
+ @Test
+ public void shouldCallDelegateOnce() throws Exception {
+ wrappingTask.execute(transaction);
+ verify(delegate, times(1)).execute(transaction);
+ }
+
+ @Test
+ public void shouldReturnResultOfDelegate() throws Exception {
+ assertEquals(wrappingTask.execute(transaction), "result");
+ }
+
+ @Test
+ public void shouldRethrowInterruptedException() throws Exception {
+ Exception exception = new InterruptedException();
+ when(delegate.execute(transaction)).thenThrow(exception);
+ assertThatThrownBy(() -> wrappingTask.execute(transaction)).isEqualTo(exception);
+ }
+
+ @Test
+ public void shouldRethrowNonRetriableException() throws Exception {
+ Exception exception = new TransactionFailedNonRetriableException("msg");
+ when(delegate.execute(transaction)).thenThrow(exception);
+ assertThatThrownBy(() -> wrappingTask.execute(transaction)).isEqualTo(exception);
+ }
+
+ @Test
+ public void shouldRethrowExceptionIfLockIsValid() throws Exception {
+ Exception exception = new IllegalStateException();
+ when(delegate.execute(transaction)).thenThrow(exception);
+ assertThatThrownBy(() -> wrappingTask.execute(transaction)).isEqualTo(exception);
+ }
+
+ @Test
+ public void shouldThrowTransactionLockTimeoutExceptionIfLockIsInvalid() throws Exception {
+ Exception exception = new IllegalStateException();
+ when(delegate.execute(transaction)).thenThrow(exception);
+
+ invalidateLockTokens();
+ assertThatThrownBy(() -> wrappingTask.execute(transaction)).isInstanceOf(TransactionLockTimeoutException.class);
+ }
+
+ private void invalidateLockTokens() {
+ when(timelockService.refreshLockLeases(anySet())).thenReturn(ImmutableSet.of());
+ }
+}
diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java
index 1c43653cf13..964b3c76227 100644
--- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java
+++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SerializableTransactionManagerTest.java
@@ -279,7 +279,8 @@ private TransactionManager getManagerWithCallback(boolean initializeAsync,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
callBack,
- executor);
+ executor,
+ true);
}
private void nothingInitialized() {
diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java
index 7f112a46359..17a0907f3a5 100644
--- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java
+++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManagerTest.java
@@ -83,7 +83,8 @@ public class SnapshotTransactionManagerTest {
TransactionTestConstants.DEFAULT_GET_RANGES_CONCURRENCY,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
- executorService);
+ executorService,
+ true);
@Test
public void isAlwaysInitialized() {
@@ -127,7 +128,8 @@ public void canCloseTransactionManagerWithNonCloseableLockService() {
TransactionTestConstants.DEFAULT_GET_RANGES_CONCURRENCY,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
- executorService);
+ executorService,
+ true);
newTransactionManager.close(); // should not throw
}
diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java
index bee13f76e12..9f5c8120bfe 100644
--- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java
+++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractSerializableTransactionTest.java
@@ -118,7 +118,8 @@ protected Transaction startTransaction() {
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
getSweepQueueWriterInitialized(),
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager)) {
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true) {
@Override
protected Map transformGetsForTesting(Map map) {
return Maps.transformValues(map, input -> input.clone());
diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractTransactionTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractTransactionTest.java
index 8dcbf728ad0..71540c57790 100644
--- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractTransactionTest.java
+++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/AbstractTransactionTest.java
@@ -127,7 +127,8 @@ protected Transaction startTransaction() {
DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true);
}
@Test
diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java
index 2b97d360ddb..796f407da6d 100644
--- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java
+++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/CommitLockTest.java
@@ -180,7 +180,8 @@ private Transaction startTransaction(PreCommitCondition preCommitCondition, Conf
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager)) {
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true) {
@Override
protected Map transformGetsForTesting(Map map) {
return Maps.transformValues(map, input -> input.clone());
diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java
index ba07d4553ac..e5e58a02d43 100644
--- a/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java
+++ b/atlasdb-tests-shared/src/main/java/com/palantir/atlasdb/transaction/impl/TestTransactionManagerImpl.java
@@ -72,7 +72,8 @@ public TestTransactionManagerImpl(MetricsManager metricsManager,
AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE,
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
sweepQueue,
- deleteExecutor);
+ deleteExecutor,
+ true);
}
@SuppressWarnings("Indentation") // Checkstyle complains about lambda in constructor.
@@ -98,7 +99,8 @@ public TestTransactionManagerImpl(MetricsManager metricsManager,
AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE,
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
- MoreExecutors.newDirectExecutorService());
+ MoreExecutors.newDirectExecutorService(),
+ true);
}
@Override
@@ -141,7 +143,8 @@ public Transaction createNewTransaction() {
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
- CommitProfileProcessor.createNonLogging(metricsManager));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ validateLocksOnReads);
}
@Override
diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java
index 8f2da7ac036..40d778c2d7d 100644
--- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java
+++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionTest.java
@@ -296,7 +296,8 @@ public void testLockAfterGet() throws Exception {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true);
try {
snapshot.get(TABLE, ImmutableSet.of(cell));
fail();
@@ -363,7 +364,8 @@ public void testPutCleanup() throws Exception {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ true);
snapshot.delete(TABLE, ImmutableSet.of(cell));
snapshot.commit();
@@ -1028,30 +1030,12 @@ public void commitThrowsIfRolledBackAtCommitTime_expiredLocks() {
LockImmutableTimestampResponse res =
timelockService.lockImmutableTimestamp(IdentifiedTimeLockRequest.create());
long transactionTs = timelockService.getFreshTimestamp();
- SnapshotTransaction snapshot = new SnapshotTransaction(
- metricsManager,
- keyValueService,
+
+ SnapshotTransaction snapshot = getSnapshotTransactionWith(
timelockService,
- transactionService,
- NoOpCleaner.INSTANCE,
() -> transactionTs,
- TestConflictDetectionManagers.createWithStaticConflictDetection(
- ImmutableMap.of(TABLE, ConflictHandler.RETRY_ON_WRITE_WRITE)),
- SweepStrategyManagers.createDefault(keyValueService),
- res.getImmutableTimestamp(),
- Optional.of(res.getLock()),
- condition,
- AtlasDbConstraintCheckingMode.NO_CONSTRAINT_CHECKING,
- null,
- TransactionReadSentinelBehavior.THROW_EXCEPTION,
- false,
- timestampCache,
- AtlasDbConstants.DEFAULT_TRANSACTION_LOCK_ACQUIRE_TIMEOUT_MS,
- getRangesExecutor,
- defaultGetRangesConcurrency,
- MultiTableSweepQueueWriter.NO_OP,
- MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
+ res,
+ condition);
//simulate roll back at commit time
transactionService.putUnlessExists(snapshot.getTimestamp(), TransactionConstants.FAILED_COMMIT_TS);
@@ -1075,30 +1059,12 @@ public void commitThrowsIfRolledBackAtCommitTime_alreadyAborted() {
LockImmutableTimestampResponse res =
timelockService.lockImmutableTimestamp(IdentifiedTimeLockRequest.create());
long transactionTs = timelockService.getFreshTimestamp();
- SnapshotTransaction snapshot = new SnapshotTransaction(
- metricsManager,
- keyValueService,
+
+ SnapshotTransaction snapshot = getSnapshotTransactionWith(
timelockService,
- transactionService,
- NoOpCleaner.INSTANCE,
() -> transactionTs,
- TestConflictDetectionManagers.createWithStaticConflictDetection(
- ImmutableMap.of(TABLE, ConflictHandler.RETRY_ON_WRITE_WRITE)),
- SweepStrategyManagers.createDefault(keyValueService),
- res.getImmutableTimestamp(),
- Optional.of(res.getLock()),
- PreCommitConditions.NO_OP,
- AtlasDbConstraintCheckingMode.NO_CONSTRAINT_CHECKING,
- null,
- TransactionReadSentinelBehavior.THROW_EXCEPTION,
- false,
- timestampCache,
- 10_000L,
- getRangesExecutor,
- defaultGetRangesConcurrency,
- sweepQueue,
- MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
+ res,
+ PreCommitConditions.NO_OP);
//forcing to try to commit a transaction that is already committed
transactionService.putUnlessExists(transactionTs, TransactionConstants.FAILED_COMMIT_TS);
@@ -1119,19 +1085,107 @@ public void commitDoesNotThrowIfAlreadySuccessfullyCommitted() {
long transactionTs = timelockService.getFreshTimestamp();
LockImmutableTimestampResponse res =
timelockService.lockImmutableTimestamp(IdentifiedTimeLockRequest.create());
- SnapshotTransaction snapshot = new SnapshotTransaction(
+
+ SnapshotTransaction snapshot = getSnapshotTransactionWith(
+ timelockService,
+ () -> transactionTs,
+ res,
+ PreCommitConditions.NO_OP);
+
+ when(timestampServiceSpy.getFreshTimestamp()).thenReturn(10000000L);
+
+ //forcing to try to commit a transaction that is already committed
+ transactionService.putUnlessExists(transactionTs, timelockService.getFreshTimestamp());
+
+ snapshot.put(TABLE, ImmutableMap.of(cell, PtBytes.toBytes("value")));
+ snapshot.commit();
+
+ timelockService.unlock(Collections.singleton(res.getLock()));
+ }
+
+ @Test
+ public void validateLocksOnReadsIfThoroughlySwept() {
+ keyValueService.createTable(
+ TABLE_SWEPT_THOROUGH,
+ getTableMetadataForSweepStrategy(SweepStrategy.THOROUGH).persistToBytes());
+
+ TimelockService timelockService = new LegacyTimelockService(timestampService, lockService, lockClient);
+ long transactionTs = timelockService.getFreshTimestamp();
+ LockImmutableTimestampResponse res =
+ timelockService.lockImmutableTimestamp(IdentifiedTimeLockRequest.create());
+
+ SnapshotTransaction transaction = getSnapshotTransactionWith(
+ timelockService,
+ () -> transactionTs,
+ res,
+ PreCommitConditions.NO_OP,
+ true);
+
+ timelockService.unlock(ImmutableSet.of(res.getLock()));
+
+ Cell cellToRead = Cell.create(PtBytes.toBytes("row1"), PtBytes.toBytes("column1"));
+
+ assertThatExceptionOfType(TransactionLockTimeoutException.class).isThrownBy(() ->
+ transaction.get(TABLE_SWEPT_THOROUGH, ImmutableSet.of(cellToRead)));
+ }
+
+ @Test
+ public void validateLocksOnlyOnCommitIfValidationFlagIsFalse() {
+ keyValueService.createTable(
+ TABLE_SWEPT_THOROUGH,
+ getTableMetadataForSweepStrategy(SweepStrategy.THOROUGH).persistToBytes());
+
+ TimelockService timelockService = new LegacyTimelockService(timestampService, lockService, lockClient);
+ long transactionTs = timelockService.getFreshTimestamp();
+ LockImmutableTimestampResponse res =
+ timelockService.lockImmutableTimestamp(IdentifiedTimeLockRequest.create());
+
+ SnapshotTransaction transaction = getSnapshotTransactionWith(
+ timelockService,
+ () -> transactionTs,
+ res,
+ PreCommitConditions.NO_OP,
+ false);
+
+ timelockService.unlock(ImmutableSet.of(res.getLock()));
+ Cell cellToRead = Cell.create(PtBytes.toBytes("row1"), PtBytes.toBytes("column1"));
+ transaction.get(TABLE_SWEPT_THOROUGH, ImmutableSet.of(cellToRead));
+
+ assertThatExceptionOfType(TransactionLockTimeoutException.class).isThrownBy(() -> transaction.commit());
+ }
+
+ private SnapshotTransaction getSnapshotTransactionWith(
+ TimelockService timelockService,
+ Supplier startTs,
+ LockImmutableTimestampResponse lockImmutableTimestampResponse,
+ PreCommitCondition preCommitCondition) {
+ return getSnapshotTransactionWith(
+ timelockService,
+ startTs,
+ lockImmutableTimestampResponse,
+ preCommitCondition,
+ true);
+ }
+
+ private SnapshotTransaction getSnapshotTransactionWith(
+ TimelockService timelockService,
+ Supplier startTs,
+ LockImmutableTimestampResponse lockImmutableTimestampResponse,
+ PreCommitCondition preCommitCondition,
+ boolean validateLocksOnReads) {
+ return new SnapshotTransaction(
metricsManager,
keyValueService,
timelockService,
transactionService,
NoOpCleaner.INSTANCE,
- () -> transactionTs,
+ startTs,
TestConflictDetectionManagers.createWithStaticConflictDetection(
ImmutableMap.of(TABLE, ConflictHandler.RETRY_ON_WRITE_WRITE)),
SweepStrategyManagers.createDefault(keyValueService),
- res.getImmutableTimestamp(),
- Optional.of(res.getLock()),
- PreCommitConditions.NO_OP,
+ lockImmutableTimestampResponse.getImmutableTimestamp(),
+ Optional.of(lockImmutableTimestampResponse.getLock()),
+ preCommitCondition,
AtlasDbConstraintCheckingMode.NO_CONSTRAINT_CHECKING,
null,
TransactionReadSentinelBehavior.THROW_EXCEPTION,
@@ -1142,17 +1196,8 @@ public void commitDoesNotThrowIfAlreadySuccessfullyCommitted() {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
- CommitProfileProcessor.createNonLogging(metricsManager));
-
- when(timestampServiceSpy.getFreshTimestamp()).thenReturn(10000000L);
-
- //forcing to try to commit a transaction that is already committed
- transactionService.putUnlessExists(transactionTs, timelockService.getFreshTimestamp());
-
- snapshot.put(TABLE, ImmutableMap.of(cell, PtBytes.toBytes("value")));
- snapshot.commit();
-
- timelockService.unlock(Collections.singleton(res.getLock()));
+ CommitProfileProcessor.createNonLogging(metricsManager),
+ validateLocksOnReads);
}
private void writeCells(TableReference table, ImmutableMap cellsToWrite) {
diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java
index 0446165bb10..12f54bf7d00 100644
--- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java
+++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/TransactionManagerTest.java
@@ -166,7 +166,8 @@ private TransactionManager setupTransactionManager() {
AbstractTransactionTest.GET_RANGES_THREAD_POOL_SIZE,
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
- MoreExecutors.newDirectExecutorService());
+ MoreExecutors.newDirectExecutorService(),
+ true);
when(timelock.getFreshTimestamp()).thenReturn(1L);
when(timelock.lockImmutableTimestamp(any())).thenReturn(
diff --git a/docs/source/release_notes/release-notes.rst b/docs/source/release_notes/release-notes.rst
index 0c745b37db6..354d9e3fd12 100644
--- a/docs/source/release_notes/release-notes.rst
+++ b/docs/source/release_notes/release-notes.rst
@@ -89,6 +89,12 @@ develop
(`Pull Request `__)
+ * - |new| |devbreak|
+ - ``TransactionManagers`` has a new builder option named ``validateLocksOnReads()``; set to ``true`` by default. This option is passed to ``TransactionManager``'s constructor, to be used in initialization of ``Transaction``.
+ A transaction will validate pre-commit conditions and immutable ts lock after every read operation if underlying table is thoroughly swept (Default behavior). Setting ``validateLocksOnReads`` to ``false`` will stop transaction to do the mentioned validation on read operations; causing validations to take place only at commit time for the sake of reducing number of round-trips to improve overall transaction perf.
+ This change will cause a devbreak if you are constructing a ``TransactionManager`` outside of ``TransactionManagers``. This can be resolved by adding an additional boolean parameter to the constructor (``true`` if you would like to keep previous behaviour)
+ (`Pull Request `__)
+
========
v0.100.0
========
| | | | | | | | | | | |