Skip to content

Commit

Permalink
Blob-cache get: remove false negatives
Browse files Browse the repository at this point in the history
The blob cache would sometimes respond with an already closed exception
even though regions could be made available. Fixed to only do so when there
really are no available regions (which should never happen with
region count > thread count, i.e., normally).
One exception is explicit evict, which only happens on explicit clear cache
or corruptions.
  • Loading branch information
henningandersen committed Jan 24, 2024
1 parent e177182 commit 3ae5931
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ class CacheFileRegion extends EvictableRefCounted {

final RegionKey<KeyType> regionKey;
final SparseFileTracker tracker;
// io is null when the region has not yet been initialized, or after evict/take has released it
volatile SharedBytes.IO io = null;

CacheFileRegion(RegionKey<KeyType> regionKey, int regionSize) {
Expand All @@ -701,6 +702,16 @@ boolean tryEvict() {
return false;
}

/**
 * Tries to evict this region without releasing the caller's reference, so the caller can still
 * inspect/steal the region's {@code io} slot afterwards (see {@code maybeEvictAndTake}).
 * Unlike {@code tryEvict()}, no {@code decRef()} is performed here; the caller is responsible
 * for calling {@code decRef()} when done.
 *
 * Must be called while holding the {@code SharedBlobCacheService} lock.
 *
 * @return {@code true} if this call evicted the region, {@code false} otherwise
 */
boolean tryEvictNoDecRef() {
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
// refCount() <= 1 means no external holders remain besides the cache itself
// (presumably the cache's own reference — TODO confirm against EvictableRefCounted).
if (refCount() <= 1 && evict()) {
logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
return true;
}

return false;
}
public boolean forceEvict() {
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
if (evict()) {
Expand Down Expand Up @@ -1165,7 +1176,7 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
// io is volatile, double locking is fine, as long as we assign it last.
if (entry.chunk.io == null) {
synchronized (entry.chunk) {
if (entry.chunk.io == null) {
if (entry.chunk.io == null && entry.chunk.isEvicted() == false) {
return initChunk(entry);
}
}
Expand Down Expand Up @@ -1226,16 +1237,15 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) {
assignToSlot(entry, freeSlot);
} else {
// need to evict something
int frequency;
SharedBytes.IO io;
synchronized (SharedBlobCacheService.this) {
frequency = maybeEvict();
io = maybeEvictAndTake(blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment);
}
if (frequency > 0) {
blobCacheMetrics.getEvictedCountNonZeroFrequency().increment();
if (io == null) {
io = freeRegions.poll();
}
final SharedBytes.IO freeSlotRetry = freeRegions.poll();
if (freeSlotRetry != null) {
assignToSlot(entry, freeSlotRetry);
if (io != null) {
assignToSlot(entry, io);
} else {
boolean removed = keyMapping.remove(regionKey, entry);
assert removed;
Expand Down Expand Up @@ -1322,7 +1332,9 @@ private boolean assertChunkActiveOrEvicted(LFUCacheEntry entry) {
assert entry.prev != null || entry.chunk.isEvicted();

}
assert regionOwners.get(entry.chunk.io) == entry.chunk || entry.chunk.isEvicted();
SharedBytes.IO io = entry.chunk.io;
assert io != null || entry.chunk.isEvicted();
assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted();
return true;
}

Expand Down Expand Up @@ -1384,6 +1396,44 @@ private int maybeEvict() {
return -1;
}

/**
 * Tries to obtain a free region slot, evicting cache entries if necessary, while holding the
 * service lock. Scans frequency buckets from coldest ({@code 0}) to hottest ({@code maxFreq - 1});
 * before each bucket it re-polls {@code freeRegions}, since a concurrent eviction may have freed
 * a slot in the meantime. When an entry is evicted here and no other holder remains
 * ({@code refCount() == 1}), its {@code io} slot is taken over directly instead of being
 * returned to {@code freeRegions}, avoiding the false-negative where a get fails even though a
 * region could have been made available.
 *
 * @param evictedNotification run once per eviction of an entry with non-zero frequency
 *                            (used for metrics by the caller)
 * @return a usable {@link SharedBytes.IO} region, or {@code null} if none could be obtained
 */
private SharedBytes.IO maybeEvictAndTake(Runnable evictedNotification) {
assert Thread.holdsLock(SharedBlobCacheService.this);
for (int currentFreq = 0; currentFreq < maxFreq; currentFreq++) {
// recheck this per freq in case we raced an eviction with an incref'er.
SharedBytes.IO freeRegion = freeRegions.poll();
if (freeRegion != null) {
return freeRegion;
}
// walk the LFU list for this frequency, coldest entries first.
for (LFUCacheEntry entry = freqs[currentFreq]; entry != null; entry = entry.next) {
boolean evicted = entry.chunk.tryEvictNoDecRef();
if (evicted) {
try {
// io may already be null if the chunk was never initialized or was taken.
if (entry.chunk.io != null) {
try {
if (entry.chunk.refCount() == 1) {
// grab io, rely on incref'ers also checking evicted field.
final SharedBytes.IO result = entry.chunk.io;
entry.chunk.io = null;
assert regionOwners.remove(result) == entry.chunk;
return result;
}
} finally {
// evicted either way: remove the entry from the LFU structures and key mapping.
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
}
}
} finally {
// release the reference that tryEvictNoDecRef deliberately did not drop.
entry.chunk.decRef();
if (currentFreq > 0) {
evictedNotification.run();
}
}
}
}
}
// give up
return null;
}
/**
* This method tries to evict the least used {@link LFUCacheEntry}. Only entries with the lowest possible frequency are considered
* for eviction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ public void testBasicEviction() throws IOException {
assertEquals(2, cacheService.freeRegionCount());

synchronized (cacheService) {
assertTrue(region1.tryEvict());
assertTrue(tryEvict(region1));
}
assertEquals(3, cacheService.freeRegionCount());
synchronized (cacheService) {
assertFalse(region1.tryEvict());
assertFalse(tryEvict(region1));
}
assertEquals(3, cacheService.freeRegionCount());
final var bytesReadFuture = new PlainActionFuture<Integer>();
Expand All @@ -107,24 +107,36 @@ public void testBasicEviction() throws IOException {
bytesReadFuture
);
synchronized (cacheService) {
assertFalse(region0.tryEvict());
assertFalse(tryEvict(region0));
}
assertEquals(3, cacheService.freeRegionCount());
assertFalse(bytesReadFuture.isDone());
taskQueue.runAllRunnableTasks();
synchronized (cacheService) {
assertTrue(region0.tryEvict());
assertTrue(tryEvict(region0));
}
assertEquals(4, cacheService.freeRegionCount());
synchronized (cacheService) {
assertTrue(region2.tryEvict());
assertTrue(tryEvict(region2));
}
assertEquals(5, cacheService.freeRegionCount());
assertTrue(bytesReadFuture.isDone());
assertEquals(Integer.valueOf(1), bytesReadFuture.actionGet());
}
}

/**
 * Randomly exercises one of the two eviction entry points, which must be observably
 * equivalent from the caller's point of view: the combined {@code tryEvict()} (which
 * drops the reference itself on success) or the split {@code tryEvictNoDecRef()}
 * followed by an explicit {@code decRef()}.
 *
 * @param region the cache file region to evict
 * @return whether the region was evicted
 */
private static boolean tryEvict(SharedBlobCacheService<Object>.CacheFileRegion region) {
    if (randomBoolean() == false) {
        final boolean evicted = region.tryEvictNoDecRef();
        if (evicted) {
            region.decRef();
        }
        return evicted;
    }
    return region.tryEvict();
}

public void testAutoEviction() throws IOException {
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
Expand Down Expand Up @@ -163,7 +175,7 @@ public void testAutoEviction() throws IOException {

// explicitly evict region 1
synchronized (cacheService) {
assertTrue(region1.tryEvict());
assertTrue(tryEvict(region1));
}
assertEquals(1, cacheService.freeRegionCount());
}
Expand Down Expand Up @@ -308,11 +320,14 @@ public void testDecay() throws IOException {
*/
public void testGetMultiThreaded() throws IOException {
int threads = between(2, 10);
int regionCount = between(1, 20);
// if we have enough regions, a get should always have a result (except for explicit evict interference)
final boolean allowAlreadyClosed = regionCount < threads;
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(
SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(),
ByteSizeValue.ofBytes(size(between(1, 20) * 100L)).getStringRep()
ByteSizeValue.ofBytes(size(regionCount * 100L)).getStringRep()
)
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING.getKey(), randomFrom("0", "1ms", "10s"))
Expand Down Expand Up @@ -343,11 +358,17 @@ public void testGetMultiThreaded() throws IOException {
ready.await();
for (int i = 0; i < iterations; ++i) {
try {
SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion = cacheService.get(
cacheKeys[i],
fileLength,
regions[i]
);
SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion;
try {
cacheFileRegion = cacheService.get(
cacheKeys[i],
fileLength,
regions[i]
);
} catch (AlreadyClosedException e) {
assert allowAlreadyClosed || e.getMessage().equals("evicted during free region allocation"): e;
throw e;
}
if (cacheFileRegion.tryIncRef()) {
if (yield[i] == 0) {
Thread.yield();
Expand Down

0 comments on commit 3ae5931

Please sign in to comment.