Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

optional validation of immutable ts on read operations #3414

Merged
merged 8 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ boolean allowHiddenTableAccess() {
return false;
}

@Value.Default
boolean validateLocksOnReads() {
return true;
}

abstract String userAgent();

abstract MetricRegistry globalMetricsRegistry();
Expand Down Expand Up @@ -375,7 +380,8 @@ runtimeConfigSupplier, registrar(), () -> LockServiceImpl.create(lockServerOptio
new TimestampCache(metricsManager.getRegistry(),
() -> runtimeConfigSupplier.get().getTimestampCacheSize()),
targetedSweep,
callbacks),
callbacks,
validateLocksOnReads()),
closeables);
TransactionManager instrumentedTransactionManager =
AtlasDbMetrics.instrument(metricsManager.getRegistry(), TransactionManager.class, transactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
Executors.newSingleThreadExecutor(
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)));
new NamedThreadFactory(TransactionManagerModule.class + "-delete-executor", true)),
true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public SerializableTransactionManager provideTransactionManager(MetricsManager m
config.atlasDbConfig().keyValueService().concurrentGetRangesThreadPoolSize(),
config.atlasDbConfig().keyValueService().defaultGetRangesConcurrency(),
MultiTableSweepQueueWriter.NO_OP,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public SerializableTransaction(MetricsManager metricsManager,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
super(metricsManager,
keyValueService,
timelockService,
Expand All @@ -149,7 +150,8 @@ public SerializableTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
sweepQueue,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@Override
Expand Down Expand Up @@ -709,7 +711,8 @@ private Transaction getReadOnlyTransaction(final long commitTs) {
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
deleteExecutor,
commitProfileProcessor) {
commitProfileProcessor,
validateLocksOnReads) {
@Override
protected Map<Long, Long> getCommitTimestamps(TableReference tableRef,
Iterable<Long> startTimestamps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public static TransactionManager create(MetricsManager metricsManager,
boolean initializeAsync,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
Callback<TransactionManager> callback) {
Callback<TransactionManager> callback,
boolean validateLocksOnReads) {
TransactionManager serializableTransactionManager = new SerializableTransactionManager(
metricsManager,
keyValueService,
Expand All @@ -213,7 +214,8 @@ public static TransactionManager create(MetricsManager metricsManager,
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueueWriter,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
validateLocksOnReads);

if (!initializeAsync) {
callback.runWithRetry(serializableTransactionManager);
Expand Down Expand Up @@ -253,7 +255,8 @@ public static SerializableTransactionManager createForTest(MetricsManager metric
concurrentGetRangesThreadPoolSize,
defaultGetRangesConcurrency,
sweepQueue,
PTExecutors.newSingleThreadExecutor(true));
PTExecutors.newSingleThreadExecutor(true),
true);
}

public SerializableTransactionManager(MetricsManager metricsManager,
Expand All @@ -271,7 +274,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
int concurrentGetRangesThreadPoolSize,
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueueWriter,
ExecutorService deleteExecutor) {
ExecutorService deleteExecutor,
boolean validateLocksOnReads) {
super(
metricsManager,
keyValueService,
Expand All @@ -288,7 +292,8 @@ public SerializableTransactionManager(MetricsManager metricsManager,
defaultGetRangesConcurrency,
timestampCache,
sweepQueueWriter,
deleteExecutor
deleteExecutor,
validateLocksOnReads
);
}

Expand Down Expand Up @@ -319,7 +324,8 @@ protected SnapshotTransaction createTransaction(long immutableTimestamp,
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public ShouldNotDeleteAndRollbackTransaction(MetricsManager metricsManager,
defaultGetRangesConcurrency,
MultiTableSweepQueueWriter.NO_OP,
IGNORING_EXECUTOR,
CommitProfileProcessor.createNonLogging(metricsManager));
CommitProfileProcessor.createNonLogging(metricsManager),
true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ private enum State {
private final Timer.Context transactionTimerContext;
protected final CommitProfileProcessor commitProfileProcessor;
protected final TransactionOutcomeMetrics transactionOutcomeMetrics;
protected final boolean validateLocksOnReads;

protected volatile boolean hasReads;

Expand Down Expand Up @@ -248,7 +249,8 @@ private enum State {
int defaultGetRangesConcurrency,
MultiTableSweepQueueWriter sweepQueue,
ExecutorService deleteExecutor,
CommitProfileProcessor commitProfileProcessor) {
CommitProfileProcessor commitProfileProcessor,
boolean validateLocksOnReads) {
this.metricsManager = metricsManager;
this.transactionTimerContext = getTimer("transactionMillis").time();
this.keyValueService = keyValueService;
Expand All @@ -274,6 +276,7 @@ private enum State {
this.hasReads = false;
this.commitProfileProcessor = commitProfileProcessor;
this.transactionOutcomeMetrics = TransactionOutcomeMetrics.create(metricsManager);
this.validateLocksOnReads = validateLocksOnReads;
}

@Override
Expand Down Expand Up @@ -333,7 +336,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRows(TableReference tableRef, Ite
perfLogger.debug("getRows({}, {} rows) found {} rows, took {} ms",
tableRef, Iterables.size(rows), results.size(), getRowsMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
return results;
}

Expand Down Expand Up @@ -378,7 +381,7 @@ public Iterator<Map.Entry<Cell, byte[]>> getRowsColumnRange(TableReference table
batchHint,
getStartTimestamp());
if (!rawResults.hasNext()) {
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
} // else the postFiltered iterator will check for each batch.

Iterator<Map.Entry<byte[], RowColumnRangeIterator>> rawResultsByRow = partitionByRow(rawResults);
Expand Down Expand Up @@ -431,7 +434,7 @@ protected Iterator<Map.Entry<Cell, byte[]>> computeNext() {
rawBuilder.put(result);
}
Map<Cell, Value> raw = rawBuilder.build();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
if (raw.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -508,7 +511,7 @@ public SortedMap<byte[], RowResult<byte[]>> getRowsIgnoringLocalWrites(
ColumnSelection.all(),
getStartTimestamp()));

validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
return filterRowResults(tableRef, rawResults, ImmutableMap.builderWithExpectedSize(rawResults.size()));
}

Expand Down Expand Up @@ -572,7 +575,7 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) {
perfLogger.debug("get({}, {} cells) found {} cells (some possibly deleted), took {} ms",
tableRef, cells.size(), result.size(), getMillis);
}
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}

Expand All @@ -585,7 +588,7 @@ public Map<Cell, byte[]> getIgnoringLocalWrites(TableReference tableRef, Set<Cel
hasReads = true;

Map<Cell, byte[]> result = getFromKeyValueService(tableRef, cells);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());

return Maps.filterValues(result, Predicates.not(Value.IS_EMPTY));
}
Expand Down Expand Up @@ -631,7 +634,7 @@ public Iterable<BatchingVisitable<RowResult<byte[]>>> getRanges(final TableRefer
Timer.Context timer = getTimer("processedRangeMillis").time();
Map<RangeRequest, TokenBackedBasicResultsPage<RowResult<Value>, byte[]>> firstPages =
keyValueService.getFirstBatchForRanges(tableRef, input, getStartTimestamp());
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());

SortedMap<Cell, byte[]> postFiltered = postFilterPages(
tableRef,
Expand Down Expand Up @@ -747,13 +750,17 @@ protected <K extends Exception> void batchAcceptSizeHint(
};
}

private void validateExternalAndCommitLocksIfNecessary(TableReference tableRef, long timestamp) {
if (!isValidationNecessary(tableRef)) {
private void validateExternalAndCommitLocksOnReadIfNecessary(TableReference tableRef, long timestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we rename this to validatePreCommitRequirementsOnReadIfNecessary? Mainly because this checks the pre-commit conditions too, which may not be 'external locks'

if (!isValidationNecessaryOnReads(tableRef)) {
return;
}
throwIfPreCommitRequirementsNotMet(null, timestamp);
}

private boolean isValidationNecessaryOnReads(TableReference tableRef) {
return isValidationNecessary(tableRef) && validateLocksOnReads;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably premature, but if in most usage validateLocksOnReads is going to be false, then flipping the arguments makes sense.

}

private boolean isValidationNecessary(TableReference tableRef) {
return sweepStrategyManager.get().get(tableRef) == SweepStrategy.THOROUGH;
}
Expand Down Expand Up @@ -902,7 +909,7 @@ protected <T> ClosableIterator<RowResult<T>> postFilterIterator(
@Override
protected Iterator<RowResult<T>> computeNext() {
List<RowResult<Value>> batch = results.getBatch();
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
if (batch.isEmpty()) {
return endOfData();
}
Expand Down Expand Up @@ -1131,7 +1138,7 @@ private <T> Map<Cell, Value> getWithPostFilteringInternal(TableReference tableRe

if (!keysToReload.isEmpty()) {
Map<Cell, Value> nextRawResults = keyValueService.get(tableRef, keysToReload);
validateExternalAndCommitLocksIfNecessary(tableRef, getStartTimestamp());
validateExternalAndCommitLocksOnReadIfNecessary(tableRef, getStartTimestamp());
return nextRawResults;
} else {
return ImmutableMap.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
final ExecutorService deleteExecutor;
final int defaultGetRangesConcurrency;
final MultiTableSweepQueueWriter sweepQueueWriter;
final boolean validateLocksOnReads;

final List<Runnable> closingCallbacks;
final AtomicBoolean isClosed;
Expand All @@ -98,7 +99,8 @@ protected SnapshotTransactionManager(
int defaultGetRangesConcurrency,
TimestampCache timestampCache,
MultiTableSweepQueueWriter sweepQueueWriter,
ExecutorService deleteExecutor) {
ExecutorService deleteExecutor,
boolean validateLocksOnReads) {
super(metricsManager, timestampCache);
TimestampTracker.instrumentTimestamps(metricsManager, timelockService, cleaner);
this.metricsManager = metricsManager;
Expand All @@ -119,6 +121,7 @@ protected SnapshotTransactionManager(
this.sweepQueueWriter = sweepQueueWriter;
this.deleteExecutor = deleteExecutor;
this.commitProfileProcessor = CommitProfileProcessor.createDefault(metricsManager);
this.validateLocksOnReads = validateLocksOnReads;
}

@Override
Expand Down Expand Up @@ -219,7 +222,8 @@ protected SnapshotTransaction createTransaction(
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
}

@Override
Expand Down Expand Up @@ -249,7 +253,8 @@ public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithCondi
defaultGetRangesConcurrency,
sweepQueueWriter,
deleteExecutor,
commitProfileProcessor);
commitProfileProcessor,
validateLocksOnReads);
try {
return runTaskThrowOnConflict(txn -> task.execute(txn, condition),
new ReadTransaction(transaction, sweepStrategyManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private TransactionManager getManagerWithCallback(boolean initializeAsync,
initializeAsync,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
callBack);
callBack,
true);
}

private void nothingInitialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public class SnapshotTransactionManagerTest {
TransactionTestConstants.DEFAULT_GET_RANGES_CONCURRENCY,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
executorService);
executorService,
true);

@Test
public void isAlwaysInitialized() {
Expand Down Expand Up @@ -127,7 +128,8 @@ public void canCloseTransactionManagerWithNonCloseableLockService() {
TransactionTestConstants.DEFAULT_GET_RANGES_CONCURRENCY,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
executorService);
executorService,
true);
newTransactionManager.close(); // should not throw
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ protected Transaction startTransaction() {
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
getSweepQueueWriterInitialized(),
MoreExecutors.newDirectExecutorService(),
CommitProfileProcessor.createNonLogging(metricsManager)) {
CommitProfileProcessor.createNonLogging(metricsManager),
true) {
@Override
protected Map<Cell, byte[]> transformGetsForTesting(Map<Cell, byte[]> map) {
return Maps.transformValues(map, input -> input.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ protected Transaction startTransaction() {
DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
CommitProfileProcessor.createNonLogging(metricsManager));
CommitProfileProcessor.createNonLogging(metricsManager),
true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ private Transaction startTransaction(PreCommitCondition preCommitCondition, Conf
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
CommitProfileProcessor.createNonLogging(metricsManager)) {
CommitProfileProcessor.createNonLogging(metricsManager),
true) {
@Override
protected Map<Cell, byte[]> transformGetsForTesting(Map<Cell, byte[]> map) {
return Maps.transformValues(map, input -> input.clone());
Expand Down
Loading