Skip to content

Commit

Permalink
[Tiered Caching] Adds stats implementation for TieredSpilloverCache (o…
Browse files Browse the repository at this point in the history
…pensearch-project#13236)

Stats rework part 4 of 4
---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
peteralfonsi and Peter Alfonsi committed Sep 3, 2024
1 parent 46944cb commit a5c5675
Show file tree
Hide file tree
Showing 12 changed files with 754 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A tier-aware version of DefaultCacheStatsHolder. Overrides the incrementer functions, as we can't just add the on-heap
* and disk stats to get a total for the cache as a whole. If the disk tier is present, the total hits, size, and entries
* should be the sum of both tiers' values, but the total misses and evictions should be the disk tier's values.
* When the disk tier isn't present, on-heap misses and evictions should contribute to the total.
*
* For example, if the heap tier has 5 misses and the disk tier has 4, the total cache has had 4 misses, not 9.
* The same goes for evictions. Other stats values add normally.
*
* This means for misses and evictions, if we are incrementing for the on-heap tier and the disk tier is present,
* we have to increment only the leaf nodes corresponding to the on-heap tier itself, and not its ancestors,
* which correspond to totals including both tiers. If the disk tier is not present, we do increment the ancestor nodes.
*/
public class TieredSpilloverCacheStatsHolder extends DefaultCacheStatsHolder {

/** Whether the disk cache is currently enabled. */
private boolean diskCacheEnabled;

// Common values used for tier dimension

/** The name for the tier dimension. */
public static final String TIER_DIMENSION_NAME = "tier";

/** Dimension value for on-heap cache, like OpenSearchOnHeapCache.*/
public static final String TIER_DIMENSION_VALUE_ON_HEAP = "on_heap";

/** Dimension value for on-disk cache, like EhcacheDiskCache. */
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

/**
* Constructor for the stats holder.
* @param originalDimensionNames the original dimension names, not including TIER_DIMENSION_NAME
* @param diskCacheEnabled whether the disk tier starts out enabled
*/
public TieredSpilloverCacheStatsHolder(List<String> originalDimensionNames, boolean diskCacheEnabled) {
super(
getDimensionNamesWithTier(originalDimensionNames),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
);
this.diskCacheEnabled = diskCacheEnabled;
}

private static List<String> getDimensionNamesWithTier(List<String> dimensionNames) {
List<String> dimensionNamesWithTier = new ArrayList<>(dimensionNames);
dimensionNamesWithTier.add(TIER_DIMENSION_NAME);
return dimensionNamesWithTier;
}

/**
* Add tierValue to the end of a copy of the initial dimension values, so they can appropriately be used in this stats holder.
*/
List<String> getDimensionsWithTierValue(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
}

private String validateTierDimensionValue(List<String> dimensionValues) {
String tierDimensionValue = dimensionValues.get(dimensionValues.size() - 1);
assert tierDimensionValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) || tierDimensionValue.equals(TIER_DIMENSION_VALUE_DISK)
: "Invalid tier dimension value";
return tierDimensionValue;
}

@Override
public void incrementHits(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Hits from either tier should be included in the total values.
super.incrementHits(dimensionValues);
}

@Override
public void incrementMisses(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only misses from the disk tier should be included in total values.
Consumer<Node> missIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementMisses();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementMisses();
}
};
internalIncrement(dimensionValues, missIncrementer, true);
}

@Override
public void incrementEvictions(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only evictions from the disk tier should be included in total values.
Consumer<DefaultCacheStatsHolder.Node> evictionsIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementEvictions();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementEvictions();
}
};
internalIncrement(dimensionValues, evictionsIncrementer, true);
}

@Override
public void incrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.incrementSizeInBytes(dimensionValues, amountBytes);
}

// For decrements, we should not create nodes if they are absent. This protects us from erroneously decrementing values for keys
// which have been entirely deleted, for example in an async removal listener.
@Override
public void decrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.decrementSizeInBytes(dimensionValues, amountBytes);
}

@Override
public void incrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.incrementItems(dimensionValues);
}

@Override
public void decrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.decrementItems(dimensionValues);
}

void setDiskCacheEnabled(boolean diskCacheEnabled) {
this.diskCacheEnabled = diskCacheEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.stats.NoopCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,12 +36,19 @@ public class MockDiskCache<K, V> implements ICache<K, V> {
long delay;

private final RemovalListener<ICacheKey<K>, V> removalListener;
private final CacheStatsHolder statsHolder; // Only update for number of entries; this is only used to test statsTrackingEnabled logic
// in TSC

public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener, boolean statsTrackingEnabled) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
if (statsTrackingEnabled) {
this.statsHolder = new DefaultCacheStatsHolder(List.of(), "mock_disk_cache");
} else {
this.statsHolder = NoopCacheStatsHolder.getInstance();
}
}

@Override
Expand All @@ -50,13 +61,15 @@ public V get(ICacheKey<K> key) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
this.statsHolder.decrementItems(List.of());
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cache.put(key, value);
this.statsHolder.incrementItems(List.of());
}

@Override
Expand All @@ -73,6 +86,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>

@Override
public void invalidate(ICacheKey<K> key) {
removalListener.onRemoval(new RemovalNotification<>(key, cache.get(key), RemovalReason.INVALIDATED));
this.cache.remove(key);
}

Expand All @@ -96,7 +110,9 @@ public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
// To allow testing of statsTrackingEnabled logic in TSC, return a dummy ImmutableCacheStatsHolder with the
// right number of entries, unless statsTrackingEnabled is false
return statsHolder.getImmutableCacheStatsHolder(null);
}

@Override
Expand All @@ -114,10 +130,12 @@ public static class MockDiskCacheFactory implements Factory {
public static final String NAME = "mockDiskCache";
final long delay;
final int maxSize;
final boolean statsTrackingEnabled;

public MockDiskCacheFactory(long delay, int maxSize) {
public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnabled) {
this.delay = delay;
this.maxSize = maxSize;
this.statsTrackingEnabled = statsTrackingEnabled;
}

@Override
Expand All @@ -128,6 +146,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.setStatsTrackingEnabled(config.getStatsTrackingEnabled())
.build();
}

Expand All @@ -146,7 +165,7 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener());
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener(), getStatsTrackingEnabled());
}

public Builder<K, V> setMaxSize(int maxSize) {
Expand Down
Loading

0 comments on commit a5c5675

Please sign in to comment.