Make enrich cache based on memory usage #111412

Merged · 10 commits · Aug 23, 2024
docs/reference/ingest/enrich.asciidoc (5 changes: 4 additions & 1 deletion)

@@ -249,7 +249,10 @@ enrich policy executor.
 The enrich coordinator supports the following node settings:
 
 `enrich.cache_size`::
-Maximum number of searches to cache for enriching documents. Defaults to `1000`.
+Maximum number of searches to cache for enriching documents.
+By default, instead of looking at the _number_ of searches,
+the enrich coordinator looks at the size of the search hits in bytes and
+aims to only take up to 1% of the node's max heap space.
 There is a single cache for all enrich processors in the cluster. This setting
 determines the size of that cache.

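To make the documented default concrete, here is the arithmetic as a minimal sketch; the 1% figure comes from the EnrichPlugin change below, and the 8 GiB heap is just an assumed example:

// Illustration only: an assumed -Xmx8g node and the 1% budget introduced by this PR.
long maxHeapBytes = 8L << 30;                         // 8 GiB = 8,589,934,592 bytes
long cacheBudgetBytes = (long) (maxHeapBytes * 0.01); // ~86 MB of cached search hits

Setting `enrich.cache_size` explicitly switches the cache back to a flat entry count instead of a byte budget.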
server/src/main/java/org/elasticsearch/common/settings/Setting.java

@@ -1498,7 +1498,11 @@ public static int parseInt(String s, int minValue, int maxValue, String key, boo
         return value;
     }
 
-    static long parseLong(String s, long minValue, String key, boolean isFiltered) {
+    public static long parseLong(String s, long minValue, String key) {
+        return parseLong(s, minValue, key, false);
+    }
+
+    public static long parseLong(String s, long minValue, String key, boolean isFiltered) {
         long value = Long.parseLong(s);
         if (value < minValue) {
             String err = "Failed to parse value" + (isFiltered ? "" : " [" + s + "]") + " for setting [" + key + "] must be >= " + minValue;
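A hedged usage sketch of the new three-argument overload (the setting name is just the one this PR uses; the exception type is my assumption of the usual behavior for ES setting parse failures, since the throw itself is outside the hunk):

// The new overload delegates with isFiltered=false, so the rejected
// value appears in the error message.
long ok = Setting.parseLong("2048", 0, "enrich.cache_size");  // 2048
try {
    Setting.parseLong("-5", 0, "enrich.cache_size");          // below minValue
} catch (IllegalArgumentException e) {
    // "Failed to parse value [-5] for setting [enrich.cache_size] must be >= 0"
}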
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

@@ -14,6 +14,7 @@
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -29,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.ToLongBiFunction;

 /**
  * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and

@@ -61,12 +63,25 @@ public final class EnrichCache {
         this(maxSize, System::nanoTime);
     }
 
+    EnrichCache(ByteSizeValue maxByteSize) {
+        this(maxByteSize, System::nanoTime);
+    }
+
     // non-private for unit testing only
     EnrichCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
         this.relativeNanoTimeProvider = relativeNanoTimeProvider;
-        this.cache = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxSize).removalListener(notification -> {
+        this.cache = createCache(maxSize, null);
+    }
+
+    EnrichCache(ByteSizeValue maxByteSize, LongSupplier relativeNanoTimeProvider) {
+        this.relativeNanoTimeProvider = relativeNanoTimeProvider;
+        this.cache = createCache(maxByteSize.getBytes(), (key, value) -> value.sizeInBytes);
+    }
nielsbauman (Contributor, Author) commented:

I didn't add any explicit tests for the default case (i.e. where we configure a number of bytes instead of a flat document count) because I think that's pretty much covered by all test clusters that don't explicitly configure an enrich cache size. Let me know if people have any other thoughts.

+    private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction<CacheKey, CacheValue> weigher) {
+        return CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxWeight).removalListener(notification -> {
             sizeInBytes.getAndAdd(-1 * notification.getValue().sizeInBytes);
-        }).build();
+        }).weigher(weigher).build();
     }

 /**
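The key change above is that the cache's maximum weight is now interpreted in bytes when a weigher is supplied, rather than as an entry count. A self-contained sketch of that idea follows; this is not Elasticsearch's Cache class, and the names and structure are assumptions for illustration only:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: a cache bounded by total entry weight (bytes), evicting in LRU order.
final class WeightedLruCache<K> {
    private final long maxWeightBytes;
    private long currentWeightBytes = 0;
    // accessOrder=true makes iteration order least-recently-used first.
    private final LinkedHashMap<K, byte[]> map = new LinkedHashMap<>(16, 0.75f, true);

    WeightedLruCache(long maxWeightBytes) {
        this.maxWeightBytes = maxWeightBytes;
    }

    synchronized void put(K key, byte[] value) {
        byte[] previous = map.put(key, value);
        if (previous != null) {
            currentWeightBytes -= previous.length;
        }
        currentWeightBytes += value.length; // the "weigher": charge each entry its size
        Iterator<Map.Entry<K, byte[]>> lru = map.entrySet().iterator();
        while (currentWeightBytes > maxWeightBytes && lru.hasNext()) {
            Map.Entry<K, byte[]> eldest = lru.next();
            if (eldest.getKey().equals(key)) {
                continue; // never evict the entry we just inserted
            }
            currentWeightBytes -= eldest.getValue().length;
            lru.remove();
        }
    }

    synchronized byte[] get(K key) {
        return map.get(key);
    }
}

EnrichCache passes (key, value) -> value.sizeInBytes as the weigher, so the same cache machinery can enforce either a flat entry count (effectively weight 1 per entry when no weigher is set) or a byte budget.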
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

@@ -18,11 +18,13 @@
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -121,14 +123,28 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
         return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
     }, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);
 
-    public static final Setting<Long> CACHE_SIZE = Setting.longSetting("enrich.cache_size", 1000, 0, Setting.Property.NodeScope);
+    public static final Setting<Long> CACHE_SIZE = new Setting<>("enrich.cache_size", (String) null, (String s) -> {
+        if (s == null) {
+            return null;
+        }
+        return Setting.parseLong(s, 0, "enrich.cache_size");
+    }, Setting.Property.NodeScope);
 
     private final Settings settings;
     private final EnrichCache enrichCache;
 
     public EnrichPlugin(final Settings settings) {
         this.settings = settings;
-        this.enrichCache = new EnrichCache(CACHE_SIZE.get(settings));
+        Long maxSize = CACHE_SIZE.get(settings);
+        long maxHeapSize = JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
+        // If we were unable to read the max heap size, we fall back to 1000 entries.
+        if (maxSize == null && maxHeapSize > 0) {
+            var maxByteSize = (long) (maxHeapSize * 0.01);
+            this.enrichCache = new EnrichCache(ByteSizeValue.ofBytes(maxByteSize));
+        } else {
+            maxSize = maxSize == null ? 1000 : maxSize;
+            this.enrichCache = new EnrichCache(maxSize);
+        }
     }

 @Override
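One consequence of making CACHE_SIZE nullable, shown as a hypothetical check (Settings.EMPTY is the standard empty Settings instance; reading the setting from outside the plugin is for illustration only):

// With no value configured, the setting now resolves to null instead of 1000,
// which is what lets the constructor above choose the heap-based default.
Long explicit = EnrichPlugin.CACHE_SIZE.get(
    Settings.builder().put("enrich.cache_size", "2000").build()); // 2000
Long unset = EnrichPlugin.CACHE_SIZE.get(Settings.EMPTY);         // null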
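And the constructor's branching, rewritten as a standalone function to make the fallback explicit; this helper is an assumption for readability, not part of the PR:

// Mirror of the EnrichPlugin constructor logic above (hypothetical helper).
static EnrichCache buildEnrichCache(Long configuredSize, long maxHeapBytes) {
    if (configuredSize == null && maxHeapBytes > 0) {
        // Default: budget 1% of the node's configured max heap, in bytes.
        return new EnrichCache(ByteSizeValue.ofBytes((long) (maxHeapBytes * 0.01)));
    }
    // Explicit setting, or heap size unavailable: flat entry count (1000 by default).
    return new EnrichCache(configuredSize == null ? 1000L : configuredSize);
}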