diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 63cdbca101f2a..b6d6913a9f8d4 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -8,6 +8,8 @@ package org.opensearch.cache.common.tier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.cache.common.policy.TookTimePolicy; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; @@ -35,9 +37,13 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.ToLongBiFunction; @@ -61,6 +67,7 @@ public class TieredSpilloverCache implements ICache { // Used to avoid caching stale entries in lower tiers. private static final List SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY); + private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class); private final ICache diskCache; private final ICache onHeapCache; @@ -86,6 +93,12 @@ public class TieredSpilloverCache implements ICache { private final Map, TierInfo> caches; private final List> policies; + /** + * This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value + * only once. + */ + Map, CompletableFuture, V>>> completableFutureMap = new ConcurrentHashMap<>(); + TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); @@ -190,10 +203,7 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. // This is needed as there can be many requests for the same key at the same time and we only want to load // the value once. - V value = null; - try (ReleasableLock ignore = writeLock.acquire()) { - value = onHeapCache.computeIfAbsent(key, loader); - } + V value = compute(key, loader); // Handle stats if (loader.isLoaded()) { // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache @@ -222,6 +232,57 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> return cacheValueTuple.v1(); } + private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { + // Only one of the threads will succeed putting a future into map for the same key. + // Rest will fetch existing future and wait on that to complete. + CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>()); + // Handler to handle results post processing. Takes a tuple or exception as an input and returns + // the value. Also before returning value, puts the value in cache. + BiFunction, V>, Throwable, Void> handler = (pair, ex) -> { + if (pair != null) { + try (ReleasableLock ignore = writeLock.acquire()) { + onHeapCache.put(pair.v1(), pair.v2()); + } catch (Exception e) { + // TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal + // listeners/stats. Needs better exception handling at underlying layers.For now swallowing + // exception. + logger.warn("Exception occurred while putting item onto heap cache", e); + } + } else { + if (ex != null) { + logger.warn("Exception occurred while trying to compute the value", ex); + } + } + completableFutureMap.remove(key); // Remove key from map as not needed anymore. + return null; + }; + V value = null; + if (future == null) { + future = completableFutureMap.get(key); + future.handle(handler); + try { + value = loader.load(key); + } catch (Exception ex) { + future.completeExceptionally(ex); + throw new ExecutionException(ex); + } + if (value == null) { + NullPointerException npe = new NullPointerException("Loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Tuple<>(key, value)); + } + } else { + try { + value = future.get().v2(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + return value; + } + @Override public void invalidate(ICacheKey key) { // We are trying to invalidate the key from all caches though it would be present in only of them. @@ -328,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification ICacheKey key = notification.getKey(); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier - if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) { + boolean exceptionOccurredOnDiskCachePut = false; + boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue()); + if (canCacheOnDisk) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats + } catch (Exception ex) { + // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception + // in this case as it shouldn't cause upstream request to fail. + logger.warn("Exception occurred while putting item to disk cache", ex); + exceptionOccurredOnDiskCachePut = true; } - updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); - } else { + if (!exceptionOccurredOnDiskCachePut) { + updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); + } + } + if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) { // If the value is not going to the disk cache, send this notification to the TSC's removal listener // as the value is leaving the TSC entirely removalListener.onRemoval(notification); diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 54b15f236a418..b9c7bbdb77d3d 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -44,8 +44,12 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; @@ -56,6 +60,10 @@ import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TieredSpilloverCacheTests extends OpenSearchTestCase { static final List dimensionNames = List.of("dim1", "dim2", "dim3"); @@ -408,6 +416,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { assertEquals(onHeapCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); assertEquals(cacheMiss + numOfItems1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); assertEquals(diskCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } public void testComputeIfAbsentWithEvictionsFromTieredCache() throws Exception { @@ -802,7 +811,7 @@ public String load(ICacheKey key) { }; loadAwareCacheLoaderList.add(loadAwareCacheLoader); phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + assertEquals(value, tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader)); } catch (Exception e) { throw new RuntimeException(e); } @@ -811,7 +820,7 @@ public String load(ICacheKey key) { threads[i].start(); } phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. + countDownLatch.await(); int numberOfTimesKeyLoaded = 0; assertEquals(numberOfSameKeys, loadAwareCacheLoaderList.size()); for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { @@ -824,6 +833,215 @@ public String load(ICacheKey key) { // We should see only one heap miss, and the rest hits assertEquals(1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); assertEquals(numberOfSameKeys - 1, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + + public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception { + int onHeapCacheSize = randomIntBetween(300, 500); + int diskCacheSize = randomIntBetween(600, 700); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int iterations = 10; + int numberOfKeys = 20; + List> iCacheKeyList = new ArrayList<>(); + for (int i = 0; i < numberOfKeys; i++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + iCacheKeyList.add(key); + } + ExecutorService executorService = Executors.newFixedThreadPool(8); + CountDownLatch countDownLatch = new CountDownLatch(iterations * numberOfKeys); // To wait for all threads to finish. + + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + for (int j = 0; j < numberOfKeys; j++) { + int finalJ = j; + for (int i = 0; i < iterations; i++) { + executorService.submit(() -> { + try { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + isLoaded = true; + return iCacheKeyList.get(finalJ).key; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + tieredSpilloverCache.computeIfAbsent(iCacheKeyList.get(finalJ), loadAwareCacheLoader); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); + } + }); + } + } + countDownLatch.await(); + int numberOfTimesKeyLoaded = 0; + assertEquals(iterations * numberOfKeys, loadAwareCacheLoaderList.size()); + for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { + LoadAwareCacheLoader, String> loader = loadAwareCacheLoaderList.get(i); + if (loader.isLoaded()) { + numberOfTimesKeyLoaded++; + } + } + assertEquals(numberOfKeys, numberOfTimesKeyLoaded); // It should be loaded only once. + // We should see only one heap miss, and the rest hits + assertEquals(numberOfKeys, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals((iterations * numberOfKeys) - numberOfKeys, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + executorService.shutdownNow(); + } + + public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + throw new RuntimeException("Testing"); + } + }; + verifyComputeIfAbsentThrowsException(RuntimeException.class, loadAwareCacheLoader, "Testing"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Exception { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + ICache mockOnHeapCache = mock(ICache.class); + when(onHeapCacheFactory.create(any(), any(), any())).thenReturn(mockOnHeapCache); + doThrow(new RuntimeException("Testing")).when(mockOnHeapCache).put(any(), any()); + CacheConfig cacheConfig = getCacheConfig(keyValueSize, settings, removalListener); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize, false); + + TieredSpilloverCache tieredSpilloverCache = getTieredSpilloverCache( + onHeapCacheFactory, + mockDiskCacheFactory, + cacheConfig, + null, + removalListener + ); + String value = ""; + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(ICacheKey key) { + return "test"; + } + }); + assertEquals("test", value); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exception { + int onHeapCacheSize = 0; + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); + CacheConfig cacheConfig = getCacheConfig(keyValueSize, settings, removalListener); + ICache.Factory mockDiskCacheFactory = mock(MockDiskCache.MockDiskCacheFactory.class); + ICache mockDiskCache = mock(ICache.class); + when(mockDiskCacheFactory.create(any(), any(), any())).thenReturn(mockDiskCache); + doThrow(new RuntimeException("Test")).when(mockDiskCache).put(any(), any()); + + TieredSpilloverCache tieredSpilloverCache = getTieredSpilloverCache( + onHeapCacheFactory, + mockDiskCacheFactory, + cacheConfig, + null, + removalListener + ); + + String response = ""; + response = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(ICacheKey key) { + return "test"; + } + }); + ImmutableCacheStats diskStats = getStatsSnapshotForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK); + + assertEquals(0, diskStats.getSizeInBytes()); + assertEquals(1, removalListener.evictionsMetric.count()); + assertEquals("test", response); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + + public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + return null; + } + }; + verifyComputeIfAbsentThrowsException(NullPointerException.class, loadAwareCacheLoader, "Loader returned a null value"); } public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception { @@ -1408,6 +1626,26 @@ public boolean isLoaded() { }; } + private TieredSpilloverCache getTieredSpilloverCache( + ICache.Factory onHeapCacheFactory, + ICache.Factory mockDiskCacheFactory, + CacheConfig cacheConfig, + List> policies, + RemovalListener, String> removalListener + ) { + TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig); + if (policies != null) { + builder.addPolicies(policies); + } + return builder.build(); + } + private TieredSpilloverCache initializeTieredSpilloverCache( int keyValueSize, int diskCacheSize, @@ -1450,17 +1688,34 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize, false); - TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( - CacheType.INDICES_REQUEST_CACHE - ) + return getTieredSpilloverCache(onHeapCacheFactory, mockDiskCacheFactory, cacheConfig, policies, removalListener); + } + + private CacheConfig getCacheConfig( + int keyValueSize, + Settings settings, + RemovalListener, String> removalListener + ) { + return new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) .setRemovalListener(removalListener) - .setOnHeapCacheFactory(onHeapCacheFactory) - .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig); - if (policies != null) { - builder.addPolicies(policies); - } - return builder.build(); + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); } // Helper functions for extracting tier aggregated stats. @@ -1501,6 +1756,66 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + private void verifyComputeIfAbsentThrowsException( + Class expectedException, + LoadAwareCacheLoader, String> loader, + String expectedExceptionMessage + ) throws InterruptedException { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + String value = UUID.randomUUID().toString(); + AtomicInteger exceptionCount = new AtomicInteger(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loader); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + assertEquals(ExecutionException.class, e.getClass()); + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedExceptionMessage, e.getCause().getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + + // Verify exception count was equal to number of requests + assertEquals(numberOfSameKeys, exceptionCount.get()); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache tsc) throws IOException { ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]); return cacheStats.getStatsForDimensionValues(List.of());