Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
[LW] Rework value cache commit flow (#5652)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolyon-S authored Sep 28, 2021
1 parent fdc7f11 commit c9344f5
Show file tree
Hide file tree
Showing 25 changed files with 245 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,27 @@ public interface LockWatchValueScopingCache extends LockWatchValueCache {
void processStartTransactions(Set<Long> startTimestamps);

/**
* This does *not* remove state from the cache - {@link #removeTransactionState(long)} must be called at the end
* This does *not* remove state from the cache - {@link #ensureStateRemoved(long)} must be called at the end
* of the transaction to do so, or else there will be a memory leak.
*/
@Override
void updateCacheOnCommit(Set<Long> startTimestamps);
void updateCacheWithCommitTimestampsInformation(Set<Long> startTimestamps);

/**
* Guarantees that all relevant state for the given transaction has been removed. If the state is already gone (by
* calling {@link LockWatchValueScopingCache#onSuccessfulCommit(long)}, this will be a no-op. Failure to call this
* method may result in memory leaks (particularly for aborting transactions).
*/
@Override
void ensureStateRemoved(long startTs);

/**
* Performs final cleanup for a given transaction (identified by its start timestamp). This removes state associated
* with a transaction, and flushes values to the central value cache. Calling this method before a transaction is
* fully committed may result in incorrect values being cached.
*/
@Override
void removeTransactionState(long startTimestamp);
void onSuccessfulCommit(long startTs);

TransactionScopedCache getTransactionScopedCache(long startTs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ NavigableMap<byte[], RowResult<byte[]>> getRows(

/**
* This method should be called before retrieving the value or hit digest, as it guarantees that no more reads or
* writes will be performed on the cache.
* writes will be performed on the cache. This method is idempotent, and may legitimately be called multiple times.
*/
void finalise();

Expand All @@ -94,14 +94,4 @@ NavigableMap<byte[], RowResult<byte[]>> getRows(
HitDigest getHitDigest();

TransactionScopedCache createReadOnlyCache(CommitUpdate commitUpdate);

/**
* Checks if any values have been read remotely and stored locally for later flushing to the central cache. Note
* that this method **will** finalise the cache in order to retrieve the digest; no further reads or writes may
* be performed once this is called.
*/
default boolean hasUpdates() {
finalise();
return !getValueDigest().loadedValues().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public abstract class LockWatchManagerInternal extends LockWatchManager implemen

public abstract void removeTransactionStateFromCache(long startTs);

public abstract void onTransactionCommit(long startTs);

public abstract TransactionScopedCache getTransactionScopedCache(long startTs);

public abstract TransactionScopedCache getReadOnlyTransactionScopedCache(long startTs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public void removeTransactionStateFromCache(long startTs) {
cache.removeTransactionStateFromCache(startTs);
}

@Override
public void onTransactionCommit(long startTs) {
cache.onTransactionCommit(startTs);
}

@Override
TransactionsLockWatchUpdate getUpdateForTransactions(
Set<Long> startTimestamps, Optional<LockWatchVersion> version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import com.palantir.atlasdb.transaction.api.TransactionLockWatchFailedException;
import com.palantir.common.streams.KeyedStream;
import com.palantir.lock.LockDescriptor;
import com.palantir.lock.watch.CommitUpdate.Visitor;
import com.palantir.lock.watch.CommitUpdate;
import com.palantir.lock.watch.LockWatchEvent;
import com.palantir.lock.watch.LockWatchEventCache;
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.lock.watch.SpanningCommitUpdate;
import com.palantir.lock.watch.TransactionsLockWatchUpdate;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -100,28 +99,19 @@ public synchronized void processStartTransactions(Set<Long> startTimestamps) {
}

@Override
public synchronized void updateCacheOnCommit(Set<Long> startTimestamps) {
public synchronized void updateCacheWithCommitTimestampsInformation(Set<Long> startTimestamps) {
startTimestamps.forEach(this::processCommitUpdate);
}

@Override
public synchronized void removeTransactionState(long startTimestamp) {
public synchronized void ensureStateRemoved(long startTimestamp) {
StartTimestamp startTs = StartTimestamp.of(startTimestamp);
snapshotStore.removeTimestamp(startTs);
cacheStore.removeCache(startTs);
}

@Override
public TransactionScopedCache getTransactionScopedCache(long startTs) {
return cacheStore.getCache(StartTimestamp.of(startTs));
}

@Override
public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long startTs) {
return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs));
}

private synchronized void processCommitUpdate(long startTimestamp) {
public synchronized void onSuccessfulCommit(long startTimestamp) {
StartTimestamp startTs = StartTimestamp.of(startTimestamp);
TransactionScopedCache cache = cacheStore.getCache(startTs);
cache.finalise();
Expand All @@ -131,10 +121,8 @@ private synchronized void processCommitUpdate(long startTimestamp) {
return;
}

SpanningCommitUpdate spanningCommitUpdate = eventCache.getSpanningCommitUpdate(startTimestamp);
cacheStore.createReadOnlyCache(startTs, spanningCommitUpdate.transactionCommitUpdate());

spanningCommitUpdate.spanningCommitUpdate().accept(new Visitor<Void>() {
CommitUpdate commitUpdate = eventCache.getEventUpdate(startTimestamp);
commitUpdate.accept(new CommitUpdate.Visitor<Void>() {
@Override
public Void invalidateAll() {
// This might happen due to an election or if we exceeded the maximum number of events held in
Expand All @@ -155,6 +143,31 @@ public Void invalidateSome(Set<LockDescriptor> invalidatedLocks) {
return null;
}
});
ensureStateRemoved(startTimestamp);
}

/**
* Retrieval of transaction scoped caches (read-only or otherwise) does not need to be synchronised. The main
* reason for this is that the only race condition that could conceivably occur is for the state to not exist here
* when it should. However, in all of these cases, a no-op cache is used instead, and thus the transaction will
* simply go ahead but with caching disabled (which is guaranteed to be safe).
*/
@Override
public TransactionScopedCache getTransactionScopedCache(long startTs) {
return cacheStore.getCache(StartTimestamp.of(startTs));
}

@Override
public TransactionScopedCache getReadOnlyTransactionScopedCacheForCommit(long startTs) {
return cacheStore.getReadOnlyCache(StartTimestamp.of(startTs));
}

private synchronized void processCommitUpdate(long startTimestamp) {
StartTimestamp startTs = StartTimestamp.of(startTimestamp);
TransactionScopedCache cache = cacheStore.getCache(startTs);
cache.finalise();
CommitUpdate commitUpdate = eventCache.getCommitUpdate(startTimestamp);
cacheStore.createReadOnlyCache(startTs, commitUpdate);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.palantir.lock.watch.LockWatchCreatedEvent;
import com.palantir.lock.watch.LockWatchEvent;
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.lock.watch.SpanningCommitUpdate;
import com.palantir.lock.watch.TransactionsLockWatchUpdate;
import com.palantir.lock.watch.UnlockEvent;
import com.palantir.logsafe.SafeArg;
Expand Down Expand Up @@ -81,31 +80,20 @@ each version starting with the client version (exclusive) and ending with latest
.build();
}

default SpanningCommitUpdate toSpanningCommitUpdate(
LockWatchVersion startVersion, CommitInfo commitInfo, LockWatchVersion endVersion) {
default CommitUpdate toCommitUpdate(
LockWatchVersion startVersion, LockWatchVersion endVersion, Optional<CommitInfo> commitInfo) {
if (clearCache()) {
return SpanningCommitUpdate.invalidateAll();
return CommitUpdate.invalidateAll();
}

// We want to ensure that we do not miss any versions, but we do not care about the event with the same version
// as the start version.
verifyReturnedEventsEnclosesTransactionVersions(startVersion.version() + 1, endVersion.version());

LockEventVisitor eventVisitor = new LockEventVisitor(commitInfo.commitLockToken());
Set<LockDescriptor> transactionLocks = new HashSet<>();
Set<LockDescriptor> spanningLocks = new HashSet<>();
events().events().forEach(event -> {
Set<LockDescriptor> descriptors = event.accept(eventVisitor);
spanningLocks.addAll(descriptors);

if (event.sequence() <= commitInfo.commitVersion().version()) {
transactionLocks.addAll(descriptors);
}
});
return SpanningCommitUpdate.builder()
.transactionCommitUpdate(CommitUpdate.invalidateSome(transactionLocks))
.spanningCommitUpdate(CommitUpdate.invalidateSome(spanningLocks))
.build();
LockEventVisitor eventVisitor = new LockEventVisitor(commitInfo.map(CommitInfo::commitLockToken));
Set<LockDescriptor> locksTakenOut = new HashSet<>();
events().events().forEach(event -> locksTakenOut.addAll(event.accept(eventVisitor)));
return CommitUpdate.invalidateSome(locksTakenOut);
}

default void verifyReturnedEventsEnclosesTransactionVersions(long lowerBound, long upperBound) {
Expand All @@ -130,13 +118,11 @@ class Builder extends ImmutableClientLogEvents.Builder {}
final class LockEventVisitor implements LockWatchEvent.Visitor<Set<LockDescriptor>> {
private final Optional<UUID> commitRequestId;

private LockEventVisitor(LockToken commitLocksToken) {
if (commitLocksToken instanceof LeasedLockToken) {
commitRequestId = Optional.of(
((LeasedLockToken) commitLocksToken).serverToken().getRequestId());
} else {
commitRequestId = Optional.empty();
}
private LockEventVisitor(Optional<LockToken> commitLocksToken) {
commitRequestId = commitLocksToken
.filter(lockToken -> lockToken instanceof LeasedLockToken)
.map(lockToken ->
((LeasedLockToken) lockToken).serverToken().getRequestId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.palantir.lock.watch.LockWatchStateUpdate;
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.lock.watch.NoOpLockWatchEventCache;
import com.palantir.lock.watch.SpanningCommitUpdate;
import com.palantir.lock.watch.TransactionUpdate;
import com.palantir.lock.watch.TransactionsLockWatchUpdate;
import java.util.Collection;
Expand Down Expand Up @@ -81,29 +80,39 @@ public synchronized void processGetCommitTimestampsUpdate(

@Override
public synchronized CommitUpdate getCommitUpdate(long startTs) {
return getSpanningCommitUpdate(startTs).transactionCommitUpdate();
Optional<LockWatchVersion> startVersion = timestampStateStore.getStartVersion(startTs);
Optional<CommitInfo> maybeCommitInfo = timestampStateStore.getCommitInfo(startTs);

assertTrue(
maybeCommitInfo.isPresent() && startVersion.isPresent(),
"start or commit info not processed for start timestamp");

CommitInfo commitInfo = maybeCommitInfo.get();

VersionBounds versionBounds = VersionBounds.builder()
.startVersion(startVersion)
.endVersion(commitInfo.commitVersion())
.build();

return eventLog.getEventsBetweenVersions(versionBounds)
.toCommitUpdate(startVersion.get(), commitInfo.commitVersion(), maybeCommitInfo);
}

@Override
public SpanningCommitUpdate getSpanningCommitUpdate(long startTs) {
public CommitUpdate getEventUpdate(long startTs) {
Optional<LockWatchVersion> startVersion = timestampStateStore.getStartVersion(startTs);
Optional<CommitInfo> maybeCommitInfo = timestampStateStore.getCommitInfo(startTs);
Optional<LockWatchVersion> currentVersion = eventLog.getLatestKnownVersion();

assertTrue(
maybeCommitInfo.isPresent()
&& startVersion.isPresent()
&& eventLog.getLatestKnownVersion().isPresent(),
"start or commit info not processed for start timestamp, or current version missing");
currentVersion.isPresent() && startVersion.isPresent(), "essential information missing for timestamp");

LockWatchVersion currentVersion = eventLog.getLatestKnownVersion().get();
CommitInfo commitInfo = maybeCommitInfo.get();
VersionBounds versionBounds = VersionBounds.builder()
.startVersion(startVersion)
.endVersion(currentVersion)
.endVersion(currentVersion.get())
.build();

return eventLog.getEventsBetweenVersions(versionBounds)
.toSpanningCommitUpdate(startVersion.get(), commitInfo, currentVersion);
.toCommitUpdate(startVersion.get(), currentVersion.get(), Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public void removeTransactionStateFromCache(long startTs) {
lockWatchCache.removeTransactionStateFromCache(startTs);
}

@Override
public void onTransactionCommit(long startTs) {
lockWatchCache.onTransactionCommit(startTs);
}

@Override
public TransactionScopedCache getTransactionScopedCache(long startTs) {
return valueScopingCache.getTransactionScopedCache(startTs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
Expand Down Expand Up @@ -1819,15 +1818,6 @@ private void commitWrites(TransactionService transactionService) {
if (validationNecessaryForInvolvedTablesOnCommit()) {
throwIfImmutableTsOrCommitLocksExpired(null);
}

// if the cache has been used, we must work out which values can be flushed to the central cache by
// obtaining a commit update, which is obtained via the get commit timestamp request.
if (getCache().hasUpdates()) {
timedAndTraced(
"getCommitTimestampForCaching",
() -> timelockService.getCommitTimestamp(
getStartTimestamp(), LockToken.of(UUID.randomUUID())));
}
return;
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public List<OpenTransaction> startTransactions(List<? extends PreCommitCondition

Transaction transaction = createTransaction(
immutableTs, startTimestampSupplier, immutableTsLock, condition);
transaction.onSuccess(
() -> lockWatchManager.onTransactionCommit(transaction.getTimestamp()));
return new OpenTransactionImpl(transaction, immutableTsLock);
})
.collect(Collectors.toList());
Expand Down
Loading

0 comments on commit c9344f5

Please sign in to comment.