Skip to content

Commit

Permalink
Allow setting cache size in different units
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbauman committed Aug 14, 2024
1 parent 6e681bc commit d6963d3
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 27 deletions.
16 changes: 10 additions & 6 deletions docs/reference/ingest/enrich.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,16 @@ enrich policy executor.
The enrich coordinator supports the following node settings:

`enrich.cache_size`::
Maximum number of searches to cache for enriching documents.
By default, instead of looking at the _number_ of searches,
the enrich coordinator looks at the size of the search hits in bytes and
aims to only take up to 1% of the node's max heap space.
There is a single cache for all enrich processors in the cluster. This setting
determines the size of that cache.
Maximum size of the cache that caches searches for enriching documents.
The size can be specified in three units: the raw number of
cached searches (e.g. `1000`), an absolute size in bytes (e.g. `100Mb`),
or a percentage of the max heap space of the node (e.g. `1%`).
Both for the absolute byte size and the percentage of heap space,
{es} does not guarantee that the enrich cache size will adhere exactly to that maximum,
as {es} uses the byte size of the serialized search response
which is is a good representation of the used space on the heap, but not an exact match.
Defaults to `1%`. There is a single cache for all enrich processors in the cluster.
This setting determines the size of that cache.

`enrich.coordinator_proxy.max_concurrent_requests`::
Maximum number of concurrent <<search-multi-search,multi-search requests>> to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1498,11 +1498,7 @@ public static int parseInt(String s, int minValue, int maxValue, String key, boo
return value;
}

public static long parseLong(String s, long minValue, String key) {
return parseLong(s, minValue, key);
}

public static long parseLong(String s, long minValue, String key, boolean isFiltered) {
private static long parseLong(String s, long minValue, String key, boolean isFiltered) {
long value = Long.parseLong(s);
if (value < minValue) {
String err = "Failed to parse value" + (isFiltered ? "" : " [" + s + "]") + " for setting [" + key + "] must be >= " + minValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@ public final class EnrichCache {
}

private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction<CacheKey, CacheValue> weigher) {
return CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxWeight).removalListener(notification -> {
var builder = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxWeight).removalListener(notification -> {
sizeInBytes.getAndAdd(-1 * notification.getValue().sizeInBytes);
}).weigher(weigher).build();
});
if (weigher != null) {
builder.weigher(weigher);
}
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -123,27 +125,28 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
}, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);

public static final Setting<Long> CACHE_SIZE = new Setting<>("enrich.cache_size", (String) null, (String s) -> {
if (s == null) {
return null;
}
return Setting.parseLong(s, 0, "enrich.cache_size");
}, Setting.Property.NodeScope);
public static final String CACHE_SIZE_SETTING_NAME = "enrich.cache.size";
public static final Setting<FlatNumberOrByteSizeValue> CACHE_SIZE = new Setting<>(
"enrich.cache.size",
(String) null,
(String s) -> FlatNumberOrByteSizeValue.parse(
s,
CACHE_SIZE_SETTING_NAME,
new FlatNumberOrByteSizeValue(ByteSizeValue.ofBytes((long) (0.01 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize())))
),
Setting.Property.NodeScope
);

private final Settings settings;
private final EnrichCache enrichCache;

public EnrichPlugin(final Settings settings) {
this.settings = settings;
Long maxSize = CACHE_SIZE.get(settings);
long maxHeapSize = JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
// If we were unable to read the max heap size, we fall back to 1000 entries.
if (maxSize == null && maxHeapSize > 0) {
var maxByteSize = (long) (maxHeapSize * 0.01);
this.enrichCache = new EnrichCache(ByteSizeValue.ofBytes(maxByteSize));
FlatNumberOrByteSizeValue maxSize = CACHE_SIZE.get(settings);
if (maxSize.byteSizeValue() != null) {
this.enrichCache = new EnrichCache(maxSize.byteSizeValue());
} else {
maxSize = maxSize == null ? 1000 : maxSize;
this.enrichCache = new EnrichCache(maxSize);
this.enrichCache = new EnrichCache(maxSize.flatNumber());
}
}

Expand Down Expand Up @@ -281,4 +284,45 @@ public String getFeatureName() {
public String getFeatureDescription() {
return "Manages data related to Enrich policies";
}

/**
* A class that specifies either a flat (unit-less) number or a byte size value.
*/
public static class FlatNumberOrByteSizeValue {

@Nullable
private final Long flatNumber;
@Nullable
private final ByteSizeValue byteSizeValue;

public FlatNumberOrByteSizeValue(ByteSizeValue byteSizeValue) {
this.byteSizeValue = byteSizeValue;
this.flatNumber = null;
}

public FlatNumberOrByteSizeValue(Long flatNumber) {
this.flatNumber = flatNumber;
this.byteSizeValue = null;
}

public static FlatNumberOrByteSizeValue parse(String value, String settingName, FlatNumberOrByteSizeValue defaultValue) {
if (value == null) {
return defaultValue;
}
if (value.endsWith("%") || value.endsWith("b") | value.endsWith("B")) {
return new FlatNumberOrByteSizeValue(MemorySizeValue.parseBytesSizeValueOrHeapRatio(value, settingName));
}
return new FlatNumberOrByteSizeValue(Long.parseLong(value));
}

@Nullable
public ByteSizeValue byteSizeValue() {
return byteSizeValue;
}

@Nullable
public Long flatNumber() {
return flatNumber;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.xpack.enrich.EnrichPlugin.FlatNumberOrByteSizeValue;

public class FlatNumberOrByteSizeValueTests extends ESTestCase {

private static final String SETTING_NAME = "test.setting";

public void testParse() {
assertEquals(new FlatNumberOrByteSizeValue(7L), FlatNumberOrByteSizeValue.parse("7", SETTING_NAME, null));
assertEquals(new FlatNumberOrByteSizeValue(ByteSizeValue.ofGb(2)), FlatNumberOrByteSizeValue.parse("2GB", SETTING_NAME, null));
assertEquals(
new FlatNumberOrByteSizeValue(ByteSizeValue.ofBytes((long) (0.05 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize()))),
FlatNumberOrByteSizeValue.parse("5%", SETTING_NAME, null)
);
assertEquals(
new FlatNumberOrByteSizeValue(3L),
FlatNumberOrByteSizeValue.parse(null, SETTING_NAME, new FlatNumberOrByteSizeValue(3L))
);
assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5GB%", SETTING_NAME, null));
assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5%GB", SETTING_NAME, null));
assertThrows(NumberFormatException.class, () -> FlatNumberOrByteSizeValue.parse("5GBX", SETTING_NAME, null));
}

private void assertEquals(FlatNumberOrByteSizeValue expected, FlatNumberOrByteSizeValue actual) {
assertEquals(expected.byteSizeValue(), actual.byteSizeValue());
assertEquals(expected.flatNumber(), actual.flatNumber());
}
}

0 comments on commit d6963d3

Please sign in to comment.