Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
lock immutable timestamp on reads (#3888)
Browse files Browse the repository at this point in the history
* configurable locking

* tests

* rename config flag

* increase test coverage

* actually check immutable ts locks if flag is set

* tests for lock checks

* refactor tests

* release notes
  • Loading branch information
gozakdag authored and jeremyk-91 committed Apr 4, 2019
1 parent 32c8ee8 commit f18113b
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public int getThresholdForLoggingLargeNumberOfTransactionLookups() {
public TransactionRetryStrategy retryStrategy() {
return TransactionRetryStrategy.Strategies.LEGACY.get();
}

@Value.Default
public boolean lockImmutableTsOnReadOnlyTransactions() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,15 @@ private void validatePreCommitRequirementsOnReadIfNecessary(TableReference table
}

private boolean isValidationNecessaryOnReads(TableReference tableRef) {
return validateLocksOnReads && isThoroughlySwept(tableRef);
return validateLocksOnReads && requiresImmutableTimestampLocking(tableRef);
}

private boolean isValidationNecessaryOnCommit(TableReference tableRef) {
return !validateLocksOnReads && isThoroughlySwept(tableRef);
return !validateLocksOnReads && requiresImmutableTimestampLocking(tableRef);
}

private boolean requiresImmutableTimestampLocking(TableReference tableRef) {
return isThoroughlySwept(tableRef) || transactionConfig.get().lockImmutableTsOnReadOnlyTransactions();
}

private boolean isThoroughlySwept(TableReference tableRef) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,18 @@ protected SnapshotTransaction createTransaction(
validateLocksOnReads,
transactionConfig);
}

@Override
public <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionReadOnly(
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E {
if (transactionConfig.get().lockImmutableTsOnReadOnlyTransactions()) {
return runTaskWithConditionThrowOnConflict(condition, task);
} else {
return runTaskWithConditionReadOnlyInternal(condition, task);
}
}

private <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionReadOnlyInternal(
C condition, ConditionAwareTransactionTask<T, C, E> task) throws E {
checkOpen();
long immutableTs = getApproximateImmutableTimestamp();
SnapshotTransaction transaction = new SnapshotTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.palantir.lock.LockRefreshToken;
import com.palantir.lock.LockService;
import com.palantir.lock.impl.LegacyTimelockService;
import com.palantir.lock.v2.TimelockService;
import com.palantir.timestamp.InMemoryTimestampService;

public class SnapshotTransactionManagerTest {
Expand Down Expand Up @@ -175,4 +177,54 @@ public void registersMetrics() throws InterruptedException {
assertThat(registry.getTimers().get(SETUP_TASK_METRIC_NAME).getCount()).isGreaterThanOrEqualTo(1);
assertThat(registry.getTimers().get(FINISH_TASK_METRIC_NAME).getCount()).isGreaterThanOrEqualTo(1);
}

@Test
public void callsStartTransactionForReadOnlyTransactionsIfFlagIsSet() throws InterruptedException {
TimelockService timelockService =
spy(new LegacyTimelockService(timestampService, closeableLockService, LockClient.of("lock")));
when(closeableLockService.lock(any(), any())).thenReturn(new LockRefreshToken(BigInteger.ONE, Long.MAX_VALUE));
SnapshotTransactionManager transactionManager = createSnapshotTransactionManager(timelockService, true);

transactionManager.runTaskReadOnly(tx -> "ignored");
verify(timelockService).startIdentifiedAtlasDbTransaction();

transactionManager.runTaskWithConditionReadOnly(PreCommitConditions.NO_OP, (tx, condition) -> "ignored");
verify(timelockService, times(2)).startIdentifiedAtlasDbTransaction();
}

@Test
public void doesNotCallStartTransactionForReadOnlyTransactionsIfFlagIsNotSet() {
TimelockService timelockService =
spy(new LegacyTimelockService(timestampService, closeableLockService, LockClient.of("lock")));
SnapshotTransactionManager transactionManager = createSnapshotTransactionManager(timelockService, false);

transactionManager.runTaskReadOnly(tx -> "ignored");
transactionManager.runTaskWithConditionReadOnly(PreCommitConditions.NO_OP, (tx, condition) -> "ignored");
verify(timelockService, never()).startIdentifiedAtlasDbTransaction();
}

private SnapshotTransactionManager createSnapshotTransactionManager(
TimelockService timelockService, boolean grabImmutableTsLockOnReads) {
return new SnapshotTransactionManager(
metricsManager,
keyValueService,
timelockService,
timestampService,
mock(LockService.class), // not closeable
mock(TransactionService.class),
() -> null,
null,
null,
cleaner,
false,
TransactionTestConstants.GET_RANGES_THREAD_POOL_SIZE,
TransactionTestConstants.DEFAULT_GET_RANGES_CONCURRENCY,
TimestampCache.createForTests(),
MultiTableSweepQueueWriter.NO_OP,
executorService,
true,
() -> ImmutableTransactionConfig.builder()
.lockImmutableTsOnReadOnlyTransactions(grabImmutableTsLockOnReads)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@

@SuppressWarnings("checkstyle:all")
public class SnapshotTransactionTest extends AtlasDbTestCase {
private static final TransactionConfig TRANSACTION_CONFIG = ImmutableTransactionConfig.builder().build();
private TransactionConfig transactionConfig;

protected final TimestampCache timestampCache = new TimestampCache(
metricsManager.getRegistry(), () -> AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE);
Expand Down Expand Up @@ -203,20 +203,28 @@ public void cleanup() {}
static final TableReference TABLE1 = TableReference.createFromFullyQualifiedName("default.table1");
static final TableReference TABLE2 = TableReference.createFromFullyQualifiedName("default.table2");

static final TableReference TABLE_SWEPT_THOROUGH = TableReference.createFromFullyQualifiedName("default.table2");
static final TableReference TABLE_SWEPT_THOROUGH = TableReference.createFromFullyQualifiedName("default.table3");
static final TableReference TABLE_SWEPT_CONSERVATIVE =
TableReference.createFromFullyQualifiedName("default.table4");

private static final Cell TEST_CELL = Cell.create(PtBytes.toBytes("row1"), PtBytes.toBytes("column1"));

@Override
@Before
public void setUp() throws Exception {
super.setUp();

transactionConfig = ImmutableTransactionConfig.builder().build();

keyValueService.createTable(TABLE, AtlasDbConstants.GENERIC_TABLE_METADATA);
keyValueService.createTable(TABLE1, AtlasDbConstants.GENERIC_TABLE_METADATA);
keyValueService.createTable(TABLE2, AtlasDbConstants.GENERIC_TABLE_METADATA);
keyValueService.createTable(
TABLE_SWEPT_THOROUGH,
getTableMetadataForSweepStrategy(SweepStrategy.THOROUGH).persistToBytes());
keyValueService.createTable(
TABLE_SWEPT_CONSERVATIVE,
getTableMetadataForSweepStrategy(SweepStrategy.CONSERVATIVE).persistToBytes());
}

@Test
Expand Down Expand Up @@ -297,7 +305,7 @@ public void testLockAfterGet() throws Exception {
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
true,
() -> TRANSACTION_CONFIG);
() -> transactionConfig);
try {
snapshot.get(TABLE, ImmutableSet.of(cell));
fail();
Expand Down Expand Up @@ -363,7 +371,7 @@ public void testPutCleanup() throws Exception {
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
true,
() -> TRANSACTION_CONFIG);
() -> transactionConfig);
snapshot.delete(TABLE, ImmutableSet.of(cell));
snapshot.commit();

Expand Down Expand Up @@ -1208,6 +1216,36 @@ public void testIgnoresOrphanedSweepSentinel() {
assertThat(txn.get(TABLE, ImmutableSet.of(cell)), is(ImmutableMap.of()));
}

@Test
public void checkImmutableTsLockAfterReadsForConservativeIfFlagIsSet() {
TimelockService timelockService = spy(new LegacyTimelockService(timestampService, lockService, lockClient));
long transactionTs = timelockService.getFreshTimestamp();
LockImmutableTimestampResponse res =
timelockService.lockImmutableTimestamp();

setTransactionConfig(ImmutableTransactionConfig.builder()
.lockImmutableTsOnReadOnlyTransactions(true)
.build());

SnapshotTransaction transaction = getSnapshotTransactionWith(
timelockService,
() -> transactionTs,
res,
PreCommitConditions.NO_OP,
true);

transaction.get(TABLE_SWEPT_CONSERVATIVE, ImmutableSet.of(TEST_CELL));
verify(timelockService).refreshLockLeases(ImmutableSet.of(res.getLock()));

transaction.commit();
timelockService.unlock(ImmutableSet.of(res.getLock()));

}

private void setTransactionConfig(TransactionConfig config) {
transactionConfig = config;
}

private SnapshotTransaction getSnapshotTransactionWith(
TimelockService timelockService,
Supplier<Long> startTs,
Expand Down Expand Up @@ -1250,7 +1288,7 @@ private SnapshotTransaction getSnapshotTransactionWith(
MultiTableSweepQueueWriter.NO_OP,
MoreExecutors.newDirectExecutorService(),
validateLocksOnReads,
() -> TRANSACTION_CONFIG);
() -> transactionConfig);
}

private void writeCells(TableReference table, ImmutableMap<Cell, byte[]> cellsToWrite) {
Expand Down
6 changes: 6 additions & 0 deletions docs/source/release_notes/release-notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ develop
* - Type
- Change

* - |new|
- A new configuration option ``lockImmutableTsOnReadOnlyTransactions`` is added under ``atlas-runtime.transaction``. Default value for this flag is ``false``, and setting it to ``true``
enables running read-only transactions on thorough sweep tables; but introduces a perf overhead to read-only transactions on conservative sweep tables. This is an experimental feature,
please do not change the default value for this flag without talking to AtlasDB team.
(`Pull Request <https://github.com/palantir/atlasdb/pull/3888>`__)

* - |fixed|
- ``putUnlessExists`` in Cassandra KVS now produces correct cell names when failing with a ``KeyAlreadyExistsException``.
Previously, Cassandra KVS used to produce incorrect cell names (that were the concatenation of the correct cell name and an encoding of the AtlasDB timestamp).
Expand Down

0 comments on commit f18113b

Please sign in to comment.