Skip to content

Commit

Permalink
[improve][ml] Improve RangeCache refactoring: test race conditions an…
Browse files Browse the repository at this point in the history
…d prevent endless loops
  • Loading branch information
lhotari committed May 31, 2024
1 parent c39f9f8 commit 28cf786
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,26 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.util.RangeCache.ValueWithKeyValidation;
import org.apache.commons.lang3.tuple.Pair;

/**
* Special type of cache where get() and delete() operations can be done over a range of keys.
* The implementation avoids locks and synchronization and relies on ConcurrentSkipListMap for storing the entries.
* Since there is no locks, there is a need to have a way to ensure that a single entry in the cache is removed
* exactly once. Removing an entry multiple times would result in the entries of the cache getting released too
* while they could still be in use.
* The implementation avoids locks and synchronization by relying on ConcurrentSkipListMap for storing the entries.
* Since there are no locks, it's necessary to ensure that a single entry in the cache is removed exactly once.
* Removing an entry multiple times could result in the entries of the cache being released multiple times,
* even while they are still in use. This is prevented by using a custom wrapper around the value to store in the map
* that ensures that the value is removed from the map only if the exact same instance is present in the map.
* There's also a check that ensures that the value matches the key. This is used to detect races without impacting
* consistency.
*
* @param <Key>
* Cache key. Needs to be Comparable
* @param <Value>
* Cache value
*/
@Slf4j
public class RangeCache<Key extends Comparable<Key>, Value extends ValueWithKeyValidation<Key>> {
public interface ValueWithKeyValidation<T> extends ReferenceCounted {
boolean matchesKey(T key);
Expand Down Expand Up @@ -268,7 +273,7 @@ public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusiv
RemovalCounters counters = RemovalCounters.create();
Map<Key, IdentityWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
for (Map.Entry<Key, IdentityWrapper<Key, Value>> entry : subMap.entrySet()) {
removeEntry(entry, counters);
removeEntry(entry, counters, true);
}
return handleRemovalResult(counters);
}
Expand All @@ -279,16 +284,23 @@ enum RemoveEntryResult {
BREAK_LOOP;
}

private RemoveEntryResult removeEntry(Map.Entry<Key, IdentityWrapper<Key, Value>> entry, RemovalCounters counters) {
return removeEntry(entry, counters, (x) -> true);
private RemoveEntryResult removeEntry(Map.Entry<Key, IdentityWrapper<Key, Value>> entry, RemovalCounters counters,
boolean skipInvalid) {
return removeEntry(entry, counters, skipInvalid, x -> true);
}

private RemoveEntryResult removeEntry(Map.Entry<Key, IdentityWrapper<Key, Value>> entry, RemovalCounters counters,
Predicate<Value> removeCondition) {
boolean skipInvalid, Predicate<Value> removeCondition) {
Key key = entry.getKey();
IdentityWrapper<Key, Value> identityWrapper = entry.getValue();
if (identityWrapper.getKey() != key) {
// the wrapper has been recycled and contains another key
// the wrapper has already been recycled and contains another key
if (!skipInvalid) {
// log and remove the entry without releasing the value
log.info("Key {} does not match the entry's value wrapper's key {}, removing entry by key without "
+ "releasing the value", key, identityWrapper.getKey());
entries.remove(key);
}
return RemoveEntryResult.CONTINUE_LOOP;
}
Value value = identityWrapper.getValue();
Expand All @@ -297,16 +309,49 @@ private RemoveEntryResult removeEntry(Map.Entry<Key, IdentityWrapper<Key, Value>
value.retain();
} catch (IllegalReferenceCountException e) {
// Value was already released
if (!skipInvalid) {
// remove the specific entry without releasing the value
log.info("Value was already released for key {}, removing entry without releasing the value", key);
entries.remove(key, identityWrapper);
}
return RemoveEntryResult.CONTINUE_LOOP;
}
try {
if (!removeCondition.test(value)) {
return RemoveEntryResult.BREAK_LOOP;
}
// check that the value hasn't been recycled in between
// there should be at least 2 references since this method adds one and the cache should have one
// it is valid that the value contains references even after the key has been removed from the cache
if (value.refCnt() > 1 && value.matchesKey(key) && entries.remove(key, identityWrapper)) {
if (!skipInvalid) {
// remove the specific entry
boolean entryRemoved = entries.remove(key, identityWrapper);
if (entryRemoved) {
boolean matchesKey = value.matchesKey(key);
if (matchesKey) {
counters.entryRemoved(weighter.getSize(value));
// check that the value hasn't been recycled in between
// there should be at least 2 references since this method adds one and the cache should have
// one reference. it is valid that the value contains references even after the key has been
// removed from the cache
if (value.refCnt() > 1) {
identityWrapper.recycle();
// remove the cache reference
value.release();
} else {
log.info("Unexpected refCnt {} for key {}, removed entry without releasing the value",
value.refCnt(), key);
return RemoveEntryResult.CONTINUE_LOOP;
}
} else {
// we don't know the size (weight) of the value
counters.entryRemoved(0);
log.info("Value {} does not match the key {}, removed entry without releasing the value", value,
key);
return RemoveEntryResult.CONTINUE_LOOP;
}
}
} else if (skipInvalid && value.refCnt() > 1 && value.matchesKey(key)
&& entries.remove(key, identityWrapper)) {
// when skipInvalid is true, we don't remove the entry if it doesn't match matches the key
// or the refCnt is invalid
identityWrapper.recycle();
counters.entryRemoved(weighter.getSize(value));
// remove the cache reference
Expand Down Expand Up @@ -334,12 +379,12 @@ private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
checkArgument(minSize > 0);
RemovalCounters counters = RemovalCounters.create();
while (counters.removedSize < minSize) {
while (counters.removedSize < minSize && !Thread.currentThread().isInterrupted()) {
Map.Entry<Key, IdentityWrapper<Key, Value>> entry = entries.firstEntry();
if (entry == null) {
break;
}
removeEntry(entry, counters);
removeEntry(entry, counters, false);
}
return handleRemovalResult(counters);
}
Expand All @@ -351,12 +396,12 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
*/
public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
RemovalCounters counters = RemovalCounters.create();
while (true) {
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, IdentityWrapper<Key, Value>> entry = entries.firstEntry();
if (entry == null) {
break;
}
if (removeEntry(entry, counters, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp)
if (removeEntry(entry, counters, false, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp)
== RemoveEntryResult.BREAK_LOOP) {
break;
}
Expand All @@ -382,12 +427,12 @@ public long getSize() {
*/
public Pair<Integer, Long> clear() {
RemovalCounters counters = RemovalCounters.create();
while (true) {
while (!Thread.currentThread().isInterrupted()) {
Map.Entry<Key, IdentityWrapper<Key, Value>> entry = entries.firstEntry();
if (entry == null) {
break;
}
removeEntry(entry, counters);
removeEntry(entry, counters, false);
}
return handleRemovalResult(counters);
}
Expand Down Expand Up @@ -421,5 +466,4 @@ public long getSize(Value value) {
return 1;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.Data;
import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class RangeCacheTest {

@Data
class RefString extends AbstractReferenceCounted implements RangeCache.ValueWithKeyValidation<Integer> {
String s;
Integer matchingKey;
Expand Down Expand Up @@ -288,15 +291,19 @@ public void evictions() {
@Test
public void testPutWhileClearIsCalledConcurrently() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
int numberOfThreads = 4;
int numberOfThreads = 8;
@Cleanup("shutdownNow")
ScheduledExecutorService executor = Executors.newScheduledThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executor.scheduleWithFixedDelay(cache::clear, 0, 1, TimeUnit.MILLISECONDS);
}
for (int i = 0; i < 100000; i++) {
for (int i = 0; i < 200000; i++) {
cache.put(i, new RefString(String.valueOf(i)));
}
executor.shutdown();
// ensure that no clear operation got into endless loop
Awaitility.await().untilAsserted(() -> assertTrue(executor.isTerminated()));
assertEquals(cache.getNumberOfEntries(), 0);
}

@Test
Expand All @@ -307,4 +314,26 @@ public void testPutSameObj() {
assertTrue(cache.put(0, s0));
assertFalse(cache.put(0, s0));
}

@Test
public void testRemoveEntryWithInvalidRefCount() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
RefString value = new RefString("1");
cache.put(1, value);
// release the value to make the reference count invalid
value.release();
cache.clear();
assertEquals(cache.getNumberOfEntries(), 0);
}

@Test
public void testRemoveEntryWithInvalidMatchingKey() {
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
RefString value = new RefString("1");
cache.put(1, value);
// change the matching key to make it invalid
value.setMatchingKey(123);
cache.clear();
assertEquals(cache.getNumberOfEntries(), 0);
}
}

0 comments on commit 28cf786

Please sign in to comment.