diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImpl.java index 32439774e0f..d119a94bd66 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImpl.java @@ -17,11 +17,13 @@ package com.palantir.atlasdb.keyvalue.api.cache; import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp; -import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException; import com.palantir.lock.watch.CommitUpdate; import com.palantir.logsafe.Preconditions; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -56,10 +58,7 @@ final class CacheStoreImpl implements CacheStore { @Override public void createCache(StartTimestamp timestamp) { - if (cacheMap.size() > maxCacheCount) { - throw new TransactionFailedRetriableException( - "Exceeded maximum concurrent caches; transaction can be retried, but with caching disabled"); - } + validateStateSize(); cacheMap.computeIfAbsent(timestamp, key -> snapshotStore .getSnapshot(key) @@ -98,6 +97,19 @@ public TransactionScopedCache getReadOnlyCache(StartTimestamp timestamp) { .orElseGet(() -> NoOpTransactionScopedCache.create().createReadOnlyCache(CommitUpdate.invalidateAll())); } + private void validateStateSize() { + if (cacheMap.size() > maxCacheCount) { + log.warn( + "Transaction cache store has exceeded maximum concurrent caches. This likely indicates a memory" + + " leak", + SafeArg.of("cacheMapSize", cacheMap.size()), + SafeArg.of("maxCacheCount", maxCacheCount), + SafeArg.of("earliestTimestamp", cacheMap.keySet().stream().min(Comparator.naturalOrder()))); + throw new SafeIllegalStateException( + "Exceeded maximum concurrent caches; transaction can be retried, but with caching disabled"); + } + } + private Optional getCacheInternal(StartTimestamp timestamp) { return Optional.ofNullable(cacheMap.get(timestamp)); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java index 3032feafafb..f07fc8b7854 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/LockWatchValueScopingCacheImpl.java @@ -44,7 +44,7 @@ @ThreadSafe public final class LockWatchValueScopingCacheImpl implements LockWatchValueScopingCache { - private static final int MAX_CACHE_COUNT = 100_000; + private static final int MAX_CACHE_COUNT = 20_000; private final LockWatchEventCache eventCache; private final CacheStore cacheStore; private final ValueStore valueStore; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/SnapshotStoreImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/SnapshotStoreImpl.java index 5f056fa0092..b465bebd31e 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/SnapshotStoreImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/cache/SnapshotStoreImpl.java @@ -20,7 +20,12 @@ import com.google.common.collect.SetMultimap; import com.palantir.atlasdb.keyvalue.api.watch.Sequence; import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -28,6 +33,9 @@ @NotThreadSafe final class SnapshotStoreImpl implements SnapshotStore { + private static final SafeLogger log = SafeLoggerFactory.get(SnapshotStoreImpl.class); + private static final int MAXIMUM_SIZE = 20_000; + private final Map snapshotMap; private final SetMultimap liveSequences; private final Map timestampMap; @@ -40,6 +48,8 @@ final class SnapshotStoreImpl implements SnapshotStore { @Override public void storeSnapshot(Sequence sequence, Collection timestamps, ValueCacheSnapshot snapshot) { + validateStateSize(); + if (!timestamps.isEmpty()) { snapshotMap.put(sequence, snapshot); timestamps.forEach(timestamp -> { @@ -75,4 +85,26 @@ public void reset() { public Optional getSnapshotForSequence(Sequence sequence) { return Optional.ofNullable(snapshotMap.get(sequence)); } + + private void validateStateSize() { + if (snapshotMap.size() > MAXIMUM_SIZE + || liveSequences.size() > MAXIMUM_SIZE + || timestampMap.size() > MAXIMUM_SIZE) { + log.warn( + "Snapshot store has exceeded its maximum size. This likely indicates a memory leak.", + SafeArg.of("snapshotMapSize", snapshotMap.size()), + SafeArg.of("liveSequencesSize", liveSequences.size()), + SafeArg.of("timestampMapSize", timestampMap.size()), + SafeArg.of( + "earliestTimestamp", timestampMap.keySet().stream().min(Comparator.naturalOrder())), + SafeArg.of( + "earliestSnapshotSequence", + snapshotMap.keySet().stream().min(Comparator.naturalOrder())), + SafeArg.of( + "earliestLiveSequence", + liveSequences.keySet().stream().min(Comparator.naturalOrder())), + SafeArg.of("maximumSize", MAXIMUM_SIZE)); + throw new SafeIllegalStateException("Exceeded max snapshot store size"); + } + } } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java index dcac1136426..cb3b1e459f5 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventCacheImpl.java @@ -36,7 +36,7 @@ @ThreadSafe public final class LockWatchEventCacheImpl implements LockWatchEventCache { - // The minimum number of events should be the same as Timelocks' LockEventLogImpl. + // The minimum number of events should be the same as Timelock's LockEventLogImpl. private static final int MIN_EVENTS = 1000; private static final int MAX_EVENTS = 10_000; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/TimestampStateStore.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/TimestampStateStore.java index 78d3a73606f..a55fec69135 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/TimestampStateStore.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/TimestampStateStore.java @@ -27,6 +27,10 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.lock.watch.TransactionUpdate; import com.palantir.logsafe.Preconditions; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; import java.util.Collection; import java.util.NavigableMap; import java.util.Optional; @@ -36,10 +40,14 @@ @NotThreadSafe final class TimestampStateStore { + private static final SafeLogger log = SafeLoggerFactory.get(TimestampStateStore.class); + private static final int MAXIMUM_SIZE = 20_000; private final NavigableMap timestampMap = new TreeMap<>(); private final SortedSetMultimap livingVersions = TreeMultimap.create(); void putStartTimestamps(Collection startTimestamps, LockWatchVersion version) { + validateStateSize(); + startTimestamps.stream().map(StartTimestamp::of).forEach(startTimestamp -> { MapEntry previous = timestampMap.putIfAbsent(startTimestamp, MapEntry.of(version)); Preconditions.checkArgument(previous == null, "Start timestamp already present in map"); @@ -93,10 +101,24 @@ TimestampStateStoreState getStateForTesting() { .build(); } - public Optional getEarliestLiveSequence() { + Optional getEarliestLiveSequence() { return Optional.ofNullable(Iterables.getFirst(livingVersions.keySet(), null)); } + private void validateStateSize() { + if (timestampMap.size() > MAXIMUM_SIZE || livingVersions.size() > MAXIMUM_SIZE) { + log.warn( + "Timestamp state store has exceeded its maximum size. This likely indicates a memory leak", + SafeArg.of("timestampMapSize", timestampMap.size()), + SafeArg.of("livingVersionsSize", livingVersions.size()), + SafeArg.of("maximumSize", MAXIMUM_SIZE), + SafeArg.of("minimumLiveTimestamp", timestampMap.firstEntry()), + SafeArg.of("maximumLiveTimestamp", timestampMap.lastEntry()), + SafeArg.of("minimumLiveVersion", getEarliestLiveSequence())); + throw new SafeIllegalStateException("Exceeded maximum timestamp state store size"); + } + } + @Value.Immutable @JsonDeserialize(as = ImmutableMapEntry.class) @JsonSerialize(as = ImmutableMapEntry.class) diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImplTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImplTest.java index 583547bcdc3..1570fb4410e 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImplTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/keyvalue/api/cache/CacheStoreImplTest.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableSet; import com.palantir.atlasdb.keyvalue.api.watch.Sequence; import com.palantir.atlasdb.keyvalue.api.watch.StartTimestamp; -import com.palantir.atlasdb.transaction.api.TransactionFailedRetriableException; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; import io.vavr.collection.HashMap; import io.vavr.collection.HashSet; import org.junit.Test; @@ -87,7 +87,7 @@ public void cachesExceedingMaximumCountThrows() { cacheStore.createCache(TIMESTAMP_1); cacheStore.createCache(timestamp); assertThatThrownBy(() -> cacheStore.createCache(TIMESTAMP_2)) - .isExactlyInstanceOf(TransactionFailedRetriableException.class) + .isExactlyInstanceOf(SafeIllegalStateException.class) .hasMessage("Exceeded maximum concurrent caches; transaction can be retried, but with caching " + "disabled"); assertThat(getTransactionCacheInstanceCount()).isEqualTo(2); diff --git a/changelog/@unreleased/pr-5658.v2.yml b/changelog/@unreleased/pr-5658.v2.yml new file mode 100644 index 00000000000..5dd62cda504 --- /dev/null +++ b/changelog/@unreleased/pr-5658.v2.yml @@ -0,0 +1,6 @@ +type: improvement +improvement: + description: Add additional failsafe mechanisms around the size of lock watch internal + state - this will cause the cache to fallback if it exceeds a certain size. + links: + - https://github.com/palantir/atlasdb/pull/5658