Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using ReleasableLock in o.e.c.cache.Cache to save O(10M) in heap #107555

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 64 additions & 22 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.common.cache;

import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.Tuple;

import java.lang.reflect.Array;
Expand All @@ -19,6 +18,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -178,10 +178,10 @@ private static final class Entry<K, V> {
*/
private final class CacheSegment {
// read/write lock protecting mutations to the segment
ReadWriteLock segmentLock = new ReentrantReadWriteLock();
final ReadWriteLock segmentLock = new ReentrantReadWriteLock();

ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());
final Lock readLock = segmentLock.readLock();
final Lock writeLock = segmentLock.writeLock();

Map<K, CompletableFuture<Entry<K, V>>> map;

Expand All @@ -196,8 +196,11 @@ private final class CacheSegment {
*/
Entry<K, V> get(K key, long now, boolean eagerEvict) {
CompletableFuture<Entry<K, V>> future;
try (ReleasableLock ignored = readLock.acquire()) {
readLock.lock();
try {
future = map == null ? null : map.get(key);
} finally {
readLock.unlock();
}
if (future != null) {
Entry<K, V> entry;
Expand All @@ -213,8 +216,11 @@ Entry<K, V> get(K key, long now, boolean eagerEvict) {
if (isExpired(entry, now)) {
misses.increment();
if (eagerEvict) {
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
evictEntry(entry);
} finally {
lruLock.unlock();
}
}
return null;
Expand All @@ -240,7 +246,8 @@ Entry<K, V> get(K key, long now, boolean eagerEvict) {
Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
Entry<K, V> entry = new Entry<>(key, value, now);
Entry<K, V> existing = null;
try (ReleasableLock ignored = writeLock.acquire()) {
writeLock.lock();
try {
try {
if (map == null) {
map = new HashMap<>();
Expand All @@ -252,6 +259,8 @@ Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
} finally {
writeLock.unlock();
}
return Tuple.tuple(entry, existing);
}
Expand All @@ -263,7 +272,8 @@ Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
*/
void remove(K key) {
CompletableFuture<Entry<K, V>> future;
try (ReleasableLock ignored = writeLock.acquire()) {
writeLock.lock();
try {
if (map == null) {
future = null;
} else {
Expand All @@ -272,6 +282,8 @@ void remove(K key) {
map = null;
}
}
} finally {
writeLock.unlock();
}
if (future != null) {
evictions.increment();
Expand All @@ -290,7 +302,8 @@ void remove(K key) {
void remove(K key, V value, boolean notify) {
CompletableFuture<Entry<K, V>> future;
boolean removed = false;
try (ReleasableLock ignored = writeLock.acquire()) {
writeLock.lock();
try {
future = map == null ? null : map.get(key);
try {
if (future != null) {
Expand All @@ -307,6 +320,8 @@ void remove(K key, V value, boolean notify) {
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException(e);
}
} finally {
writeLock.unlock();
}

if (future != null && removed) {
Expand All @@ -333,7 +348,7 @@ void remove(K key, V value, boolean notify) {
Entry<K, V> tail;

// lock protecting mutations to the LRU list
private final ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());
private final ReentrantLock lruLock = new ReentrantLock();

/**
* Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
Expand Down Expand Up @@ -380,30 +395,36 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment segment = getCacheSegment(key);
final CacheSegment segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

try (ReleasableLock ignored = segment.writeLock.acquire()) {
segment.writeLock.lock();
try {
if (segment.map == null) {
segment.map = new HashMap<>();
}
future = segment.map.putIfAbsent(key, completableFuture);
} finally {
segment.writeLock.unlock();
}

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
promote(ok, now);
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
segment.writeLock.lock();
try {
CompletableFuture<Entry<K, V>> sanity = segment.map == null ? null : segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
if (segment.map.isEmpty()) {
segment.map = null;
}
}
} finally {
segment.writeLock.unlock();
}
return null;
}
Expand Down Expand Up @@ -461,13 +482,16 @@ private void put(K key, V value, long now) {
CacheSegment segment = getCacheSegment(key);
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
boolean replaced = false;
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
promote(tuple.v1(), now);
} finally {
lruLock.unlock();
}
if (replaced) {
removalListener.onRemoval(
Expand All @@ -479,8 +503,11 @@ private void put(K key, V value, long now) {
private void notifyWithInvalidated(CompletableFuture<Entry<K, V>> f) {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
} finally {
lruLock.unlock();
}
} catch (ExecutionException e) {
// ok
Expand Down Expand Up @@ -521,7 +548,8 @@ public void invalidateAll() {
Entry<K, V> h;

boolean[] haveSegmentLock = new boolean[NUMBER_OF_SEGMENTS];
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
try {
for (int i = 0; i < NUMBER_OF_SEGMENTS; i++) {
segments[i].segmentLock.writeLock().lock();
Expand All @@ -546,6 +574,8 @@ public void invalidateAll() {
}
}
}
} finally {
lruLock.unlock();
}
while (h != null) {
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
Expand All @@ -558,8 +588,11 @@ public void invalidateAll() {
*/
public void refresh() {
long now = now();
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
evict(now);
} finally {
lruLock.unlock();
}
}

Expand Down Expand Up @@ -589,7 +622,7 @@ public long weight() {
* @return an LRU-ordered {@link Iterable} over the keys in the cache
*/
public Iterable<K> keys() {
return () -> new Iterator<K>() {
return () -> new Iterator<>() {
private final CacheIterator iterator = new CacheIterator(head);

@Override
Expand Down Expand Up @@ -617,7 +650,7 @@ public void remove() {
* @return an LRU-ordered {@link Iterable} over the values in the cache
*/
public Iterable<V> values() {
return () -> new Iterator<V>() {
return () -> new Iterator<>() {
private final CacheIterator iterator = new CacheIterator(head);

@Override
Expand Down Expand Up @@ -647,7 +680,8 @@ public void remove() {
*/
public void forEach(BiConsumer<K, V> consumer) {
for (CacheSegment segment : segments) {
try (ReleasableLock ignored = segment.readLock.acquire()) {
segment.readLock.lock();
try {
if (segment.map == null) {
continue;
}
Expand All @@ -661,6 +695,8 @@ public void forEach(BiConsumer<K, V> consumer) {
throw new IllegalStateException(e);
}
}
} finally {
segment.readLock.unlock();
}
}
}
Expand Down Expand Up @@ -692,9 +728,12 @@ public void remove() {
if (entry != null) {
CacheSegment segment = getCacheSegment(entry.key);
segment.remove(entry.key, entry.value, false);
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
} finally {
lruLock.unlock();
}
}
}
Expand Down Expand Up @@ -736,7 +775,8 @@ public long getEvictions() {

private void promote(Entry<K, V> entry, long now) {
boolean promoted = true;
try (ReleasableLock ignored = lruLock.acquire()) {
lruLock.lock();
try {
switch (entry.state) {
case DELETED -> promoted = false;
case EXISTING -> relinkAtHead(entry);
Expand All @@ -745,6 +785,8 @@ private void promote(Entry<K, V> entry, long now) {
if (promoted) {
evict(now);
}
} finally {
lruLock.unlock();
}
}

Expand Down