Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

optional validation of immutable ts on read operations #3414

Merged
merged 8 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ boolean allowHiddenTableAccess() {
return false;
}

@Value.Default
boolean validateLocksOnReads() {
return true;
}

abstract String userAgent();

abstract MetricRegistry globalMetricsRegistry();
Expand Down Expand Up @@ -375,7 +380,8 @@ runtimeConfigSupplier, registrar(), () -> LockServiceImpl.create(lockServerOptio
new TimestampCache(metricsManager.getRegistry(),
() -> runtimeConfigSupplier.get().getTimestampCacheSize()),
targetedSweep,
callbacks),
callbacks,
validateLocksOnReads()),
closeables);
TransactionManager instrumentedTransactionManager =
AtlasDbMetrics.instrument(metricsManager.getRegistry(), TransactionManager.class, transactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
Executors.newSingleThreadExecutor(
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)));
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)),
true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(),
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.impl;

import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionFailedNonRetriableException;
import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutException;
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.v2.TimelockService;

/**
* Best effort attempt to keep backwards compatibility while making immutableTs lock validation optional on reads.
* Validating immutableTs lock only on commits rather than on every read may cause the TransactionTask to throw
* an unexpected non-retriable error as reads are not idempotent. This wrapper task will convert the exception thrown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues:

  • reads aren't idempotent (, because)
  • if the immutableTs lock is lost, sweep may remove data our transaction is reading, affecting the consistency of the data read

* by the underlying task into a retriable lock timeout exception if immutableTs lock is not valid anymore.
*/
public class LockCheckingTransactionTask<T, E extends Exception> implements TransactionTask<T, E> {
private final TransactionTask<T, E> delegate;
private final TimelockService timelockService;
private final LockToken immutableTsLock;

public LockCheckingTransactionTask(TransactionTask<T, E> delegate,
TimelockService timelockService,
LockToken immutableTsLock) {
this.delegate = delegate;
this.timelockService = timelockService;
this.immutableTsLock = immutableTsLock;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whitespace

}

public T execute(Transaction transaction) throws E {
try {
return delegate.execute(transaction);
} catch (Exception ex) {
if (shouldRethrowWithoutLockValidation(ex) || immutableTsLockIsValid()) {
throw ex;
}
throw new TransactionLockTimeoutException(
"The following immutable timestamp lock is no longer valid: " + immutableTsLock);
}
}

private boolean shouldRethrowWithoutLockValidation(Exception ex) {
return ex instanceof InterruptedException || ex instanceof TransactionFailedNonRetriableException;
}

private boolean immutableTsLockIsValid() {
return !timelockService.refreshLockLeases(ImmutableSet.of(immutableTsLock)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public SerializableTransaction(MetricsManager metricsManager,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
super(metricsManager,
keyValueService,
timelockService,
Expand All @@ -149,7 +150,8 @@ public SerializableTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
sweepQueue,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@Override
Expand Down Expand Up @@ -709,7 +711,8 @@ private Transaction getReadOnlyTransaction(final long commitTs) {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
deleteExecutor,
commitProfileProcessor) {
commitProfileProcessor,
validateLocksOnReads) {
@Override
protected Map<Long, Long> getCommitTimestamps(TableReference tableRef,
Iterable<Long> startTimestamps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public static TransactionManager create(MetricsManager metricsManager,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback) {
Callback<TransactionManager> callback,
boolean validateLocksOnReads) {
TransactionManager serializableTransactionManager = new SerializableTransactionManager(
metricsManager,
keyValueService,
Expand All @@ -213,7 +214,8 @@ public static TransactionManager create(MetricsManager metricsManager,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueueWriter,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
validateLocksOnReads);

if (!initializeAsync) {
callback.runWithRetry(serializableTransactionManager);
Expand Down Expand Up @@ -253,7 +255,8 @@ public static SerializableTransactionManager createForTest(MetricsManager metric
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueue,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

public SerializableTransactionManager(MetricsManager metricsManager,
Expand All @@ -271,7 +274,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueueWriter,
ExecutorService deleteExecutor) {
ExecutorService deleteExecutor,
boolean validateLocksOnReads) {
super(
metricsManager,
keyValueService,
Expand All @@ -288,7 +292,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
defaultGetRangesConcurrency,
timestampCache,
sweepQueueWriter,
deleteExecutor
deleteExecutor,
validateLocksOnReads
);
}

Expand Down Expand Up @@ -319,7 +324,8 @@ protected SnapshotTransaction createTransaction(long immutableTimestamp,
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public ShouldNotDeleteAndRollbackTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
IGNORING_EXECUTOR,
CommitProfileProcessor.createNonLogging(metricsManager));
CommitProfileProcessor.createNonLogging(metricsManager),
true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ private enum State {
private final Timer.Context transactionTimerContext;
protected final CommitProfileProcessor commitProfileProcessor;
protected final TransactionOutcomeMetrics transactionOutcomeMetrics;
protected final boolean validateLocksOnReads;

protected volatile boolean hasReads;

Expand Down Expand Up @@ -248,7 +249,8 @@ private enum State {
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
this.metricsManager = metricsManager;
this.transactionTimerContext = getTimer("transactionMillis").time();
this.keyValueService = keyValueService;
Expand All @@ -274,6 +276,7 @@ private enum State {
this.hasReads = false;
this.commitProfileProcessor = commitProfileProcessor;
this.transactionOutcomeMetrics = TransactionOutcomeMetrics.create(metricsManager);
this.validateLocksOnReads = validateLocksOnReads;
}

@Override
Expand Down Expand Up @@ -333,7 +336,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRows(TableReference tableRef, Ite
perfLogger.debug("getRows({}, {} rows) found {} rows, took {} ms",
tableRef, Iterables.size(rows), results.size(), getRowsMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return results;
}

Expand Down Expand Up @@ -378,7 +381,7 @@ public Iterator<Map.Entry<Cell, byte[]>> getRowsColumnRange(TableReference table
batchHint,
getStartTimestamp());
if (!rawResults.hasNext()) {
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
} // else the postFiltered iterator will check for each batch.

Iterator<Map.Entry<byte[], RowColumnRangeIterator>> rawResultsByRow = partitionByRow(rawResults);
Expand Down Expand Up @@ -431,7 +434,7 @@ protected Iterator<Map.Entry<Cell, byte[]>> computeNext() {
rawBuilder.put(result);
}
Map<Cell, Value> raw = rawBuilder.build();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (raw.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -508,7 +511,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRowsIgnoringLocalWrites(
ColumnSelection.all(),
getStartTimestamp()));

validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return filterRowResults(tableRef, rawResults, ImmutableMap.builderWithExpectedSize(rawResults.size()));
}

Expand Down Expand Up @@ -572,7 +575,7 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
perfLogger.debug("get({}, {} cells) found {} cells (some possibly deleted), took {} ms",
tableRef, cells.size(), result.size(), getMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}

Expand All @@ -585,7 +588,7 @@ public Map<Cell, byte[]> getIgnoringLocalWrites(TableReference tableRef, Set<Cel
hasReads = true;

Map<Cell, byte[]> result = getFromKeyValueService(tableRef, cells);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());

return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}
Expand Down Expand Up @@ -631,7 +634,7 @@ public Iterable<BatchingVisitable<RowResult<byte[]>>> getRanges(final TableRefer
Timer.Context timer = getTimer("processedRangeMillis").time();
Map<RangeRequest, TokenBackedBasicResultsPage<RowResult<Value>, byte[]>> firstPages =
keyValueService.getFirstBatchForRanges(tableRef, input, getStartTimestamp());
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());

SortedMap<Cell, byte[]> postFiltered = postFilterPages(
tableRef,
Expand Down Expand Up @@ -747,13 +750,17 @@ protected <K extends Exception> void batchAcceptSizeHint(
};
}

private void validateExternalAndCommitLocksIfNecessary(TableReference tableRef, long timestamp) {
if (!isValidationNecessary(tableRef)) {
private void validatePreCommitRequirementsOnReadIfNecessary(TableReference tableRef, long timestamp) {
if (!isValidationNecessaryOnReads(tableRef)) {
return;
}
throwIfPreCommitRequirementsNotMet(null, timestamp);
}

private boolean isValidationNecessaryOnReads(TableReference tableRef) {
return validateLocksOnReads && isValidationNecessary(tableRef);
}

private boolean isValidationNecessary(TableReference tableRef) {
return sweepStrategyManager.get().get(tableRef) == SweepStrategy.THOROUGH;
}
Expand Down Expand Up @@ -902,7 +909,7 @@ protected <T> ClosableIterator<RowResult<T>> postFilterIterator(
@Override
protected Iterator<RowResult<T>> computeNext() {
List<RowResult<Value>> batch = results.getBatch();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (batch.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -1131,7 +1138,7 @@ private <T> Map<Cell, Value> getWithPostFilteringInternal(TableReference tableRe

if (!keysToReload.isEmpty()) {
Map<Cell, Value> nextRawResults = keyValueService.get(tableRef, keysToReload);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return nextRawResults;
} else {
return ImmutableMap.of();
Expand Down
Loading