Blob cache remove time dependency
Trigger decay whenever frequency 0 is empty, ensuring we decay as slowly or as rapidly as needed.

Divide time into epochs, switching to a new one whenever we need to decay. A region now promotes 2 freqs per access, but only once per epoch.
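For intuition, here is a minimal, hypothetical sketch of that promote/decay scheme. The names (EpochLfuSketch, access, evictOne, MAX_FREQ) and the plain-map bookkeeping are invented for illustration and are not the SharedBlobCacheService code in the diff below; they only show the idea that an entry is promoted by two frequency levels at most once per epoch, and that the epoch advances (decaying every entry by one level) only when eviction finds nothing at frequency 0.

// Hypothetical, simplified sketch; not the actual SharedBlobCacheService implementation.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class EpochLfuSketch<K> {
    private static final int MAX_FREQ = 6; // the real limit comes from a cache setting

    private static final class Entry {
        int freq = 1;
        long lastAccessedEpoch;
        Entry(long epoch) { this.lastAccessedEpoch = epoch; }
    }

    private final Map<K, Entry> entries = new HashMap<>();
    private final AtomicLong epoch = new AtomicLong();

    synchronized void access(K key) {
        long now = epoch.get();
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(now));
        // Go 2 up per access, but only once per epoch, so the 1-level decay of a
        // later epoch still leaves recently touched entries ahead of idle ones.
        if (now > entry.lastAccessedEpoch && entry.freq + 2 < MAX_FREQ) {
            entry.freq += 2;
            entry.lastAccessedEpoch = now;
        }
    }

    // Pick an eviction victim; if nothing sits at frequency 0, start a new
    // epoch and decay everything by one level before looking again.
    synchronized K evictOne() {
        K victim = lowestFrequencyKey(0);
        if (victim == null) {
            for (Entry entry : entries.values()) {
                if (entry.freq > 0) {
                    entry.freq--;
                }
            }
            epoch.incrementAndGet();
            victim = lowestFrequencyKey(Integer.MAX_VALUE);
        }
        if (victim != null) {
            entries.remove(victim);
        }
        return victim;
    }

    private K lowestFrequencyKey(int maxAcceptableFreq) {
        K best = null;
        int bestFreq = Integer.MAX_VALUE;
        for (Map.Entry<K, Entry> e : entries.entrySet()) {
            int freq = e.getValue().freq;
            if (freq <= maxAcceptableFreq && freq < bestFreq) {
                bestFreq = freq;
                best = e.getKey();
            }
        }
        return best;
    }
}

The real change below additionally keeps per-frequency doubly linked lists, and offloads the decay plus epoch bump to a task on the generic thread pool (NewEpochAndDecayTask), so the eviction path itself does not pay for the full scan.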
henningandersen committed Jan 25, 2024
1 parent 912e4cd commit 1b90a7d
Showing 2 changed files with 114 additions and 76 deletions.
@@ -26,7 +26,6 @@
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
@@ -50,14 +49,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
@@ -313,16 +312,14 @@ private CacheEntry(T chunk) {

private final Runnable evictIncrementer;

private final LongSupplier relativeTimeInMillisSupplier;

public SharedBlobCacheService(
NodeEnvironment environment,
Settings settings,
ThreadPool threadPool,
String ioExecutor,
BlobCacheMetrics blobCacheMetrics
) {
this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics, threadPool::relativeTimeInMillis);
this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics);
}

public SharedBlobCacheService(
@@ -331,8 +328,7 @@ public SharedBlobCacheService(
ThreadPool threadPool,
String ioExecutor,
String bulkExecutor,
BlobCacheMetrics blobCacheMetrics,
LongSupplier relativeTimeInMillisSupplier
BlobCacheMetrics blobCacheMetrics
) {
this.threadPool = threadPool;
this.ioExecutor = threadPool.executor(ioExecutor);
@@ -374,7 +370,6 @@ public SharedBlobCacheService(
this.recoveryRangeSize = BlobCacheUtils.toIntBytes(SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings).getBytes());

this.blobCacheMetrics = blobCacheMetrics;
this.relativeTimeInMillisSupplier = relativeTimeInMillisSupplier;
this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment;
}

@@ -615,10 +610,6 @@ int getFreq(CacheFileRegion cacheFileRegion) {
return -1;
}

private long relativeTimeInMillis() {
return relativeTimeInMillisSupplier.getAsLong();
}

@Override
public void close() {
sharedBytes.decRef();
@@ -1138,17 +1129,17 @@ class LFUCacheEntry extends CacheEntry<CacheFileRegion> {
LFUCacheEntry prev;
LFUCacheEntry next;
int freq;
volatile long lastAccessed;
volatile long lastAccessedEpoch;

LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) {
super(chunk);
this.lastAccessed = lastAccessed;
this.lastAccessedEpoch = lastAccessed;
this.freq = 1;
}

void touch() {
long now = relativeTimeInMillis();
if (now - lastAccessed >= minTimeDelta) {
long now = epoch.get();
if (now > lastAccessedEpoch) {
maybePromote(now, this);
}
}
@@ -1157,21 +1148,20 @@ void touch() {
private final ConcurrentHashMap<RegionKey<KeyType>, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>();
private final LFUCacheEntry[] freqs;
private final int maxFreq;
private final long minTimeDelta;
private final CacheDecayTask decayTask;
private final NewEpochAndDecayTask newEpochAndDecayTask;

private final AtomicLong epoch = new AtomicLong();

@SuppressWarnings("unchecked")
LFUCache(Settings settings) {
this.maxFreq = SHARED_CACHE_MAX_FREQ_SETTING.get(settings);
this.minTimeDelta = SHARED_CACHE_MIN_TIME_DELTA_SETTING.get(settings).millis();
freqs = (LFUCacheEntry[]) Array.newInstance(LFUCacheEntry.class, maxFreq);
decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings));
decayTask.rescheduleIfNecessary();
newEpochAndDecayTask = new NewEpochAndDecayTask(threadPool.generic());
}

@Override
public void close() {
decayTask.close();
newEpochAndDecayTask.close();
}

int getFreq(CacheFileRegion cacheFileRegion) {
@@ -1181,7 +1171,7 @@ int getFreq(CacheFileRegion cacheFileRegion) {
@Override
public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
final long now = relativeTimeInMillis();
final long now = epoch.get();
// try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
// if we did not find an entry
var entry = keyMapping.get(regionKey);
Expand All @@ -1200,7 +1190,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
assert assertChunkActiveOrEvicted(entry);

// existing item, check if we need to promote item
if (now - entry.lastAccessed >= minTimeDelta) {
if (now > entry.lastAccessedEpoch) {
maybePromote(now, entry);
}

@@ -1354,12 +1344,14 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) {
return true;
}

private void maybePromote(long now, LFUCacheEntry entry) {
private void maybePromote(long epoch, LFUCacheEntry entry) {
synchronized (SharedBlobCacheService.this) {
if (now - entry.lastAccessed >= minTimeDelta && entry.freq + 1 < maxFreq && entry.chunk.isEvicted() == false) {
// go 2 up per epoch, allowing us to decay 1 every epoch.
int newFreq = Math.min(entry.freq + 2, maxFreq);
if (epoch > entry.lastAccessedEpoch && newFreq < maxFreq && entry.chunk.isEvicted() == false) {
unlink(entry);
entry.freq++;
entry.lastAccessed = now;
entry.freq = newFreq;
entry.lastAccessedEpoch = epoch;
pushEntryToBack(entry);
}
}
@@ -1401,44 +1393,68 @@ private void unlink(final LFUCacheEntry entry) {
*/
private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) {
assert Thread.holdsLock(SharedBlobCacheService.this);
for (int currentFreq = 0; currentFreq < maxFreq; currentFreq++) {
long currentEpoch = epoch.get();
SharedBytes.IO freq0 = maybeEvictAndTakeForFrequency(evictedNotification, 0);
if (freq0 != null) {
return freq0;
}
// no frequency 0 entries, let us switch epoch and decay so we get some for next time.
tryNewEpoch(currentEpoch);
for (int currentFreq = 1; currentFreq < maxFreq; currentFreq++) {
// recheck this per freq in case we raced an eviction with an incref'er.
SharedBytes.IO freeRegion = freeRegions.poll();
if (freeRegion != null) {
return freeRegion;
}
for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) {
boolean evicted = entry.chunk.tryEvictNoDecRef();
if (evicted) {
try {
SharedBytes.IO ioRef = entry.chunk.io;
if (ioRef != null) {
try {
if (entry.chunk.refCount() == 1) {
// we own that one refcount (since we CAS'ed evicted to 1)
// grab io, rely on incref'ers also checking evicted field.
entry.chunk.io = null;
assert regionOwners.remove(ioRef) == entry.chunk;
return ioRef;
}
} finally {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
SharedBytes.IO taken = maybeEvictAndTakeForFrequency(evictedNotification, currentFreq);
if (taken != null) {
return taken;
}
}
// give up
return null;
}

private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotification, int currentFreq) {
for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) {
boolean evicted = entry.chunk.tryEvictNoDecRef();
if (evicted) {
try {
SharedBytes.IO ioRef = entry.chunk.io;
if (ioRef != null) {
try {
if (entry.chunk.refCount() == 1) {
// we own that one refcount (since we CAS'ed evicted to 1)
// grab io, rely on incref'ers also checking evicted field.
entry.chunk.io = null;
assert regionOwners.remove(ioRef) == entry.chunk;
return ioRef;
}
}
} finally {
entry.chunk.decRef();
if (currentFreq > 0) {
evictedNotification.run();
} finally {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
}
}
} finally {
entry.chunk.decRef();
if (currentFreq > 0) {
evictedNotification.run();
}
}
}
}
// give up
return null;
}

/**
* Check if a new epoch is needed based on the input. The input epoch should be captured
* before the determination that a new epoch is needed is done.
* @param currentEpoch the epoch to check against if a new epoch is needed
*/
private void tryNewEpoch(long currentEpoch) {
newEpochAndDecayTask.spawnIfNotRunning(currentEpoch);
}

/**
* This method tries to evict the least used {@link LFUCacheEntry}. Only entries with the lowest possible frequency are considered
* for eviction.
@@ -1461,39 +1477,66 @@ public boolean maybeEvictLeastUsed() {

private void computeDecay() {
synchronized (SharedBlobCacheService.this) {
long now = relativeTimeInMillis();
for (int i = 0; i < maxFreq; i++) {
for (int i = 1; i < maxFreq; i++) {
// todo: link over entire list
for (LFUCacheEntry entry = freqs[i]; entry != null; entry = entry.next) {
if (entry.freq > 0 && now - entry.lastAccessed >= 2 * minTimeDelta) {
unlink(entry);
entry.freq--;
pushEntryToBack(entry);
}
unlink(entry);
entry.freq--;
pushEntryToBack(entry);
}
}
}
}

class CacheDecayTask extends AbstractAsyncTask {
class NewEpochAndDecayTask extends AbstractRunnable {

CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) {
super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true);
private final Executor executor;
private final AtomicLong pendingEpoch = new AtomicLong();
private volatile boolean isClosed;

NewEpochAndDecayTask(Executor executor) {
this.executor = executor;
}

@Override
protected boolean mustReschedule() {
return true;
protected void doRun() throws Exception {
if (isClosed == false) {
computeDecay();
}
}

@Override
public void runInternal() {
computeDecay();
public void onFailure(Exception e) {
logger.error("failed to run cache decay task", e);
}

@Override
public void onAfter() {
assert pendingEpoch.get() == epoch.get() + 1;
epoch.incrementAndGet();
}

@Override
public void onRejection(Exception e) {
assert false : e;
logger.error("unexpected rejection", e);
epoch.incrementAndGet();
}

@Override
public String toString() {
return "shared_cache_decay_task";
}

public void spawnIfNotRunning(long currentEpoch) {
if (isClosed == false && pendingEpoch.compareAndSet(currentEpoch, currentEpoch + 1)) {
executor.execute(this);
}
}

public void close() {
this.isClosed = true;
}
}
}
}
@@ -429,8 +429,7 @@ public void execute(Runnable command) {
threadPool,
ThreadPool.Names.GENERIC,
"bulk",
BlobCacheMetrics.NOOP,
threadPool::relativeTimeInMillis
BlobCacheMetrics.NOOP
)
) {
{
@@ -491,8 +490,7 @@ public ExecutorService executor(String name) {
threadPool,
ThreadPool.Names.GENERIC,
"bulk",
BlobCacheMetrics.NOOP,
threadPool::relativeTimeInMillis
BlobCacheMetrics.NOOP
)
) {

@@ -742,8 +740,7 @@ public void testMaybeEvictLeastUsed() throws Exception {
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
"bulk",
BlobCacheMetrics.NOOP,
relativeTimeInMillis::get
BlobCacheMetrics.NOOP
)
) {
final Set<Object> cacheKeys = new HashSet<>();
@@ -840,8 +837,7 @@ public void execute(Runnable command) {
threadPool,
ThreadPool.Names.GENERIC,
"bulk",
BlobCacheMetrics.NOOP,
relativeTimeInMillis::get
BlobCacheMetrics.NOOP
)
) {
{
@@ -956,8 +952,7 @@ public void testPopulate() throws Exception {
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP,
relativeTimeInMillis::get
BlobCacheMetrics.NOOP
)
) {
final var cacheKey = generateCacheKey();
