From 28cf786d1268d24a67351b017a943e7d3e10658c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 31 May 2024 08:31:42 +0300 Subject: [PATCH] [improve][ml] Improve RangeCache refactoring: test race conditions and prevent endless loops --- .../bookkeeper/mledger/util/RangeCache.java | 84 ++++++++++++++----- .../mledger/util/RangeCacheTest.java | 33 +++++++- 2 files changed, 95 insertions(+), 22 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java index 46d03bea1b5ad9..ff30c1e7827d81 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java @@ -32,21 +32,26 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.util.RangeCache.ValueWithKeyValidation; import org.apache.commons.lang3.tuple.Pair; /** * Special type of cache where get() and delete() operations can be done over a range of keys. - * The implementation avoids locks and synchronization and relies on ConcurrentSkipListMap for storing the entries. - * Since there is no locks, there is a need to have a way to ensure that a single entry in the cache is removed - * exactly once. Removing an entry multiple times would result in the entries of the cache getting released too - * while they could still be in use. + * The implementation avoids locks and synchronization by relying on ConcurrentSkipListMap for storing the entries. + * Since there are no locks, it's necessary to ensure that a single entry in the cache is removed exactly once. + * Removing an entry multiple times could result in the entries of the cache being released multiple times, + * even while they are still in use. This is prevented by using a custom wrapper around the value to store in the map + * that ensures that the value is removed from the map only if the exact same instance is present in the map. + * There's also a check that ensures that the value matches the key. This is used to detect races without impacting + * consistency. * * @param * Cache key. Needs to be Comparable * @param * Cache value */ +@Slf4j public class RangeCache, Value extends ValueWithKeyValidation> { public interface ValueWithKeyValidation extends ReferenceCounted { boolean matchesKey(T key); @@ -268,7 +273,7 @@ public Pair removeRange(Key first, Key last, boolean lastInclusiv RemovalCounters counters = RemovalCounters.create(); Map> subMap = entries.subMap(first, true, last, lastInclusive); for (Map.Entry> entry : subMap.entrySet()) { - removeEntry(entry, counters); + removeEntry(entry, counters, true); } return handleRemovalResult(counters); } @@ -279,16 +284,23 @@ enum RemoveEntryResult { BREAK_LOOP; } - private RemoveEntryResult removeEntry(Map.Entry> entry, RemovalCounters counters) { - return removeEntry(entry, counters, (x) -> true); + private RemoveEntryResult removeEntry(Map.Entry> entry, RemovalCounters counters, + boolean skipInvalid) { + return removeEntry(entry, counters, skipInvalid, x -> true); } private RemoveEntryResult removeEntry(Map.Entry> entry, RemovalCounters counters, - Predicate removeCondition) { + boolean skipInvalid, Predicate removeCondition) { Key key = entry.getKey(); IdentityWrapper identityWrapper = entry.getValue(); if (identityWrapper.getKey() != key) { - // the wrapper has been recycled and contains another key + // the wrapper has already been recycled and contains another key + if (!skipInvalid) { + // log and remove the entry without releasing the value + log.info("Key {} does not match the entry's value wrapper's key {}, removing entry by key without " + + "releasing the value", key, identityWrapper.getKey()); + entries.remove(key); + } return RemoveEntryResult.CONTINUE_LOOP; } Value value = identityWrapper.getValue(); @@ -297,16 +309,49 @@ private RemoveEntryResult removeEntry(Map.Entry value.retain(); } catch (IllegalReferenceCountException e) { // Value was already released + if (!skipInvalid) { + // remove the specific entry without releasing the value + log.info("Value was already released for key {}, removing entry without releasing the value", key); + entries.remove(key, identityWrapper); + } return RemoveEntryResult.CONTINUE_LOOP; } try { if (!removeCondition.test(value)) { return RemoveEntryResult.BREAK_LOOP; } - // check that the value hasn't been recycled in between - // there should be at least 2 references since this method adds one and the cache should have one - // it is valid that the value contains references even after the key has been removed from the cache - if (value.refCnt() > 1 && value.matchesKey(key) && entries.remove(key, identityWrapper)) { + if (!skipInvalid) { + // remove the specific entry + boolean entryRemoved = entries.remove(key, identityWrapper); + if (entryRemoved) { + boolean matchesKey = value.matchesKey(key); + if (matchesKey) { + counters.entryRemoved(weighter.getSize(value)); + // check that the value hasn't been recycled in between + // there should be at least 2 references since this method adds one and the cache should have + // one reference. it is valid that the value contains references even after the key has been + // removed from the cache + if (value.refCnt() > 1) { + identityWrapper.recycle(); + // remove the cache reference + value.release(); + } else { + log.info("Unexpected refCnt {} for key {}, removed entry without releasing the value", + value.refCnt(), key); + return RemoveEntryResult.CONTINUE_LOOP; + } + } else { + // we don't know the size (weight) of the value + counters.entryRemoved(0); + log.info("Value {} does not match the key {}, removed entry without releasing the value", value, + key); + return RemoveEntryResult.CONTINUE_LOOP; + } + } + } else if (skipInvalid && value.refCnt() > 1 && value.matchesKey(key) + && entries.remove(key, identityWrapper)) { + // when skipInvalid is true, we don't remove the entry if it doesn't match matches the key + // or the refCnt is invalid identityWrapper.recycle(); counters.entryRemoved(weighter.getSize(value)); // remove the cache reference @@ -334,12 +379,12 @@ private Pair handleRemovalResult(RemovalCounters counters) { public Pair evictLeastAccessedEntries(long minSize) { checkArgument(minSize > 0); RemovalCounters counters = RemovalCounters.create(); - while (counters.removedSize < minSize) { + while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) { Map.Entry> entry = entries.firstEntry(); if (entry == null) { break; } - removeEntry(entry, counters); + removeEntry(entry, counters, false); } return handleRemovalResult(counters); } @@ -351,12 +396,12 @@ public Pair evictLeastAccessedEntries(long minSize) { */ public Pair evictLEntriesBeforeTimestamp(long maxTimestamp) { RemovalCounters counters = RemovalCounters.create(); - while (true) { + while (!Thread.currentThread().isInterrupted()) { Map.Entry> entry = entries.firstEntry(); if (entry == null) { break; } - if (removeEntry(entry, counters, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp) + if (removeEntry(entry, counters, false, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp) == RemoveEntryResult.BREAK_LOOP) { break; } @@ -382,12 +427,12 @@ public long getSize() { */ public Pair clear() { RemovalCounters counters = RemovalCounters.create(); - while (true) { + while (!Thread.currentThread().isInterrupted()) { Map.Entry> entry = entries.firstEntry(); if (entry == null) { break; } - removeEntry(entry, counters); + removeEntry(entry, counters, false); } return handleRemovalResult(counters); } @@ -421,5 +466,4 @@ public long getSize(Value value) { return 1; } } - } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java index 01b3c67bf1113a..47fe45e9765161 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java @@ -30,11 +30,14 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import lombok.Data; import org.apache.commons.lang3.tuple.Pair; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class RangeCacheTest { + @Data class RefString extends AbstractReferenceCounted implements RangeCache.ValueWithKeyValidation { String s; Integer matchingKey; @@ -288,15 +291,19 @@ public void evictions() { @Test public void testPutWhileClearIsCalledConcurrently() { RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> 0); - int numberOfThreads = 4; + int numberOfThreads = 8; @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executor.scheduleWithFixedDelay(cache::clear, 0, 1, TimeUnit.MILLISECONDS); } - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 200000; i++) { cache.put(i, new RefString(String.valueOf(i))); } + executor.shutdown(); + // ensure that no clear operation got into endless loop + Awaitility.await().untilAsserted(() -> assertTrue(executor.isTerminated())); + assertEquals(cache.getNumberOfEntries(), 0); } @Test @@ -307,4 +314,26 @@ public void testPutSameObj() { assertTrue(cache.put(0, s0)); assertFalse(cache.put(0, s0)); } + + @Test + public void testRemoveEntryWithInvalidRefCount() { + RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> 0); + RefString value = new RefString("1"); + cache.put(1, value); + // release the value to make the reference count invalid + value.release(); + cache.clear(); + assertEquals(cache.getNumberOfEntries(), 0); + } + + @Test + public void testRemoveEntryWithInvalidMatchingKey() { + RangeCache cache = new RangeCache<>(value -> value.s.length(), x -> 0); + RefString value = new RefString("1"); + cache.put(1, value); + // change the matching key to make it invalid + value.setMatchingKey(123); + cache.clear(); + assertEquals(cache.getNumberOfEntries(), 0); + } }