Skip to content

Commit

Permalink
[Tiered Caching] Moving query recomputation logic outside of write lo…
Browse files Browse the repository at this point in the history
…ck (opensearch-project#14187)

* Moving query recompute out of write lock

Signed-off-by: Sagar Upadhyaya <[email protected]>

* [Tiered Caching] Moving query recomputation logic outside of write lock

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Adding java doc for the completable map

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Changes to call future handler only once per key

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing spotless check

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Added changelog

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Addressing comments

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing gradle fail

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Addressing comments to refactor unit test

Signed-off-by: Sagar Upadhyaya <[email protected]>

* minor UT refactor

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>
Co-authored-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
2 people authored and Peter Alfonsi committed Sep 3, 2024
1 parent 8347779 commit 29ec9d6
Show file tree
Hide file tree
Showing 2 changed files with 405 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache.common.tier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
Expand Down Expand Up @@ -35,9 +37,13 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongBiFunction;
Expand All @@ -61,6 +67,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

// Used to avoid caching stale entries in lower tiers.
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class);

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
Expand All @@ -86,6 +93,12 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
private final Map<ICache<K, V>, TierInfo> caches;
private final List<Predicate<V>> policies;

/**
* This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value
* only once.
*/
Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> completableFutureMap = new ConcurrentHashMap<>();

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Expand Down Expand Up @@ -190,10 +203,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
V value = compute(key, loader);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -222,6 +232,57 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
if (pair != null) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
return null;
};
V value = null;
if (future == null) {
future = completableFutureMap.get(key);
future.handle(handler);
try {
value = loader.load(key);
} catch (Exception ex) {
future.completeExceptionally(ex);
throw new ExecutionException(ex);
}
if (value == null) {
NullPointerException npe = new NullPointerException("Loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Tuple<>(key, value));
}
} else {
try {
value = future.get().v2();
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
return value;
}

@Override
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
Expand Down Expand Up @@ -328,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
boolean exceptionOccurredOnDiskCachePut = false;
boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue());
if (canCacheOnDisk) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
} catch (Exception ex) {
// TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception
// in this case as it shouldn't cause upstream request to fail.
logger.warn("Exception occurred while putting item to disk cache", ex);
exceptionOccurredOnDiskCachePut = true;
}
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
} else {
if (!exceptionOccurredOnDiskCachePut) {
updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue());
}
}
if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) {
// If the value is not going to the disk cache, send this notification to the TSC's removal listener
// as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
Expand Down
Loading

0 comments on commit 29ec9d6

Please sign in to comment.