Skip to content

Commit

Permalink
[Tiered Caching] Adds stats implementation for TieredSpilloverCache (#…
Browse files Browse the repository at this point in the history
…13236)

Stats rework part 4 of 4 
---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
peteralfonsi and Peter Alfonsi authored May 3, 2024
1 parent 37df569 commit a8017d8
Show file tree
Hide file tree
Showing 13 changed files with 755 additions and 100 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Add dimension-based stats to TieredSpilloverCache ([#13236](https://github.com/opensearch-project/OpenSearch/pull/13236))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A tier-aware version of DefaultCacheStatsHolder. Overrides the incrementer functions, as we can't just add the on-heap
* and disk stats to get a total for the cache as a whole. If the disk tier is present, the total hits, size, and entries
* should be the sum of both tiers' values, but the total misses and evictions should be the disk tier's values.
* When the disk tier isn't present, on-heap misses and evictions should contribute to the total.
*
* For example, if the heap tier has 5 misses and the disk tier has 4, the total cache has had 4 misses, not 9.
* The same goes for evictions. Other stats values add normally.
*
* This means for misses and evictions, if we are incrementing for the on-heap tier and the disk tier is present,
* we have to increment only the leaf nodes corresponding to the on-heap tier itself, and not its ancestors,
* which correspond to totals including both tiers. If the disk tier is not present, we do increment the ancestor nodes.
*/
public class TieredSpilloverCacheStatsHolder extends DefaultCacheStatsHolder {

/** Whether the disk cache is currently enabled. */
private boolean diskCacheEnabled;

// Common values used for tier dimension

/** The name for the tier dimension. */
public static final String TIER_DIMENSION_NAME = "tier";

/** Dimension value for on-heap cache, like OpenSearchOnHeapCache.*/
public static final String TIER_DIMENSION_VALUE_ON_HEAP = "on_heap";

/** Dimension value for on-disk cache, like EhcacheDiskCache. */
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

/**
* Constructor for the stats holder.
* @param originalDimensionNames the original dimension names, not including TIER_DIMENSION_NAME
* @param diskCacheEnabled whether the disk tier starts out enabled
*/
public TieredSpilloverCacheStatsHolder(List<String> originalDimensionNames, boolean diskCacheEnabled) {
super(
getDimensionNamesWithTier(originalDimensionNames),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
);
this.diskCacheEnabled = diskCacheEnabled;
}

private static List<String> getDimensionNamesWithTier(List<String> dimensionNames) {
List<String> dimensionNamesWithTier = new ArrayList<>(dimensionNames);
dimensionNamesWithTier.add(TIER_DIMENSION_NAME);
return dimensionNamesWithTier;
}

/**
* Add tierValue to the end of a copy of the initial dimension values, so they can appropriately be used in this stats holder.
*/
List<String> getDimensionsWithTierValue(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
}

private String validateTierDimensionValue(List<String> dimensionValues) {
String tierDimensionValue = dimensionValues.get(dimensionValues.size() - 1);
assert tierDimensionValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) || tierDimensionValue.equals(TIER_DIMENSION_VALUE_DISK)
: "Invalid tier dimension value";
return tierDimensionValue;
}

@Override
public void incrementHits(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Hits from either tier should be included in the total values.
super.incrementHits(dimensionValues);
}

@Override
public void incrementMisses(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only misses from the disk tier should be included in total values.
Consumer<Node> missIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementMisses();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementMisses();
}
};
internalIncrement(dimensionValues, missIncrementer, true);
}

@Override
public void incrementEvictions(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only evictions from the disk tier should be included in total values.
Consumer<DefaultCacheStatsHolder.Node> evictionsIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementEvictions();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementEvictions();
}
};
internalIncrement(dimensionValues, evictionsIncrementer, true);
}

@Override
public void incrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.incrementSizeInBytes(dimensionValues, amountBytes);
}

// For decrements, we should not create nodes if they are absent. This protects us from erroneously decrementing values for keys
// which have been entirely deleted, for example in an async removal listener.
@Override
public void decrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.decrementSizeInBytes(dimensionValues, amountBytes);
}

@Override
public void incrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.incrementItems(dimensionValues);
}

@Override
public void decrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.decrementItems(dimensionValues);
}

void setDiskCacheEnabled(boolean diskCacheEnabled) {
this.diskCacheEnabled = diskCacheEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.stats.NoopCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,12 +36,19 @@ public class MockDiskCache<K, V> implements ICache<K, V> {
long delay;

private final RemovalListener<ICacheKey<K>, V> removalListener;
private final CacheStatsHolder statsHolder; // Only update for number of entries; this is only used to test statsTrackingEnabled logic
// in TSC

public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener, boolean statsTrackingEnabled) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
if (statsTrackingEnabled) {
this.statsHolder = new DefaultCacheStatsHolder(List.of(), "mock_disk_cache");
} else {
this.statsHolder = NoopCacheStatsHolder.getInstance();
}
}

@Override
Expand All @@ -50,13 +61,15 @@ public V get(ICacheKey<K> key) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
this.statsHolder.decrementItems(List.of());
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cache.put(key, value);
this.statsHolder.incrementItems(List.of());
}

@Override
Expand All @@ -73,6 +86,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>

@Override
public void invalidate(ICacheKey<K> key) {
removalListener.onRemoval(new RemovalNotification<>(key, cache.get(key), RemovalReason.INVALIDATED));
this.cache.remove(key);
}

Expand All @@ -96,7 +110,9 @@ public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
// To allow testing of statsTrackingEnabled logic in TSC, return a dummy ImmutableCacheStatsHolder with the
// right number of entries, unless statsTrackingEnabled is false
return statsHolder.getImmutableCacheStatsHolder(null);
}

@Override
Expand All @@ -114,10 +130,12 @@ public static class MockDiskCacheFactory implements Factory {
public static final String NAME = "mockDiskCache";
final long delay;
final int maxSize;
final boolean statsTrackingEnabled;

public MockDiskCacheFactory(long delay, int maxSize) {
public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnabled) {
this.delay = delay;
this.maxSize = maxSize;
this.statsTrackingEnabled = statsTrackingEnabled;
}

@Override
Expand All @@ -128,6 +146,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.setStatsTrackingEnabled(config.getStatsTrackingEnabled())
.build();
}

Expand All @@ -146,7 +165,7 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener());
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener(), getStatsTrackingEnabled());
}

public Builder<K, V> setMaxSize(int maxSize) {
Expand Down
Loading

0 comments on commit a8017d8

Please sign in to comment.