[LW] Add additional fail-safe mechanisms around stored state (#5658)
Jolyon-S authored Sep 27, 2021
1 parent f996f9f commit 30cdf93
Showing 7 changed files with 82 additions and 10 deletions.
CacheStoreImpl.java
@@ -17,11 +17,13 @@
package com.palantir.atlasdb.keyvalue.api.cache;

import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp;
import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException;
import com.palantir.lock.watch.CommitUpdate;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,10 +58,7 @@ final class CacheStoreImpl implements CacheStore {

@Override
public void createCache(StartTimestamp timestamp) {
if (cacheMap.size() > maxCacheCount) {
throw new TransactionFailedRetriableException(
"Exceeded maximum concurrent caches; transaction can be retried, but with caching disabled");
}
validateStateSize();

cacheMap.computeIfAbsent(timestamp, key -> snapshotStore
.getSnapshot(key)
@@ -98,6 +97,19 @@ public TransactionScopedCache getReadOnlyCache(StartTimestamp timestamp) {
.orElseGet(() -> NoOpTransactionScopedCache.create().createReadOnlyCache(CommitUpdate.invalidateAll()));
}

private void validateStateSize() {
if (cacheMap.size() > maxCacheCount) {
log.warn(
"Transaction cache store has exceeded maximum concurrent caches. This likely indicates a memory"
+ " leak",
SafeArg.of("cacheMapSize", cacheMap.size()),
SafeArg.of("maxCacheCount", maxCacheCount),
SafeArg.of("earliestTimestamp", cacheMap.keySet().stream().min(Comparator.naturalOrder())));
throw new SafeIllegalStateException(
"Exceeded maximum concurrent caches; transaction can be retried, but with caching disabled");
}
}

private Optional<Caches> getCacheInternal(StartTimestamp timestamp) {
return Optional.ofNullable(cacheMap.get(timestamp));
}
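The guard above moves the inline size check into `validateStateSize()`, adds safe-logged diagnostics, and swaps the retriable transaction exception for a `SafeIllegalStateException`; the exception message promises that a caller can carry on with caching disabled. A rough call-site sketch of that contract (not code from this commit; same-package access and a `getCache(StartTimestamp)` accessor on `CacheStore` are assumptions here):

```java
// Hypothetical call-site sketch: treat the fail-safe as "continue, but with
// caching disabled". Assumes same-package access and a getCache(StartTimestamp)
// accessor on CacheStore.
package com.palantir.atlasdb.keyvalue.api.cache;

import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;

final class CachingFallback {
    private CachingFallback() {}

    static TransactionScopedCache cacheOrNoOp(CacheStore cacheStore, StartTimestamp timestamp) {
        try {
            cacheStore.createCache(timestamp); // throws once cacheMap.size() > maxCacheCount
            return cacheStore.getCache(timestamp); // assumed accessor
        } catch (SafeIllegalStateException e) {
            // Fail-safe tripped: degrade to a no-op cache rather than failing
            // the transaction outright; a retry will simply run uncached.
            return NoOpTransactionScopedCache.create();
        }
    }
}
```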
LockWatchValueScopingCacheImpl.java
@@ -44,7 +44,7 @@

@ThreadSafe
public final class LockWatchValueScopingCacheImpl implements LockWatchValueScopingCache {
private static final int MAX_CACHE_COUNT = 100_000;
private static final int MAX_CACHE_COUNT = 20_000;
private final LockWatchEventCache eventCache;
private final CacheStore cacheStore;
private final ValueStore valueStore;
SnapshotStoreImpl.java
@@ -20,14 +20,22 @@
import com.google.common.collect.SetMultimap;
import com.palantir.atlasdb.keyvalue.api.watch.Sequence;
import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
final class SnapshotStoreImpl implements SnapshotStore {
private static final SafeLogger log = SafeLoggerFactory.get(SnapshotStoreImpl.class);
private static final int MAXIMUM_SIZE = 20_000;

private final Map<Sequence, ValueCacheSnapshot> snapshotMap;
private final SetMultimap<Sequence, StartTimestamp> liveSequences;
private final Map<StartTimestamp, Sequence> timestampMap;
@@ -40,6 +48,8 @@ final class SnapshotStoreImpl implements SnapshotStore {

@Override
public void storeSnapshot(Sequence sequence, Collection<StartTimestamp> timestamps, ValueCacheSnapshot snapshot) {
validateStateSize();

if (!timestamps.isEmpty()) {
snapshotMap.put(sequence, snapshot);
timestamps.forEach(timestamp -> {
@@ -75,4 +85,26 @@ public void reset() {
public Optional<ValueCacheSnapshot> getSnapshotForSequence(Sequence sequence) {
return Optional.ofNullable(snapshotMap.get(sequence));
}

private void validateStateSize() {
if (snapshotMap.size() > MAXIMUM_SIZE
|| liveSequences.size() > MAXIMUM_SIZE
|| timestampMap.size() > MAXIMUM_SIZE) {
log.warn(
"Snapshot store has exceeded its maximum size. This likely indicates a memory leak.",
SafeArg.of("snapshotMapSize", snapshotMap.size()),
SafeArg.of("liveSequencesSize", liveSequences.size()),
SafeArg.of("timestampMapSize", timestampMap.size()),
SafeArg.of(
"earliestTimestamp", timestampMap.keySet().stream().min(Comparator.naturalOrder())),
SafeArg.of(
"earliestSnapshotSequence",
snapshotMap.keySet().stream().min(Comparator.naturalOrder())),
SafeArg.of(
"earliestLiveSequence",
liveSequences.keySet().stream().min(Comparator.naturalOrder())),
SafeArg.of("maximumSize", MAXIMUM_SIZE));
throw new SafeIllegalStateException("Exceeded max snapshot store size");
}
}
}
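Each store touched in this commit follows the same defensive shape: a `validateStateSize()` invoked before any insert, a strict size check on every backing collection, a warning that logs the earliest live keys (the entries a cleanup pass should have evicted first), and a thrown `SafeIllegalStateException`. A minimal, self-contained sketch of that pattern under hypothetical names (the logsafe calls match those used above):

```java
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Generic sketch of the guard-before-insert pattern used in this commit.
final class BoundedStateStore<K extends Comparable<K>, V> {
    private static final SafeLogger log = SafeLoggerFactory.get(BoundedStateStore.class);

    private final Map<K, V> state = new HashMap<>();
    private final int maximumSize;

    BoundedStateStore(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    void put(K key, V value) {
        validateStateSize(); // fail loudly before growing the state further
        state.put(key, value);
    }

    private void validateStateSize() {
        if (state.size() > maximumSize) {
            log.warn(
                    "State store has exceeded its maximum size. This likely indicates a memory leak.",
                    SafeArg.of("stateSize", state.size()),
                    SafeArg.of("maximumSize", maximumSize),
                    // The earliest key is the one a cleanup pass should have removed first.
                    SafeArg.of("earliestKey", state.keySet().stream().min(Comparator.naturalOrder())));
            throw new SafeIllegalStateException("Exceeded maximum state store size");
        }
    }
}
```

Note the strict `>` and the check-before-insert ordering: a store can momentarily hold `maximumSize + 1` entries. The goal is a loud, diagnosable failure rather than an exact cap.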
LockWatchEventCacheImpl.java
@@ -36,7 +36,7 @@

@ThreadSafe
public final class LockWatchEventCacheImpl implements LockWatchEventCache {
// The minimum number of events should be the same as Timelocks' LockEventLogImpl.
// The minimum number of events should be the same as Timelock's LockEventLogImpl.
private static final int MIN_EVENTS = 1000;
private static final int MAX_EVENTS = 10_000;

TimestampStateStore.java
@@ -27,6 +27,10 @@
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.lock.watch.TransactionUpdate;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Collection;
import java.util.NavigableMap;
import java.util.Optional;
@@ -36,10 +40,14 @@

@NotThreadSafe
final class TimestampStateStore {
private static final SafeLogger log = SafeLoggerFactory.get(TimestampStateStore.class);
private static final int MAXIMUM_SIZE = 20_000;
private final NavigableMap<StartTimestamp, MapEntry> timestampMap = new TreeMap<>();
private final SortedSetMultimap<Sequence, StartTimestamp> livingVersions = TreeMultimap.create();

void putStartTimestamps(Collection<Long> startTimestamps, LockWatchVersion version) {
validateStateSize();

startTimestamps.stream().map(StartTimestamp::of).forEach(startTimestamp -> {
MapEntry previous = timestampMap.putIfAbsent(startTimestamp, MapEntry.of(version));
Preconditions.checkArgument(previous == null, "Start timestamp already present in map");
@@ -93,10 +101,24 @@ TimestampStateStoreState getStateForTesting() {
.build();
}

public Optional<Sequence> getEarliestLiveSequence() {
Optional<Sequence> getEarliestLiveSequence() {
return Optional.ofNullable(Iterables.getFirst(livingVersions.keySet(), null));
}

private void validateStateSize() {
if (timestampMap.size() > MAXIMUM_SIZE || livingVersions.size() > MAXIMUM_SIZE) {
log.warn(
"Timestamp state store has exceeded its maximum size. This likely indicates a memory leak",
SafeArg.of("timestampMapSize", timestampMap.size()),
SafeArg.of("livingVersionsSize", livingVersions.size()),
SafeArg.of("maximumSize", MAXIMUM_SIZE),
SafeArg.of("minimumLiveTimestamp", timestampMap.firstEntry()),
SafeArg.of("maximumLiveTimestamp", timestampMap.lastEntry()),
SafeArg.of("minimumLiveVersion", getEarliestLiveSequence()));
throw new SafeIllegalStateException("Exceeded maximum timestamp state store size");
}
}

@Value.Immutable
@JsonDeserialize(as = ImmutableMapEntry.class)
@JsonSerialize(as = ImmutableMapEntry.class)
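A sketch of how this guard could be exercised from a same-package unit test, in the style of the CacheStore test below (not part of this commit; the no-arg constructor and the `LockWatchVersion.of(UUID, long)` factory are assumptions):

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.google.common.collect.ImmutableList;
import com.palantir.lock.watch.LockWatchVersion;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test;

public final class TimestampStateStoreSizeTest {
    @Test
    public void exceedingMaximumSizeThrows() {
        TimestampStateStore store = new TimestampStateStore(); // assumed no-arg constructor
        LockWatchVersion version = LockWatchVersion.of(UUID.randomUUID(), 1L); // assumed factory

        // The first call passes the guard (the store is empty when validateStateSize
        // runs) and grows the map to MAXIMUM_SIZE + 1 entries.
        List<Long> timestamps =
                LongStream.rangeClosed(1, 20_001).boxed().collect(Collectors.toList());
        store.putStartTimestamps(timestamps, version);

        // The next call finds the store over its bound and trips the fail-safe.
        assertThatThrownBy(() -> store.putStartTimestamps(ImmutableList.of(20_002L), version))
                .isExactlyInstanceOf(SafeIllegalStateException.class)
                .hasMessageContaining("Exceeded maximum timestamp state store size");
    }
}
```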
@@ -25,7 +25,7 @@
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.keyvalue.api.watch.Sequence;
import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp;
import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import io.vavr.collection.HashMap;
import io.vavr.collection.HashSet;
import org.junit.Test;
@@ -87,7 +87,7 @@ public void cachesExceedingMaximumCountThrows() {
cacheStore.createCache(TIMESTAMP_1);
cacheStore.createCache(timestamp);
assertThatThrownBy(() -> cacheStore.createCache(TIMESTAMP_2))
.isExactlyInstanceOf(TransactionFailedRetriableException.class)
.isExactlyInstanceOf(SafeIllegalStateException.class)
.hasMessage("Exceeded maximum concurrent caches; transaction can be retried, but with caching "
+ "disabled");
assertThat(getTransactionCacheInstanceCount()).isEqualTo(2);
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5658.v2.yml
@@ -0,0 +1,6 @@
type: improvement
improvement:
  description: Add additional fail-safe mechanisms around the size of lock watch internal
    state - this will cause the cache to fall back (disabling caching) if it exceeds a
    certain size.
  links:
  - https://github.com/palantir/atlasdb/pull/5658
