Skip to content

Commit

Permalink
Integrates dummy stats holder with caches
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Apr 16, 2024
1 parent 5ba6e47 commit bd0c2e5
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import org.opensearch.common.cache.serializer.ICacheKeySerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.CacheStatsHolderInterface;
import org.opensearch.common.cache.stats.DummyCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;

import java.io.File;
Expand Down Expand Up @@ -113,7 +116,7 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
private final Class<K> keyType;
private final Class<V> valueType;
private final TimeValue expireAfterAccess;
private final CacheStatsHolder cacheStatsHolder;
private final CacheStatsHolderInterface cacheStatsHolder;
private final EhCacheEventListener ehCacheEventListener;
private final String threadPoolAlias;
private final Settings settings;
Expand Down Expand Up @@ -162,7 +165,12 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener(), builder.getWeigher());
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames);

if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(builder.getSettings())) {
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames);
} else {
this.cacheStatsHolder = new DummyCacheStatsHolder(dimensionNames);
}
}

@SuppressWarnings({ "rawtypes" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase {
private final String dimensionName = "shardId";

public void testBasicGetAndPut() throws IOException {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
ToLongBiFunction<ICacheKey<String>, String> weigher = getWeigher();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
Expand Down Expand Up @@ -142,6 +143,7 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
.getKey(),
true
)
.put(FeatureFlags.PLUGGABLE_CACHE, true)
.build()
)
.build(),
Expand Down Expand Up @@ -173,7 +175,7 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
}

public void testConcurrentPut() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -223,7 +225,7 @@ public void testConcurrentPut() throws Exception {
}

public void testEhcacheParallelGets() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -272,7 +274,7 @@ public void testEhcacheParallelGets() throws Exception {
}

public void testEhcacheKeyIterator() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
.setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -312,7 +314,7 @@ public void testEhcacheKeyIterator() throws Exception {
}

public void testEvictions() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
ToLongBiFunction<ICacheKey<String>, String> weigher = getWeigher();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
Expand Down Expand Up @@ -348,7 +350,7 @@ public void testEvictions() throws Exception {
}

public void testComputeIfAbsentConcurrently() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -424,7 +426,7 @@ public String load(ICacheKey<String> key) {
}

public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -485,7 +487,7 @@ public String load(ICacheKey<String> key) throws Exception {
}

public void testComputeIfAbsentWithNullValueLoading() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
Expand Down Expand Up @@ -552,7 +554,7 @@ public String load(ICacheKey<String> key) throws Exception {

public void testMemoryTracking() throws Exception {
// Test all cases for EhCacheEventListener.onEvent and check stats memory usage is updated correctly
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
ToLongBiFunction<ICacheKey<String>, String> weigher = getWeigher();
int initialKeyLength = 40;
int initialValueLength = 40;
Expand Down Expand Up @@ -626,7 +628,7 @@ public void testMemoryTracking() throws Exception {
}

public void testEhcacheKeyIteratorWithRemove() throws IOException {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setDiskCacheAlias("test1")
.setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -673,7 +675,7 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
}

public void testInvalidateAll() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -710,7 +712,7 @@ public void testInvalidateAll() throws Exception {
}

public void testBasicGetAndPutBytesReference() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, BytesReference> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, BytesReference>()
.setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -756,7 +758,7 @@ public void testBasicGetAndPutBytesReference() throws Exception {
}

public void testInvalidate() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -800,7 +802,7 @@ public void testInvalidate() throws Exception {

// Modified from OpenSearchOnHeapCacheTests.java
public void testInvalidateWithDropDimensions() throws Exception {
Settings settings = Settings.builder().build();
Settings settings = getSettings(true);
List<String> dimensionNames = List.of("dim1", "dim2");
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
Expand Down Expand Up @@ -849,6 +851,37 @@ public void testInvalidateWithDropDimensions() throws Exception {
}
}

public void testStatsWithoutPluggableCaches() throws Exception {
Settings settings = getSettings(false);
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
ToLongBiFunction<ICacheKey<String>, String> weigher = getWeigher();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setDimensionNames(List.of(dimensionName))
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES)
.setRemovalListener(removalListener)
.setWeigher(weigher)
.build();
int randomKeys = randomIntBetween(10, 100);
for (int i = 0; i < randomKeys; i++) {
ICacheKey<String> iCacheKey = getICacheKey(UUID.randomUUID().toString());
ehcacheTest.put(iCacheKey, UUID.randomUUID().toString());
assertEquals(i + 1, ehcacheTest.count());
assertEquals(new ImmutableCacheStats(0, 0, 0, 0, 0), ehcacheTest.stats().getTotalStats());
}
ehcacheTest.close();
}
}

private List<String> getRandomDimensions(List<String> dimensionNames) {
Random rand = Randomness.get();
int bound = 3;
Expand Down Expand Up @@ -893,6 +926,10 @@ private ToLongBiFunction<ICacheKey<String>, String> getWeigher() {
};
}

private Settings getSettings(boolean pluggableCaches) {
return Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, pluggableCaches).build();
}

static class MockRemovalListener<K, V> implements RemovalListener<ICacheKey<K>, V> {

CounterMetric evictionMetric = new CounterMetric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.CacheStatsHolderInterface;
import org.opensearch.common.cache.stats.DummyCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
Expand Down Expand Up @@ -47,7 +49,7 @@
public class OpenSearchOnHeapCache<K, V> implements ICache<K, V>, RemovalListener<ICacheKey<K>, V> {

private final Cache<ICacheKey<K>, V> cache;
private final CacheStatsHolder cacheStatsHolder;
private final CacheStatsHolderInterface cacheStatsHolder;
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final List<String> dimensionNames;
private final ToLongBiFunction<ICacheKey<K>, V> weigher;
Expand All @@ -62,7 +64,11 @@ public OpenSearchOnHeapCache(Builder<K, V> builder) {
}
cache = cacheBuilder.build();
this.dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames);
if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(builder.getSettings())) {
this.cacheStatsHolder = new CacheStatsHolder(dimensionNames);
} else {
this.cacheStatsHolder = new DummyCacheStatsHolder(dimensionNames);
}
this.removalListener = builder.getRemovalListener();
this.weigher = builder.getWeigher();
}
Expand Down Expand Up @@ -167,6 +173,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setMaximumWeightInBytes(((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes())
.setExpireAfterAccess(((TimeValue) settingList.get(EXPIRE_AFTER_ACCESS_KEY).get(settings)))
.setWeigher(config.getWeigher())
.setSettings(config.getSettings())
.setRemovalListener(config.getRemovalListener());
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
Expand All @@ -37,7 +39,9 @@ public void testStats() throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
int maxKeys = between(10, 50);
int numEvicted = between(10, 20);
OpenSearchOnHeapCache<String, String> cache = getCache(maxKeys, listener);
OpenSearchOnHeapCache<String, String> cache = getCache(maxKeys, listener, true);

// When the pluggable caches setting is on, we should get stats as expected from cache.stats().

List<ICacheKey<String>> keysAdded = new ArrayList<>();
int numAdded = maxKeys + numEvicted;
Expand Down Expand Up @@ -77,7 +81,34 @@ public void testStats() throws Exception {
}
}

private OpenSearchOnHeapCache<String, String> getCache(int maxSizeKeys, MockRemovalListener<String, String> listener) {
public void testStatsWithoutPluggableCaches() throws Exception {
// When the pluggable caches setting is off, we should get all-zero stats from cache.stats(), but count() should still work.
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
int maxKeys = between(10, 50);
int numEvicted = between(10, 20);
OpenSearchOnHeapCache<String, String> cache = getCache(maxKeys, listener, false);

List<ICacheKey<String>> keysAdded = new ArrayList<>();
int numAdded = maxKeys + numEvicted;
for (int i = 0; i < numAdded; i++) {
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
keysAdded.add(key);
cache.computeIfAbsent(key, getLoadAwareCacheLoader());

assertEquals(Math.min(maxKeys, i + 1), cache.count());
assertZeroStats(cache.stats());
}
}

private void assertZeroStats(ImmutableCacheStatsHolder stats) {
assertEquals(new ImmutableCacheStats(0, 0, 0, 0, 0), stats.getTotalStats());
}

private OpenSearchOnHeapCache<String, String> getCache(
int maxSizeKeys,
MockRemovalListener<String, String> listener,
boolean pluggableCachesSetting
) {
ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory();
Settings settings = Settings.builder()
.put(
Expand All @@ -86,6 +117,7 @@ private OpenSearchOnHeapCache<String, String> getCache(int maxSizeKeys, MockRemo
.getKey(),
maxSizeKeys * keyValueSize + "b"
)
.put(FeatureFlags.PLUGGABLE_CACHE, pluggableCachesSetting)
.build();

CacheConfig<String, String> cacheConfig = new CacheConfig.Builder<String, String>().setKeyType(String.class)
Expand All @@ -102,7 +134,7 @@ private OpenSearchOnHeapCache<String, String> getCache(int maxSizeKeys, MockRemo
public void testInvalidateWithDropDimensions() throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
int maxKeys = 50;
OpenSearchOnHeapCache<String, String> cache = getCache(maxKeys, listener);
OpenSearchOnHeapCache<String, String> cache = getCache(maxKeys, listener, true);

List<ICacheKey<String>> keysAdded = new ArrayList<>();

Expand Down

0 comments on commit bd0c2e5

Please sign in to comment.