Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
optional validation of immutable ts on read operations (#3414)
Browse files Browse the repository at this point in the history
* validate on reads if necessary

* add tests

* renaming

* adding wrapping task

* release notes
  • Loading branch information
gozakdag authored Aug 16, 2018
1 parent 7cf1b8e commit c06b311
Show file tree
Hide file tree
Showing 19 changed files with 384 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ boolean allowHiddenTableAccess() {
return false;
}

@Value.Default
boolean validateLocksOnReads() {
return true;
}

abstract String userAgent();

abstract MetricRegistry globalMetricsRegistry();
Expand Down Expand Up @@ -375,7 +380,8 @@ runtimeConfigSupplier, registrar(), () -> LockServiceImpl.create(lockServerOptio
new TimestampCache(metricsManager.getRegistry(),
() -> runtimeConfigSupplier.get().getTimestampCacheSize()),
targetedSweep,
callbacks),
callbacks,
validateLocksOnReads()),
closeables);
TransactionManager instrumentedTransactionManager =
AtlasDbMetrics.instrument(metricsManager.getRegistry(), TransactionManager.class, transactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
Executors.newSingleThreadExecutor(
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)));
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)),
true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(),
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.atlasdb.transaction.impl;

import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionFailedNonRetriableException;
import com.palantir.atlasdb.transaction.api.TransactionLockTimeoutException;
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.lock.v2.LockToken;
import com.palantir.lock.v2.TimelockService;

/**
* Best effort attempt to keep backwards compatibility while making immutableTs lock validation optional on reads.
* <p>
* If a read is performed without validating immutableTs lock on a thoroughly swept table, there is no guarantee on
* consistency of data read; as sweep may remove data that is being read. This will not cause any correctness
* issues as immutableTs lock will be checked on commit time, and transaction will be aborted if lock is invalid.
* Although this prevents committing corrupted values, reading inconsistent data may cause tasks to throw
* non-retriable exceptions; causing a possible behaviour change.
* <p>
* This wrapper task will convert the exception thrown by the underlying task into a retriable lock timeout exception
* if immutableTs lock is not valid anymore.
*/
public class LockCheckingTransactionTask<T, E extends Exception> implements TransactionTask<T, E> {
private final TransactionTask<T, E> delegate;
private final TimelockService timelockService;
private final LockToken immutableTsLock;

public LockCheckingTransactionTask(TransactionTask<T, E> delegate,
TimelockService timelockService,
LockToken immutableTsLock) {
this.delegate = delegate;
this.timelockService = timelockService;
this.immutableTsLock = immutableTsLock;
}

public T execute(Transaction transaction) throws E {
try {
return delegate.execute(transaction);
} catch (Exception ex) {
if (shouldRethrowWithoutLockValidation(ex) || immutableTsLockIsValid()) {
throw ex;
}
throw new TransactionLockTimeoutException(
"The following immutable timestamp lock is no longer valid: " + immutableTsLock);
}
}

private boolean shouldRethrowWithoutLockValidation(Exception ex) {
return ex instanceof InterruptedException || ex instanceof TransactionFailedNonRetriableException;
}

private boolean immutableTsLockIsValid() {
return !timelockService.refreshLockLeases(ImmutableSet.of(immutableTsLock)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public SerializableTransaction(MetricsManager metricsManager,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
super(metricsManager,
keyValueService,
timelockService,
Expand All @@ -149,7 +150,8 @@ public SerializableTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
sweepQueue,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@Override
Expand Down Expand Up @@ -709,7 +711,8 @@ private Transaction getReadOnlyTransaction(final long commitTs) {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
deleteExecutor,
commitProfileProcessor) {
commitProfileProcessor,
validateLocksOnReads) {
@Override
protected Map<Long, Long> getCommitTimestamps(TableReference tableRef,
Iterable<Long> startTimestamps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public static TransactionManager create(MetricsManager metricsManager,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback) {
Callback<TransactionManager> callback,
boolean validateLocksOnReads) {

return create(metricsManager,
keyValueService,
Expand All @@ -219,7 +220,8 @@ public static TransactionManager create(MetricsManager metricsManager,
sweepQueueWriter,
callback,
PTExecutors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("AsyncInitializer-SerializableTransactionManager", true)));
new NamedThreadFactory("AsyncInitializer-SerializableTransactionManager", true)),
validateLocksOnReads);
}

public static TransactionManager create(MetricsManager metricsManager,
Expand All @@ -240,7 +242,8 @@ public static TransactionManager create(MetricsManager metricsManager,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback,
ScheduledExecutorService initializer) {
ScheduledExecutorService initializer,
boolean validateLocksOnReads) {
TransactionManager transactionManager = new SerializableTransactionManager(
metricsManager,
keyValueService,
Expand All @@ -257,7 +260,8 @@ public static TransactionManager create(MetricsManager metricsManager,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueueWriter,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
validateLocksOnReads);

if (!initializeAsync) {
callback.runWithRetry(transactionManager);
Expand Down Expand Up @@ -297,7 +301,8 @@ public static SerializableTransactionManager createForTest(MetricsManager metric
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueue,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

public SerializableTransactionManager(MetricsManager metricsManager,
Expand All @@ -315,7 +320,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueueWriter,
ExecutorService deleteExecutor) {
ExecutorService deleteExecutor,
boolean validateLocksOnReads) {
super(
metricsManager,
keyValueService,
Expand All @@ -332,7 +338,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
defaultGetRangesConcurrency,
timestampCache,
sweepQueueWriter,
deleteExecutor
deleteExecutor,
validateLocksOnReads
);
}

Expand Down Expand Up @@ -363,7 +370,8 @@ protected SnapshotTransaction createTransaction(long immutableTimestamp,
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public ShouldNotDeleteAndRollbackTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
IGNORING_EXECUTOR,
CommitProfileProcessor.createNonLogging(metricsManager));
CommitProfileProcessor.createNonLogging(metricsManager),
true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ private enum State {
private final Timer.Context transactionTimerContext;
protected final CommitProfileProcessor commitProfileProcessor;
protected final TransactionOutcomeMetrics transactionOutcomeMetrics;
protected final boolean validateLocksOnReads;

protected volatile boolean hasReads;

Expand Down Expand Up @@ -248,7 +249,8 @@ private enum State {
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
this.metricsManager = metricsManager;
this.transactionTimerContext = getTimer("transactionMillis").time();
this.keyValueService = keyValueService;
Expand All @@ -274,6 +276,7 @@ private enum State {
this.hasReads = false;
this.commitProfileProcessor = commitProfileProcessor;
this.transactionOutcomeMetrics = TransactionOutcomeMetrics.create(metricsManager);
this.validateLocksOnReads = validateLocksOnReads;
}

@Override
Expand Down Expand Up @@ -333,7 +336,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRows(TableReference tableRef, Ite
perfLogger.debug("getRows({}, {} rows) found {} rows, took {} ms",
tableRef, Iterables.size(rows), results.size(), getRowsMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return results;
}

Expand Down Expand Up @@ -378,7 +381,7 @@ public Iterator<Map.Entry<Cell, byte[]>> getRowsColumnRange(TableReference table
batchHint,
getStartTimestamp());
if (!rawResults.hasNext()) {
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
} // else the postFiltered iterator will check for each batch.

Iterator<Map.Entry<byte[], RowColumnRangeIterator>> rawResultsByRow = partitionByRow(rawResults);
Expand Down Expand Up @@ -431,7 +434,7 @@ protected Iterator<Map.Entry<Cell, byte[]>> computeNext() {
rawBuilder.put(result);
}
Map<Cell, Value> raw = rawBuilder.build();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (raw.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -508,7 +511,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRowsIgnoringLocalWrites(
ColumnSelection.all(),
getStartTimestamp()));

validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return filterRowResults(tableRef, rawResults, ImmutableMap.builderWithExpectedSize(rawResults.size()));
}

Expand Down Expand Up @@ -572,7 +575,7 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
perfLogger.debug("get({}, {} cells) found {} cells (some possibly deleted), took {} ms",
tableRef, cells.size(), result.size(), getMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}

Expand All @@ -585,7 +588,7 @@ public Map<Cell, byte[]> getIgnoringLocalWrites(TableReference tableRef, Set<Cel
hasReads = true;

Map<Cell, byte[]> result = getFromKeyValueService(tableRef, cells);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());

return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}
Expand Down Expand Up @@ -631,7 +634,7 @@ public Iterable<BatchingVisitable<RowResult<byte[]>>> getRanges(final TableRefer
Timer.Context timer = getTimer("processedRangeMillis").time();
Map<RangeRequest, TokenBackedBasicResultsPage<RowResult<Value>, byte[]>> firstPages =
keyValueService.getFirstBatchForRanges(tableRef, input, getStartTimestamp());
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());

SortedMap<Cell, byte[]> postFiltered = postFilterPages(
tableRef,
Expand Down Expand Up @@ -747,13 +750,17 @@ protected <K extends Exception> void batchAcceptSizeHint(
};
}

private void validateExternalAndCommitLocksIfNecessary(TableReference tableRef, long timestamp) {
if (!isValidationNecessary(tableRef)) {
private void validatePreCommitRequirementsOnReadIfNecessary(TableReference tableRef, long timestamp) {
if (!isValidationNecessaryOnReads(tableRef)) {
return;
}
throwIfPreCommitRequirementsNotMet(null, timestamp);
}

private boolean isValidationNecessaryOnReads(TableReference tableRef) {
return validateLocksOnReads && isValidationNecessary(tableRef);
}

private boolean isValidationNecessary(TableReference tableRef) {
return sweepStrategyManager.get().get(tableRef) == SweepStrategy.THOROUGH;
}
Expand Down Expand Up @@ -902,7 +909,7 @@ protected <T> ClosableIterator<RowResult<T>> postFilterIterator(
@Override
protected Iterator<RowResult<T>> computeNext() {
List<RowResult<Value>> batch = results.getBatch();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
if (batch.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -1132,7 +1139,7 @@ private <T> Map<Cell, Value> getWithPostFilteringInternal(TableReference tableRe

if (!keysToReload.isEmpty()) {
Map<Cell, Value> nextRawResults = keyValueService.get(tableRef, keysToReload);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validatePreCommitRequirementsOnReadIfNecessary(tableRef, getStartTimestamp());
return getRemainingResults(nextRawResults, keysAddedToResults);
} else {
return ImmutableMap.of();
Expand Down
Loading

0 comments on commit c06b311

Please sign in to comment.