diff --git a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java index e07aca89339b9..f349baeef3cd7 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java +++ b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java @@ -55,7 +55,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public interface BytesReference extends Comparable, ToXContentFragment { +public interface BytesReference extends Comparable, ToXContentFragment, Serializable { /** * Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder, diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java index 8c0dc0936f9dc..6726167fe469d 100644 --- a/server/src/main/java/org/opensearch/indices/CachingTier.java +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -19,13 +19,13 @@ */ public interface CachingTier { - V get(K key) throws IOException; + V get(K key); - void put(K key, V value) throws IOException; + void put(K key, V value); V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; - void invalidate(K key) throws IOException; + void invalidate(K key); V compute(K key, TieredCacheLoader loader) throws Exception; diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 4a34fea847981..3fe9873e1ab80 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -31,6 +31,7 @@ import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -40,6 +41,7 @@ import java.util.Collections; public class EhcacheDiskCachingTier implements DiskCachingTier, RemovalListener { + // & Writeable.Reader ? private final PersistentCacheManager cacheManager; private final Cache cache; @@ -108,12 +110,16 @@ public EhcacheDiskCachingTier( } @Override - public V get(K key) throws IOException { + public V get(K key) { // I don't think we need to do the future stuff as the cache is threadsafe // if (keystore.contains(key.hashCode()) { long now = System.nanoTime(); - V value = cache.get(new EhcacheKey(key)); + V value = null; + try { + value = cache.get(new EhcacheKey(key)); + } catch (IOException ignored) { // do smth with this later + } double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000; getTimeMillisEWMA.addValue(tookTimeMillis); return value; @@ -122,12 +128,15 @@ public V get(K key) throws IOException { } @Override - public void put(K key, V value) throws IOException { + public void put(K key, V value) { // No need to get old value, this is handled by EhcacheEventListener. // CheckDataResult policyResult = policy.checkData(value) // if (policyResult.isAccepted()) { - cache.put(new EhcacheKey(key), value); + try { + cache.put(new EhcacheKey(key), value); + } catch (IOException ignored) { // do smth with this later + } // keystore.add(key.hashCode()); // else { do something with policyResult.deniedReason()? } // } @@ -139,12 +148,15 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception } @Override - public void invalidate(K key) throws IOException { + public void invalidate(K key) { // keep keystore check to avoid unneeded disk seek // RemovalNotification is handled by EhcacheEventListener // if (keystore.contains(key.hashCode()) { - cache.remove(new EhcacheKey(key)); + try { + cache.remove(new EhcacheKey(key)); + } catch (IOException ignored) { // do smth with this later + } // keystore.remove(key.hashCode()); // } } @@ -207,6 +219,16 @@ public K convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException { is.readBytes(bytes, 0, bytes.length); // we somehow have to use the Reader thing in the Writeable interface // otherwise its not generic + try { + K key = keyType.getDeclaredConstructor(new Class[]{StreamInput.class}).newInstance(); + // This cannot be the proper way to do it + // but if it is, make K extend some interface guaranteeing it has such a constructor (which im sure already exists somewhere in OS) + return key; + } catch (Exception e) { + System.out.println("Was unable to reconstruct EhcacheKey into K"); + e.printStackTrace(); + } + return null; } @Override diff --git a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java index 6c355dba025e1..7ad8718222925 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java @@ -26,7 +26,7 @@ public class EhcacheEventListener implements CacheEventL // Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference) // to send removal notifications private RemovalListener removalListener; - private EhcacheDiskCachingTier tier; + private EhcacheDiskCachingTier tier; EhcacheEventListener(RemovalListener removalListener, EhcacheDiskCachingTier tier) { this.removalListener = removalListener; this.tier = tier; // needed to handle count changes diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6caa83768a5eb..e55ce6ae73bb8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -294,7 +294,7 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - class Key implements Accountable, Writeable { + class Key implements Accountable, Writeable, Writeable.Reader { private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -349,6 +349,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(readerCacheKeyUniqueId); out.writeBytesReference(value); } + + @Override + public Key read(StreamInput in) throws IOException { + return new Key(in); + } } private class CleanupKey implements IndexReader.ClosedListener { diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 87d8495a136d4..6ef23c7bb0790 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -181,10 +181,10 @@ public void testAddDirectToEhcache() throws Exception { cache.closeDiskTier(); } - public void testSimpleEhcache() throws Exception { + /*public void testSimpleEhcache() throws Exception { // for debug only, delete - CounterMetric count = new CounterMetric(); - String cacheAlias = "dummy"; + CounterMetric count = new CounterMetric(); + String cacheAlias = "dummy"; class DummyRemovalListener implements RemovalListener { public DummyRemovalListener() { } @@ -275,10 +275,11 @@ public void onRemoval(RemovalNotification notification) { IndicesRequestCache.Key key = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);*/ - cacheManager.removeCache(cacheAlias); + /*cacheManager.removeCache(cacheAlias); cacheManager.close(); //IOUtils.close(reader, writer, dir); - } + + }*/ public void testSpillover() throws Exception { // fill the on-heap cache until we spill over