From c9344f5b2ef9d2d13b64cf4d02142cafe08575bb Mon Sep 17 00:00:00 2001 From: Jolyon Date: Tue, 28 Sep 2021 16:03:45 +0100 Subject: [PATCH] [LW] Rework value cache commit flow (#5652) --- .../api/cache/LockWatchValueScopingCache.java | 19 ++++- .../api/cache/TransactionScopedCache.java | 12 +-- .../api/watch/LockWatchManagerInternal.java | 2 + .../api/watch/NoOpLockWatchManager.java | 5 ++ .../cache/LockWatchValueScopingCacheImpl.java | 49 +++++++---- .../keyvalue/api/watch/ClientLogEvents.java | 38 +++------ .../api/watch/LockWatchEventCacheImpl.java | 33 +++++--- .../api/watch/LockWatchManagerImpl.java | 5 ++ .../transaction/impl/SnapshotTransaction.java | 10 --- .../impl/SnapshotTransactionManager.java | 2 + .../LockWatchValueScopingCacheImplTest.java | 84 +++++++++++-------- .../watch/DuplicatingLockWatchEventCache.java | 11 ++- .../LockWatchEventCacheIntegrationTest.java | 2 +- .../api/watch/LockWatchManagerImplTest.java | 2 +- .../watch/ResilientLockWatchProxyTest.java | 6 +- changelog/@unreleased/pr-5652.v2.yml | 7 ++ .../lock/watch/SpanningCommitUpdate.java | 40 --------- .../palantir/lock/watch/LockWatchCache.java | 4 +- .../lock/watch/LockWatchCacheImpl.java | 14 +++- .../lock/watch/LockWatchEventCache.java | 15 ++-- .../lock/watch/LockWatchValueCache.java | 6 +- .../lock/watch/NoOpLockWatchEventCache.java | 10 +-- .../lock/watch/NoOpLockWatchValueCache.java | 7 +- .../lock/watch/LockWatchCacheImplTest.java | 4 +- .../LockWatchValueIntegrationTest.java | 48 +++++++++++ 25 files changed, 245 insertions(+), 190 deletions(-) create mode 100644 changelog/@unreleased/pr-5652.v2.yml delete mode 100644 lock-api-objects/src/main/java/com/palantir/lock/watch/SpanningCommitUpdate.java diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCache.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCache.java index d1eb6a58b6a..56d89fa5380 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCache.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCache.java @@ -37,14 +37,27 @@ public interface LockWatchValueScopingCache extends LockWatchValueCache { void processStartTransactions(Set startTimestamps); /** - * This does *not* remove state from the cache - {@link #removeTransactionState(long)} must be called at the end + * This does *not* remove state from the cache - {@link #ensureStateRemoved(long)} must be called at the end * of the transaction to do so, or else there will be a memory leak. */ @Override - void updateCacheOnCommit(Set startTimestamps); + void updateCacheWithCommitTimestampsInformation(Set startTimestamps); + /** + * Guarantees that all relevant state for the given transaction has been removed. If the state is already gone (by + * calling {@link LockWatchValueScopingCache#onSuccessfulCommit(long)}, this will be a no-op. Failure to call this + * method may result in memory leaks (particularly for aborting transactions). + */ + @Override + void ensureStateRemoved(long startTs); + + /** + * Performs final cleanup for a given transaction (identified by its start timestamp). This removes state associated + * with a transaction, and flushes values to the central value cache. Calling this method before a transaction is + * fully committed may result in incorrect values being cached. + */ @Override - void removeTransactionState(long startTimestamp); + void onSuccessfulCommit(long startTs); TransactionScopedCache getTransactionScopedCache(long startTs); diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/TransactionScopedCache.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/TransactionScopedCache.java index 681bab31bd8..d015a23fb3e 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/TransactionScopedCache.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/TransactionScopedCache.java @@ -85,7 +85,7 @@ NavigableMap> getRows( /** * This method should be called before retrieving the value or hit digest, as it guarantees that no more reads or - * writes will be performed on the cache. + * writes will be performed on the cache. This method is idempotent, and may legitimately be called multiple times. */ void finalise(); @@ -94,14 +94,4 @@ NavigableMap> getRows( HitDigest getHitDigest(); TransactionScopedCache createReadOnlyCache(CommitUpdate commitUpdate); - - /** - * Checks if any values have been read remotely and stored locally for later flushing to the central cache. Note - * that this method **will** finalise the cache in order to retrieve the digest; no further reads or writes may - * be performed once this is called. - */ - default boolean hasUpdates() { - finalise(); - return !getValueDigest().loadedValues().isEmpty(); - } } diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerInternal.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerInternal.java index df1125a2149..1e8a84fde3c 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerInternal.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerInternal.java @@ -24,6 +24,8 @@ public abstract class LockWatchManagerInternal extends LockWatchManager implemen public abstract void removeTransactionStateFromCache(long startTs); + public abstract void onTransactionCommit(long startTs); + public abstract TransactionScopedCache getTransactionScopedCache(long startTs); public abstract TransactionScopedCache getReadOnlyTransactionScopedCache(long startTs); diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/NoOpLockWatchManager.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/NoOpLockWatchManager.java index efea426c813..c72c1313ff0 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/NoOpLockWatchManager.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/NoOpLockWatchManager.java @@ -63,6 +63,11 @@ public void removeTransactionStateFromCache(long startTs) { cache.removeTransactionStateFromCache(startTs); } + @Override + public void onTransactionCommit(long startTs) { + cache.onTransactionCommit(startTs); + } + @Override TransactionsLockWatchUpdate getUpdateForTransactions( Set startTimestamps, Optional version) { diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java index f07fc8b7854..20836e5e887 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java @@ -28,11 +28,10 @@ import com.palantir.atlasdb.transaction.api.TransactionLockWatchFailedException; import com.palantir.common.streams.KeyedStream; import com.palantir.lock.LockDescriptor; -import com.palantir.lock.watch.CommitUpdate.Visitor; +import com.palantir.lock.watch.CommitUpdate; import com.palantir.lock.watch.LockWatchEvent; import com.palantir.lock.watch.LockWatchEventCache; import com.palantir.lock.watch.LockWatchVersion; -import com.palantir.lock.watch.SpanningCommitUpdate; import com.palantir.lock.watch.TransactionsLockWatchUpdate; import java.util.Comparator; import java.util.List; @@ -100,28 +99,19 @@ public synchronized void processStartTransactions(Set startTimestamps) { } @Override - public synchronized void updateCacheOnCommit(Set startTimestamps) { + public synchronized void updateCacheWithCommitTimestampsInformation(Set startTimestamps) { startTimestamps.forEach(this::processCommitUpdate); } @Override - public synchronized void removeTransactionState(long startTimestamp) { + public synchronized void ensureStateRemoved(long startTimestamp) { StartTimestamp startTs = StartTimestamp.of(startTimestamp); snapshotStore.removeTimestamp(startTs); cacheStore.removeCache(startTs); } @Override - public TransactionScopedCache getTransactionScopedCache(long startTs) { - return cacheStore.getCache(StartTimestamp.of(startTs)); - } - - @Override - public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long startTs) { - return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs)); - } - - private synchronized void processCommitUpdate(long startTimestamp) { + public synchronized void onSuccessfulCommit(long startTimestamp) { StartTimestamp startTs = StartTimestamp.of(startTimestamp); TransactionScopedCache cache = cacheStore.getCache(startTs); cache.finalise(); @@ -131,10 +121,8 @@ private synchronized void processCommitUpdate(long startTimestamp) { return; } - SpanningCommitUpdate spanningCommitUpdate = eventCache.getSpanningCommitUpdate(startTimestamp); - cacheStore.createReadOnlyCache(startTs, spanningCommitUpdate.transactionCommitUpdate()); - - spanningCommitUpdate.spanningCommitUpdate().accept(new Visitor() { + CommitUpdate commitUpdate = eventCache.getEventUpdate(startTimestamp); + commitUpdate.accept(new CommitUpdate.Visitor() { @Override public Void invalidateAll() { // This might happen due to an election or if we exceeded the maximum number of events held in @@ -155,6 +143,31 @@ public Void invalidateSome(Set invalidatedLocks) { return null; } }); + ensureStateRemoved(startTimestamp); + } + + /** + * Retrieval of transaction scoped caches (read-only or otherwise) does not need to be synchronised. The main + * reason for this is that the only race condition that could conceivably occur is for the state to not exist here + * when it should. However, in all of these cases, a no-op cache is used instead, and thus the transaction will + * simply go ahead but with caching disabled (which is guaranteed to be safe). + */ + @Override + public TransactionScopedCache getTransactionScopedCache(long startTs) { + return cacheStore.getCache(StartTimestamp.of(startTs)); + } + + @Override + public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long startTs) { + return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs)); + } + + private synchronized void processCommitUpdate(long startTimestamp) { + StartTimestamp startTs = StartTimestamp.of(startTimestamp); + TransactionScopedCache cache = cacheStore.getCache(startTs); + cache.finalise(); + CommitUpdate commitUpdate = eventCache.getCommitUpdate(startTimestamp); + cacheStore.createReadOnlyCache(startTs, commitUpdate); } /** diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/ClientLogEvents.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/ClientLogEvents.java index 9d848dc1914..7f3151602fa 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/ClientLogEvents.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/ClientLogEvents.java @@ -29,7 +29,6 @@ import com.palantir.lock.watch.LockWatchCreatedEvent; import com.palantir.lock.watch.LockWatchEvent; import com.palantir.lock.watch.LockWatchVersion; -import com.palantir.lock.watch.SpanningCommitUpdate; import com.palantir.lock.watch.TransactionsLockWatchUpdate; import com.palantir.lock.watch.UnlockEvent; import com.palantir.logsafe.SafeArg; @@ -81,31 +80,20 @@ each version starting with the client version (exclusive) and ending with latest .build(); } - default SpanningCommitUpdate toSpanningCommitUpdate( - LockWatchVersion startVersion, CommitInfo commitInfo, LockWatchVersion endVersion) { + default CommitUpdate toCommitUpdate( + LockWatchVersion startVersion, LockWatchVersion endVersion, Optional commitInfo) { if (clearCache()) { - return SpanningCommitUpdate.invalidateAll(); + return CommitUpdate.invalidateAll(); } // We want to ensure that we do not miss any versions, but we do not care about the event with the same version // as the start version. verifyReturnedEventsEnclosesTransactionVersions(startVersion.version() + 1, endVersion.version()); - LockEventVisitor eventVisitor = new LockEventVisitor(commitInfo.commitLockToken()); - Set transactionLocks = new HashSet<>(); - Set spanningLocks = new HashSet<>(); - events().events().forEach(event -> { - Set descriptors = event.accept(eventVisitor); - spanningLocks.addAll(descriptors); - - if (event.sequence() <= commitInfo.commitVersion().version()) { - transactionLocks.addAll(descriptors); - } - }); - return SpanningCommitUpdate.builder() - .transactionCommitUpdate(CommitUpdate.invalidateSome(transactionLocks)) - .spanningCommitUpdate(CommitUpdate.invalidateSome(spanningLocks)) - .build(); + LockEventVisitor eventVisitor = new LockEventVisitor(commitInfo.map(CommitInfo::commitLockToken)); + Set locksTakenOut = new HashSet<>(); + events().events().forEach(event -> locksTakenOut.addAll(event.accept(eventVisitor))); + return CommitUpdate.invalidateSome(locksTakenOut); } default void verifyReturnedEventsEnclosesTransactionVersions(long lowerBound, long upperBound) { @@ -130,13 +118,11 @@ class Builder extends ImmutableClientLogEvents.Builder {} final class LockEventVisitor implements LockWatchEvent.Visitor> { private final Optional commitRequestId; - private LockEventVisitor(LockToken commitLocksToken) { - if (commitLocksToken instanceof LeasedLockToken) { - commitRequestId = Optional.of( - ((LeasedLockToken) commitLocksToken).serverToken().getRequestId()); - } else { - commitRequestId = Optional.empty(); - } + private LockEventVisitor(Optional commitLocksToken) { + commitRequestId = commitLocksToken + .filter(lockToken -> lockToken instanceof LeasedLockToken) + .map(lockToken -> + ((LeasedLockToken) lockToken).serverToken().getRequestId()); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java index cb3b1e459f5..6500a10b6d0 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java @@ -26,7 +26,6 @@ import com.palantir.lock.watch.LockWatchStateUpdate; import com.palantir.lock.watch.LockWatchVersion; import com.palantir.lock.watch.NoOpLockWatchEventCache; -import com.palantir.lock.watch.SpanningCommitUpdate; import com.palantir.lock.watch.TransactionUpdate; import com.palantir.lock.watch.TransactionsLockWatchUpdate; import java.util.Collection; @@ -81,29 +80,39 @@ public synchronized void processGetCommitTimestampsUpdate( @Override public synchronized CommitUpdate getCommitUpdate(long startTs) { - return getSpanningCommitUpdate(startTs).transactionCommitUpdate(); + Optional startVersion = timestampStateStore.getStartVersion(startTs); + Optional maybeCommitInfo = timestampStateStore.getCommitInfo(startTs); + + assertTrue( + maybeCommitInfo.isPresent() && startVersion.isPresent(), + "start or commit info not processed for start timestamp"); + + CommitInfo commitInfo = maybeCommitInfo.get(); + + VersionBounds versionBounds = VersionBounds.builder() + .startVersion(startVersion) + .endVersion(commitInfo.commitVersion()) + .build(); + + return eventLog.getEventsBetweenVersions(versionBounds) + .toCommitUpdate(startVersion.get(), commitInfo.commitVersion(), maybeCommitInfo); } @Override - public SpanningCommitUpdate getSpanningCommitUpdate(long startTs) { + public CommitUpdate getEventUpdate(long startTs) { Optional startVersion = timestampStateStore.getStartVersion(startTs); - Optional maybeCommitInfo = timestampStateStore.getCommitInfo(startTs); + Optional currentVersion = eventLog.getLatestKnownVersion(); assertTrue( - maybeCommitInfo.isPresent() - && startVersion.isPresent() - && eventLog.getLatestKnownVersion().isPresent(), - "start or commit info not processed for start timestamp, or current version missing"); + currentVersion.isPresent() && startVersion.isPresent(), "essential information missing for timestamp"); - LockWatchVersion currentVersion = eventLog.getLatestKnownVersion().get(); - CommitInfo commitInfo = maybeCommitInfo.get(); VersionBounds versionBounds = VersionBounds.builder() .startVersion(startVersion) - .endVersion(currentVersion) + .endVersion(currentVersion.get()) .build(); return eventLog.getEventsBetweenVersions(versionBounds) - .toSpanningCommitUpdate(startVersion.get(), commitInfo, currentVersion); + .toCommitUpdate(startVersion.get(), currentVersion.get(), Optional.empty()); } @Override diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImpl.java index a7c8a2000c7..52d96c586e2 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImpl.java @@ -132,6 +132,11 @@ public void removeTransactionStateFromCache(long startTs) { lockWatchCache.removeTransactionStateFromCache(startTs); } + @Override + public void onTransactionCommit(long startTs) { + lockWatchCache.onTransactionCommit(startTs); + } + @Override public TransactionScopedCache getTransactionScopedCache(long startTs) { return valueScopingCache.getTransactionScopedCache(startTs); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index c99d31322e6..ecbabbe98fc 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -155,7 +155,6 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; @@ -1819,15 +1818,6 @@ private void commitWrites(TransactionService transactionService) { if (validationNecessaryForInvolvedTablesOnCommit()) { throwIfImmutableTsOrCommitLocksExpired(null); } - - // if the cache has been used, we must work out which values can be flushed to the central cache by - // obtaining a commit update, which is obtained via the get commit timestamp request. - if (getCache().hasUpdates()) { - timedAndTraced( - "getCommitTimestampForCaching", - () -> timelockService.getCommitTimestamp( - getStartTimestamp(), LockToken.of(UUID.randomUUID()))); - } return; } return; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index cea8ca132db..d3bb6985f2d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -208,6 +208,8 @@ public List startTransactions(List lockWatchManager.onTransactionCommit(transaction.getTimestamp())); return new OpenTransactionImpl(transaction, immutableTsLock); }) .collect(Collectors.toList()); diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImplTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImplTest.java index a6be253502c..6ffcd257507 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImplTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImplTest.java @@ -78,6 +78,8 @@ public final class LockWatchValueScopingCacheImplTest { WATCH_EVENT.sequence(), ImmutableSet.of(), ImmutableSet.of(LockWatchReferences.entireTable(TABLE.getQualifiedName()))); + private static final LockWatchStateUpdate.Success SUCCESS_WITH_NO_UPDATES = + LockWatchStateUpdate.success(LEADER, 0L, ImmutableList.of()); private final CacheMetrics metrics = mock(CacheMetrics.class); private LockWatchEventCache eventCache; @@ -133,11 +135,9 @@ public void updateCacheOnCommitFlushesValuesToCentralCache() { verify(metrics, times(1)).registerHits(0); verify(metrics, times(1)).registerMisses(1); - processCommitTimestamp(TIMESTAMP_1, 0L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1)); + processSuccessfulCommit(TIMESTAMP_1, 0L); - eventCache.processStartTransactionsUpdate( - ImmutableSet.of(TIMESTAMP_2), LockWatchStateUpdate.success(LEADER, 0L, ImmutableList.of())); + eventCache.processStartTransactionsUpdate(ImmutableSet.of(TIMESTAMP_2), SUCCESS_WITH_NO_UPDATES); valueCache.processStartTransactions(ImmutableSet.of(TIMESTAMP_2)); TransactionScopedCache scopedCache2 = valueCache.getTransactionScopedCache(TIMESTAMP_2); @@ -153,7 +153,7 @@ public void updateCacheOnCommitThrowsOnLeaderElection() { TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1); assertThat(getRemotelyReadCells(scopedCache1, TABLE, CELL_1)).containsExactlyInAnyOrder(CELL_1); - processCommitTimestamp(TIMESTAMP_1, 0L); + processSuccessfulCommit(TIMESTAMP_1, 0L); eventCache.processStartTransactionsUpdate( ImmutableSet.of(TIMESTAMP_2), @@ -161,9 +161,9 @@ public void updateCacheOnCommitThrowsOnLeaderElection() { // Throws this message because the leader election cleared the info entirely (as opposed to us knowing that // there was an election) - assertThatThrownBy(() -> valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1))) + assertThatThrownBy(() -> valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(TIMESTAMP_1))) .isExactlyInstanceOf(TransactionLockWatchFailedException.class) - .hasMessage("start or commit info not processed for start timestamp, or current version missing"); + .hasMessage("start or commit info not processed for start timestamp"); } @Test @@ -180,8 +180,8 @@ public void readOnlyTransactionCacheFiltersOutNewlyLockedValues() { // This update has a lock taken out for CELL_1: this means that all reads for it must be remote. eventCache.processStartTransactionsUpdate( ImmutableSet.of(TIMESTAMP_2), LockWatchStateUpdate.success(LEADER, 1L, ImmutableList.of(LOCK_EVENT))); - processCommitTimestamp(TIMESTAMP_1, 1L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1)); + processEventCacheCommit(TIMESTAMP_1, 1L); + valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(TIMESTAMP_1)); // The difference between the read only cache and the new scoped cache, despite being at the same sequence, // is that the read-only cache contains all the locally cached values, including writes, whereas the fresh @@ -191,10 +191,10 @@ public void readOnlyTransactionCacheFiltersOutNewlyLockedValues() { assertThat(getRemotelyReadCells(readOnlyCache, TABLE, CELL_1, CELL_2)).containsExactlyInAnyOrder(CELL_1); verify(metrics, times(2)).registerHits(1); verify(metrics, times(2)).registerMisses(1); + valueCache.onSuccessfulCommit(TIMESTAMP_1); TransactionScopedCache scopedCache2 = valueCache.getTransactionScopedCache(TIMESTAMP_2); assertThat(getRemotelyReadCells(scopedCache2, TABLE, CELL_1, CELL_2)).containsExactlyInAnyOrder(CELL_1, CELL_2); - // noop cache } @Test @@ -204,8 +204,7 @@ public void lockUpdatesPreventCachingAndUnlockUpdatesAllowItAgain() { TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1); assertThat(getRemotelyReadCells(scopedCache1, TABLE, CELL_1, CELL_3)).containsExactlyInAnyOrder(CELL_1, CELL_3); - processCommitTimestamp(TIMESTAMP_1, 0L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1)); + processSuccessfulCommit(TIMESTAMP_1, 0L); assertThat(scopedCache1.getHitDigest().hitCells()).isEmpty(); verify(metrics, times(1)).registerHits(0); verify(metrics, times(1)).registerMisses(2); @@ -225,8 +224,7 @@ public void lockUpdatesPreventCachingAndUnlockUpdatesAllowItAgain() { verify(metrics, times(1)).registerHits(2); verify(metrics, times(1)).registerMisses(1); - processCommitTimestamp(TIMESTAMP_2, 1L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_2)); + processSuccessfulCommit(TIMESTAMP_2, 1L); assertThat(scopedCache2.getHitDigest().hitCells()).containsExactly(CellReference.of(TABLE, CELL_3)); eventCache.processStartTransactionsUpdate( @@ -289,18 +287,15 @@ public void leaderElectionCausesCacheToBeCleared() { // Stores CELL_1 -> VALUE_1 in central cache TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1); assertThat(getRemotelyReadCells(scopedCache1, TABLE, CELL_1)).containsExactlyInAnyOrder(CELL_1); - processCommitTimestamp(TIMESTAMP_1, 0L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1)); + processSuccessfulCommit(TIMESTAMP_1, 0L); - eventCache.processStartTransactionsUpdate( - ImmutableSet.of(TIMESTAMP_2), LockWatchStateUpdate.success(LEADER, 0L, ImmutableList.of())); + eventCache.processStartTransactionsUpdate(ImmutableSet.of(TIMESTAMP_2), SUCCESS_WITH_NO_UPDATES); valueCache.processStartTransactions(ImmutableSet.of(TIMESTAMP_2)); // Confirms entry is present TransactionScopedCache scopedCache2 = valueCache.getTransactionScopedCache(TIMESTAMP_2); assertThat(getRemotelyReadCells(scopedCache2, TABLE, CELL_1)).isEmpty(); - processCommitTimestamp(TIMESTAMP_2, 0L); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_2)); + processSuccessfulCommit(TIMESTAMP_2, 0L); UUID newLeader = UUID.randomUUID(); eventCache.processStartTransactionsUpdate( @@ -311,14 +306,6 @@ public void leaderElectionCausesCacheToBeCleared() { // Confirms entry is no longer present TransactionScopedCache scopedCache3 = valueCache.getTransactionScopedCache(TIMESTAMP_3); assertThat(getRemotelyReadCells(scopedCache3, TABLE, CELL_1)).containsExactlyInAnyOrder(CELL_1); - eventCache.processGetCommitTimestampsUpdate( - ImmutableList.of(TransactionUpdate.builder() - .startTs(TIMESTAMP_3) - .commitTs(999_999_999) - .writesToken(LockToken.of(UUID.randomUUID())) - .build()), - LockWatchStateUpdate.success(newLeader, -1L, ImmutableList.of())); - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_3)); } @Test @@ -350,7 +337,8 @@ public void locksAfterCommitTimeAreNotMissedWhenFlushingValues() { TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1); assertThat(getRemotelyReadCells(scopedCache1, TABLE, CELL_1, CELL_3)).containsExactlyInAnyOrder(CELL_1, CELL_3); - processCommitTimestamp(TIMESTAMP_1, 0L); + processEventCacheCommit(TIMESTAMP_1, 0L); + valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(TIMESTAMP_1)); verify(metrics, times(1)).registerHits(0); verify(metrics, times(1)).registerMisses(2); @@ -365,13 +353,12 @@ public void locksAfterCommitTimeAreNotMissedWhenFlushingValues() { verify(metrics, times(2)).registerHits(0); verify(metrics, times(2)).registerMisses(2); - // Finally, the first transaction commits, but only after a lock has been taken out on one of the cached cells - valueCache.updateCacheOnCommit(ImmutableSet.of(TIMESTAMP_1)); - // Confirm that the read only cache ignores the new lock, since it happened after commit time TransactionScopedCache readOnlyCache1 = valueCache.getReadOnlyTransactionScopedCacheForCommit(TIMESTAMP_1); assertThat(getRemotelyReadCells(readOnlyCache1, TABLE, CELL_1, CELL_3)).isEmpty(); - valueCache.removeTransactionState(TIMESTAMP_1); + + // Finally, the first transaction commits, but only after a lock has been taken out on one of the cached cells + valueCache.onSuccessfulCommit(TIMESTAMP_1); // New transaction caches should have CELL_3 which was never locked, but CELL_1 should have been filtered out eventCache.processStartTransactionsUpdate( @@ -382,10 +369,37 @@ public void locksAfterCommitTimeAreNotMissedWhenFlushingValues() { assertThat(getRemotelyReadCells(scopedCache3, TABLE, CELL_1, CELL_3)).containsExactlyInAnyOrder(CELL_1); } - private void processCommitTimestamp(long startTimestamp, long sequence) { + @Test + public void ensureStateRemovedDoesNotFlushValuesToCentralCache() { + eventCache.processStartTransactionsUpdate(ImmutableSet.of(TIMESTAMP_1), LOCK_WATCH_SNAPSHOT); + valueCache.processStartTransactions(ImmutableSet.of(TIMESTAMP_1)); + + TransactionScopedCache scopedCache1 = valueCache.getTransactionScopedCache(TIMESTAMP_1); + assertThat(getRemotelyReadCells(scopedCache1, TABLE, CELL_1, CELL_3)).containsExactlyInAnyOrder(CELL_1, CELL_3); + processEventCacheCommit(TIMESTAMP_1, 0L); + valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(TIMESTAMP_1)); + valueCache.ensureStateRemoved(TIMESTAMP_1); + verify(metrics, times(1)).registerHits(0); + verify(metrics, times(1)).registerMisses(2); + + eventCache.processStartTransactionsUpdate(ImmutableSet.of(TIMESTAMP_2), SUCCESS_WITH_NO_UPDATES); + valueCache.processStartTransactions(ImmutableSet.of(TIMESTAMP_2)); + + TransactionScopedCache scopedCache2 = valueCache.getTransactionScopedCache(TIMESTAMP_2); + assertThat(getRemotelyReadCells(scopedCache2, TABLE, CELL_1, CELL_3)).containsExactlyInAnyOrder(CELL_1, CELL_3); + verify(metrics, times(2)).registerHits(0); + verify(metrics, times(2)).registerMisses(2); + } + + private void processSuccessfulCommit(long startTimestamp, long sequence) { + processEventCacheCommit(startTimestamp, sequence); + valueCache.updateCacheWithCommitTimestampsInformation(ImmutableSet.of(startTimestamp)); + valueCache.onSuccessfulCommit(startTimestamp); + } + + private void processEventCacheCommit(long startTimestamp, long sequence) { eventCache.processGetCommitTimestampsUpdate( ImmutableList.of(TransactionUpdate.builder() - .startTs(startTimestamp) .startTs(startTimestamp) .commitTs(startTimestamp + 1337L) .writesToken(LockToken.of(UUID.randomUUID())) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/DuplicatingLockWatchEventCache.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/DuplicatingLockWatchEventCache.java index 71113c13a96..5b9ce34abbd 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/DuplicatingLockWatchEventCache.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/DuplicatingLockWatchEventCache.java @@ -22,7 +22,6 @@ import com.palantir.lock.watch.LockWatchEventCache; import com.palantir.lock.watch.LockWatchStateUpdate; import com.palantir.lock.watch.LockWatchVersion; -import com.palantir.lock.watch.SpanningCommitUpdate; import com.palantir.lock.watch.TransactionUpdate; import com.palantir.lock.watch.TransactionsLockWatchUpdate; import java.util.Collection; @@ -68,11 +67,6 @@ public CommitUpdate getCommitUpdate(long startTs) { return mainCache.getCommitUpdate(startTs); } - @Override - public SpanningCommitUpdate getSpanningCommitUpdate(long startTs) { - return mainCache.getSpanningCommitUpdate(startTs); - } - @Override public TransactionsLockWatchUpdate getUpdateForTransactions( Set startTimestamps, Optional version) { @@ -84,6 +78,11 @@ public void removeTransactionStateFromCache(long startTimestamp) { mainCache.removeTransactionStateFromCache(startTimestamp); } + @Override + public CommitUpdate getEventUpdate(long startTs) { + return mainCache.getEventUpdate(startTs); + } + private void validateVersionEquality() { assertThat(mainCache.lastKnownVersion()).isEqualTo(secondaryCache.lastKnownVersion()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheIntegrationTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheIntegrationTest.java index 12ced60b30d..04ed3616183 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheIntegrationTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheIntegrationTest.java @@ -528,7 +528,7 @@ public void timestampEventsRetentionedThrows() { assertThatThrownBy(() -> eventCache.getCommitUpdate(START_TS_1)) .isExactlyInstanceOf(TransactionLockWatchFailedException.class) - .hasMessage("start or commit info not processed for start timestamp, or current version missing"); + .hasMessage("start or commit info not processed for start timestamp"); } @Test diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImplTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImplTest.java index 8a6b85324d9..7271b7e8638 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImplTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchManagerImplTest.java @@ -117,7 +117,7 @@ public void onlyWatchCurrentWatches() { public void removeTransactionStateTest() { manager.removeTransactionStateFromCache(1L); verify(lockWatchEventCache).removeTransactionStateFromCache(1L); - verify(valueScopingCache).removeTransactionState(1L); + verify(valueScopingCache).ensureStateRemoved(1L); } @Test diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/ResilientLockWatchProxyTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/ResilientLockWatchProxyTest.java index d058a0d5466..456dad41026 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/ResilientLockWatchProxyTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/watch/ResilientLockWatchProxyTest.java @@ -83,9 +83,9 @@ public void valueCacheProxyAlsoFallsBackOnException() { // Normal operation long timestamp = 1L; Set timestamps = ImmutableSet.of(timestamp); - proxyCache.updateCacheOnCommit(timestamps); - verify(defaultCache).updateCacheOnCommit(timestamps); - verify(fallbackCache, never()).updateCacheOnCommit(any()); + proxyCache.updateCacheWithCommitTimestampsInformation(timestamps); + verify(defaultCache).updateCacheWithCommitTimestampsInformation(timestamps); + verify(fallbackCache, never()).updateCacheWithCommitTimestampsInformation(any()); // Failure when(defaultCache.getTransactionScopedCache(timestamp)) diff --git a/changelog/@unreleased/pr-5652.v2.yml b/changelog/@unreleased/pr-5652.v2.yml new file mode 100644 index 00000000000..88432461b0f --- /dev/null +++ b/changelog/@unreleased/pr-5652.v2.yml @@ -0,0 +1,7 @@ +type: fix +fix: + description: Lock-watch transaction caches now correctly flush loaded values to + the central cache after the transaction has completed the commit phase, instead + of doing so eagerly at commit timestamp time. + links: + - https://github.com/palantir/atlasdb/pull/5652 diff --git a/lock-api-objects/src/main/java/com/palantir/lock/watch/SpanningCommitUpdate.java b/lock-api-objects/src/main/java/com/palantir/lock/watch/SpanningCommitUpdate.java deleted file mode 100644 index 347676f5660..00000000000 --- a/lock-api-objects/src/main/java/com/palantir/lock/watch/SpanningCommitUpdate.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * (c) Copyright 2021 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.lock.watch; - -import com.palantir.common.annotations.ImmutablesStyles.StagedBuilderStyle; -import com.palantir.lock.watch.ImmutableSpanningCommitUpdate.TransactionCommitUpdateBuildStage; -import org.immutables.value.Value; - -@Value.Immutable -@StagedBuilderStyle -public interface SpanningCommitUpdate { - CommitUpdate transactionCommitUpdate(); - - CommitUpdate spanningCommitUpdate(); - - static TransactionCommitUpdateBuildStage builder() { - return ImmutableSpanningCommitUpdate.builder(); - } - - static SpanningCommitUpdate invalidateAll() { - return SpanningCommitUpdate.builder() - .transactionCommitUpdate(CommitUpdate.invalidateAll()) - .spanningCommitUpdate(CommitUpdate.invalidateAll()) - .build(); - } -} diff --git a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCache.java b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCache.java index 00a7a56cb98..69927a64fc7 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCache.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCache.java @@ -24,7 +24,9 @@ public interface LockWatchCache { void processCommitTimestampsUpdate(Collection transactionUpdates, LockWatchStateUpdate update); - void removeTransactionStateFromCache(long startTimestamp); + void removeTransactionStateFromCache(long startTs); + + void onTransactionCommit(long startTs); LockWatchEventCache getEventCache(); diff --git a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCacheImpl.java b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCacheImpl.java index 13d1fbd3e49..a36e4cfaf90 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCacheImpl.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchCacheImpl.java @@ -43,14 +43,20 @@ public void processStartTransactionsUpdate(Set startTimestamps, LockWatchS public void processCommitTimestampsUpdate( Collection transactionUpdates, LockWatchStateUpdate update) { eventCache.processGetCommitTimestampsUpdate(transactionUpdates, update); - valueCache.updateCacheOnCommit( + valueCache.updateCacheWithCommitTimestampsInformation( transactionUpdates.stream().map(TransactionUpdate::startTs).collect(Collectors.toSet())); } @Override - public void removeTransactionStateFromCache(long startTimestamp) { - eventCache.removeTransactionStateFromCache(startTimestamp); - valueCache.removeTransactionState(startTimestamp); + public void removeTransactionStateFromCache(long startTs) { + eventCache.removeTransactionStateFromCache(startTs); + valueCache.ensureStateRemoved(startTs); + } + + @Override + public void onTransactionCommit(long startTs) { + valueCache.onSuccessfulCommit(startTs); + eventCache.removeTransactionStateFromCache(startTs); } @Override diff --git a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchEventCache.java b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchEventCache.java index 0c89d2e800e..e144ffce423 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchEventCache.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchEventCache.java @@ -42,7 +42,7 @@ void processGetCommitTimestampsUpdate( Collection transactionUpdates, LockWatchStateUpdate update); /** - * Calculates the {@link CommitUpdate} taking into account all changes to lock watch state since the start of the + * Calculates the {@link CommitUpdate}, taking into account all changes to lock watch state since the start of the * transaction, excluding the transaction's own commit locks. * * @param startTs start timestamp of the transaction @@ -51,16 +51,15 @@ void processGetCommitTimestampsUpdate( CommitUpdate getCommitUpdate(long startTs); /** - * Computes a {@link SpanningCommitUpdate}, which is essentially two regular commit updates. The former is the - * same as the result from {@link LockWatchEventCache#getCommitUpdate(long)}, but the latter also includes locks - * taken out between commit time and the present. This is critical for flushing values to the central cache, as - * the central cache may be on a later version than the commit time of the transaction. + * Similar to {@link LockWatchEventCache#getCommitUpdate(long)}, but returns an update containing all descriptors + * from the start of the transaction until the current version of the cache - this is expected to be a superset + * of descriptors returned from getCommitUpdate. This endpoint is designed to be used for computing values that are + * safe to flush to the central cache. * * @param startTs start timestamp of the transaction - * @return a spanning commit update, which contains the commit update for this transaction, as well as a commit - * update which contains all lock descriptors from the start of the transaction until **now**. + * @return the commit update that encompasses the time from start of the transaction until the present */ - SpanningCommitUpdate getSpanningCommitUpdate(long startTs); + CommitUpdate getEventUpdate(long startTs); /** * Given a set of start timestamps, and a lock watch state version, returns a list of all events that occurred since diff --git a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchValueCache.java b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchValueCache.java index 5c693bd0ff0..9116bad323d 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/LockWatchValueCache.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/LockWatchValueCache.java @@ -21,7 +21,9 @@ public interface LockWatchValueCache { void processStartTransactions(Set startTimestamps); - void updateCacheOnCommit(Set startTimestamps); + void updateCacheWithCommitTimestampsInformation(Set startTimestamps); - void removeTransactionState(long startTimestamp); + void ensureStateRemoved(long startTimestamp); + + void onSuccessfulCommit(long startTimestamp); } diff --git a/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchEventCache.java b/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchEventCache.java index a5fe7cf0e3f..c52c398f93f 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchEventCache.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchEventCache.java @@ -59,11 +59,6 @@ public CommitUpdate getCommitUpdate(long startTs) { return CommitUpdate.invalidateAll(); } - @Override - public SpanningCommitUpdate getSpanningCommitUpdate(long startTs) { - return SpanningCommitUpdate.invalidateAll(); - } - @Override public TransactionsLockWatchUpdate getUpdateForTransactions( Set startTimestamps, Optional version) { @@ -77,6 +72,11 @@ public TransactionsLockWatchUpdate getUpdateForTransactions( @Override public void removeTransactionStateFromCache(long startTimestamp) {} + @Override + public CommitUpdate getEventUpdate(long startTs) { + return CommitUpdate.invalidateAll(); + } + private void updateVersion(Optional maybeNewVersion) { currentVersion = maybeNewVersion.map(newVersion -> currentVersion .map(current -> { diff --git a/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchValueCache.java b/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchValueCache.java index 0b3a6c3cef9..6a3738b969c 100644 --- a/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchValueCache.java +++ b/lock-api/src/main/java/com/palantir/lock/watch/NoOpLockWatchValueCache.java @@ -27,8 +27,11 @@ public static LockWatchValueCache create() { public void processStartTransactions(Set startTimestamps) {} @Override - public void updateCacheOnCommit(Set startTimestamps) {} + public void updateCacheWithCommitTimestampsInformation(Set startTimestamps) {} @Override - public void removeTransactionState(long startTimestamp) {} + public void ensureStateRemoved(long startTimestamp) {} + + @Override + public void onSuccessfulCommit(long startTimestamp) {} } diff --git a/lock-api/src/test/java/com/palantir/lock/watch/LockWatchCacheImplTest.java b/lock-api/src/test/java/com/palantir/lock/watch/LockWatchCacheImplTest.java index e3dad2c3fa5..eea5f0a89d3 100644 --- a/lock-api/src/test/java/com/palantir/lock/watch/LockWatchCacheImplTest.java +++ b/lock-api/src/test/java/com/palantir/lock/watch/LockWatchCacheImplTest.java @@ -56,13 +56,13 @@ public void startTransactionsTest() { public void commitTest() { cache.processCommitTimestampsUpdate(UPDATES, SUCCESS); verify(eventCache).processGetCommitTimestampsUpdate(UPDATES, SUCCESS); - verify(valueCache).updateCacheOnCommit(TIMESTAMPS); + verify(valueCache).updateCacheWithCommitTimestampsInformation(TIMESTAMPS); } @Test public void removeTest() { cache.removeTransactionStateFromCache(1L); verify(eventCache).removeTransactionStateFromCache(1L); - verify(valueCache).removeTransactionState(1L); + verify(valueCache).ensureStateRemoved(1L); } } diff --git a/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchValueIntegrationTest.java b/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchValueIntegrationTest.java index 47ccd523fd6..835ce93826a 100644 --- a/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchValueIntegrationTest.java +++ b/timelock-server/src/integTest/java/com/palantir/atlasdb/timelock/LockWatchValueIntegrationTest.java @@ -423,6 +423,54 @@ public void nearbyCommitsDoNotAffectResultsPresentInCache() { .doesNotThrowAnyException(); } + @Test + public void lateAbortingTransactionDoesNotFlushValuesToCentralCache() { + txnManager.runTaskThrowOnConflict(txn -> { + txn.put(TABLE_REF, ImmutableMap.of(CELL_1, DATA_1, CELL_2, DATA_2, CELL_3, DATA_3)); + return null; + }); + + awaitUnlock(); + + AtomicLong startTimestamp = new AtomicLong(-1L); + PreCommitCondition commitFailingCondition = timestamp -> { + if (timestamp != startTimestamp.get()) { + throw new RuntimeException("Transaction failed at commit time"); + } + }; + + assertThatThrownBy( + () -> txnManager.runTaskWithConditionThrowOnConflict(commitFailingCondition, (txn, _unused) -> { + startTimestamp.set(txn.getTimestamp()); + txn.put(TABLE_REF, ImmutableMap.of(CELL_4, DATA_4)); + txn.get(TABLE_REF, ImmutableSet.of(CELL_1, CELL_2, CELL_3)); + return null; + })) + .isInstanceOf(RuntimeException.class); + + awaitUnlock(); + + txnManager.runTaskThrowOnConflict(txn -> { + // Confirm that the previous transaction did not commit writes + assertThat(txn.get(TABLE_REF, ImmutableSet.of(CELL_4))).isEmpty(); + txn.get(TABLE_REF, ImmutableSet.of(CELL_1, CELL_2, CELL_3)); + + assertLoadedValues( + txn, + ImmutableMap.of( + TABLE_CELL_1, + CacheValue.of(DATA_1), + TABLE_CELL_2, + CacheValue.of(DATA_2), + TABLE_CELL_3, + CacheValue.of(DATA_3), + TABLE_CELL_4, + CacheValue.empty())); + assertHitValues(txn, ImmutableSet.of()); + return null; + }); + } + private void simulateOverlappingWriteTransaction( AtomicReference lwCache, long theirStartTimestamp, long theirCommitTimestamp) { LockToken lockToken = LockToken.of(UUID.randomUUID());