Skip to content

Commit

Permalink
Changed to trigger decay when freq0 goes empty.
Browse files Browse the repository at this point in the history
Fixed various tests, including the decay test.
  • Loading branch information
henningandersen committed Jan 25, 2024
1 parent 2d689a6 commit 11af2a1
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -261,6 +260,18 @@ void computeDecay() {
}
}

// used in tests
void tryNewEpoch() {
if (cache instanceof LFUCache lfuCache) {
lfuCache.tryNewEpoch(lfuCache.epoch.get());
}
}

// used in tests
long epoch() {
return ((LFUCache) cache).epoch.get();
}

private interface Cache<K, T> extends Releasable {
CacheEntry<T> get(K cacheKey, long fileLength, int region);

Expand Down Expand Up @@ -1151,6 +1162,9 @@ class LFUCacheEntry extends CacheEntry<CacheFileRegion> {
LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) {
super(chunk);
this.lastAccessedEpoch = lastAccessed;
// todo: consider whether freq=1 is still right for new entries.
// it could risk decaying to level 0 right after and thus potentially be evicted
// if the freq 1 LRU chain was short.
this.freq = 1;
}

Expand Down Expand Up @@ -1412,11 +1426,13 @@ private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) {
assert Thread.holdsLock(SharedBlobCacheService.this);
long currentEpoch = epoch.get();
SharedBytes.IO freq0 = maybeEvictAndTakeForFrequency(evictedNotification, 0);
if (freqs[0] == null) {
// no frequency 0 entries, let us switch epoch and decay so we get some for next time.
tryNewEpoch(currentEpoch);
}
if (freq0 != null) {
return freq0;
}
// no frequency 0 entries, let us switch epoch and decay so we get some for next time.
tryNewEpoch(currentEpoch);
for (int currentFreq = 1; currentFreq < maxFreq; currentFreq++) {
// recheck this per freq in case we raced an eviction with an incref'er.
SharedBytes.IO freeRegion = freeRegions.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ public void testForceEvictResponse() throws IOException {
}

public void testDecay() throws IOException {
// we have 8 regions
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(400)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
.put("path.home", createTempDir())
.build();
Expand All @@ -268,58 +269,85 @@ public void testDecay() throws IOException {
BlobCacheMetrics.NOOP
)
) {
assertEquals(4, cacheService.freeRegionCount());

final var cacheKey1 = generateCacheKey();
final var cacheKey2 = generateCacheKey();
final var cacheKey3 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
// add a region that we can evict when provoking first decay
cacheService.get("evictkey", size(250), 0);
assertEquals(3, cacheService.freeRegionCount());
final var region0 = cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.freeRegionCount());
assertEquals(2, cacheService.freeRegionCount());
final var region1 = cacheService.get(cacheKey2, size(250), 1);
assertEquals(3, cacheService.freeRegionCount());
assertEquals(1, cacheService.freeRegionCount());
final var region2 = cacheService.get(cacheKey3, size(250), 1);
assertEquals(2, cacheService.freeRegionCount());
assertEquals(0, cacheService.freeRegionCount());

assertEquals(1, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));
AtomicLong expectedEpoch = new AtomicLong();
Runnable triggerDecay = () -> {
assertThat(taskQueue.hasRunnableTasks(), is(false));
cacheService.get(expectedEpoch.toString(), size(250), 0);
assertThat(taskQueue.hasRunnableTasks(), is(true));
taskQueue.runAllRunnableTasks();
assertThat(cacheService.epoch(), equalTo(expectedEpoch.incrementAndGet()));
};

taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
triggerDecay.run();

cacheService.get(cacheKey1, size(250), 0);
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);

triggerDecay.run();

final var region0Again = cacheService.get(cacheKey1, size(250), 0);
assertSame(region0Again, region0);
assertEquals(2, cacheService.getFreq(region0));
assertEquals(3, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
triggerDecay.run();

cacheService.get(cacheKey1, size(250), 0);
assertEquals(3, cacheService.getFreq(region0));
assertEquals(4, cacheService.getFreq(region0));
cacheService.get(cacheKey1, size(250), 0);
assertEquals(3, cacheService.getFreq(region0));
assertEquals(4, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));

// advance 2 ticks (decay only starts after 2 ticks)
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
// ensure no freq=0 entries
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);
assertEquals(2, cacheService.getFreq(region1));
assertEquals(2, cacheService.getFreq(region2));

triggerDecay.run();

assertEquals(3, cacheService.getFreq(region0));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

triggerDecay.run();
assertEquals(2, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));

// advance another tick
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
// ensure no freq=0 entries
cacheService.get(cacheKey2, size(250), 1);
cacheService.get(cacheKey3, size(250), 1);
assertEquals(2, cacheService.getFreq(region1));
assertEquals(2, cacheService.getFreq(region2));

triggerDecay.run();
assertEquals(1, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));
assertEquals(1, cacheService.getFreq(region1));
assertEquals(1, cacheService.getFreq(region2));

// advance another tick
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();
triggerDecay.run();
assertEquals(0, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
assertEquals(0, cacheService.getFreq(region2));
Expand Down Expand Up @@ -732,7 +760,7 @@ public void testCacheSizeChanges() throws IOException {
}

public void testMaybeEvictLeastUsed() throws Exception {
final int numRegions = 10;randomIntBetween(1, 500);
final int numRegions = 10;
final long regionSize = size(1L);
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
Expand Down Expand Up @@ -781,24 +809,26 @@ public void testMaybeEvictLeastUsed() throws Exception {
assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false));

// simulate elapsed time
var minInternalMillis = SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getDefault(Settings.EMPTY).millis();
relativeTimeInMillis.addAndGet(minInternalMillis);
cacheService.tryNewEpoch();
taskQueue.runAllRunnableTasks();

cacheEntries.keySet().forEach(key -> cacheService.get(key, regionSize, 0));
cacheService.tryNewEpoch();
taskQueue.runAllRunnableTasks();

// touch some random cache entries
var useedCacheKeys = Set.copyOf(randomSubsetOf(cacheEntries.keySet()));
useedCacheKeys.forEach(key -> cacheService.get(key, regionSize, 0));

cacheEntries.forEach(
(key, entry) -> assertThat(cacheService.getFreq(entry), useedCacheKeys.contains(key) ? equalTo(2) : equalTo(1))
(key, entry) -> assertThat(cacheService.getFreq(entry), useedCacheKeys.contains(key) ? equalTo(3) : equalTo(1))
);

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
assertThat("Cache entries are not old enough to be evicted", cacheService.maybeEvictLeastUsed(), is(false));

// need to advance time and compute decay to decrease frequencies in cache
relativeTimeInMillis.addAndGet(minInternalMillis);
cacheService.computeDecay();
cacheService.tryNewEpoch();
taskQueue.runAllRunnableTasks();

assertThat("All regions are used", cacheService.freeRegionCount(), equalTo(0));
cacheEntries.forEach(
Expand Down

0 comments on commit 11af2a1

Please sign in to comment.