Make enrich cache based on memory usage #111412

Merged (10 commits) on Aug 23, 2024
6 changes: 6 additions & 0 deletions docs/changelog/111412.yaml
@@ -0,0 +1,6 @@
pr: 111412
summary: Make enrich cache based on memory usage
area: Ingest Node
type: enhancement
issues:
- 106081
13 changes: 10 additions & 3 deletions docs/reference/ingest/enrich.asciidoc
@@ -249,9 +249,16 @@ enrich policy executor.
The enrich coordinator supports the following node settings:

`enrich.cache_size`::
Maximum number of searches to cache for enriching documents. Defaults to `1000`.
There is a single cache for all enrich processors in the cluster. This setting
determines the size of that cache.
Maximum size of the cache that caches searches for enriching documents.
The size can be specified in three units: the raw number of
cached searches (e.g. `1000`), an absolute size in bytes (e.g. `100Mb`),
Contributor Author:

Should we explicitly state here that you need to include a `b` (upper or lowercase) for the second unit?

Member:
I think we should allow all the same things that ByteSizeValue supports, which would be all the endings in

if (lowerSValue.endsWith("k")) {
return parse(sValue, lowerSValue, "k", ByteSizeUnit.KB, settingName);
} else if (lowerSValue.endsWith("kb")) {
return parse(sValue, lowerSValue, "kb", ByteSizeUnit.KB, settingName);
} else if (lowerSValue.endsWith("m")) {
return parse(sValue, lowerSValue, "m", ByteSizeUnit.MB, settingName);
} else if (lowerSValue.endsWith("mb")) {
return parse(sValue, lowerSValue, "mb", ByteSizeUnit.MB, settingName);
} else if (lowerSValue.endsWith("g")) {
return parse(sValue, lowerSValue, "g", ByteSizeUnit.GB, settingName);
} else if (lowerSValue.endsWith("gb")) {
return parse(sValue, lowerSValue, "gb", ByteSizeUnit.GB, settingName);
} else if (lowerSValue.endsWith("t")) {
return parse(sValue, lowerSValue, "t", ByteSizeUnit.TB, settingName);
} else if (lowerSValue.endsWith("tb")) {
return parse(sValue, lowerSValue, "tb", ByteSizeUnit.TB, settingName);
} else if (lowerSValue.endsWith("p")) {
return parse(sValue, lowerSValue, "p", ByteSizeUnit.PB, settingName);
} else if (lowerSValue.endsWith("pb")) {
return parse(sValue, lowerSValue, "pb", ByteSizeUnit.PB, settingName);
} else if (lowerSValue.endsWith("b")) {

Contributor Author:

I've updated the logic to handle all of ByteSizeValue's cases. But I was referring more to the fact that if you want to specify bytes, you'll have to add one of those letters -- since a raw number of bytes would be parsed as a flat document count. But that disclaimer might be redundant/overkill.

or a percentage of the max heap space of the node (e.g. `1%`).
For both the absolute byte size and the percentage of heap space,
{es} does not guarantee that the enrich cache size will adhere exactly to that maximum,
as {es} uses the byte size of the serialized search response,
which is a good representation of the used space on the heap but not an exact match.
Defaults to `1%`. There is a single cache for all enrich processors in the cluster.
This setting determines the size of that cache.

`enrich.coordinator_proxy.max_concurrent_requests`::
Maximum number of concurrent <<search-multi-search,multi-search requests>> to
@@ -14,6 +14,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -29,6 +30,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.function.ToLongBiFunction;

/**
* A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
@@ -61,12 +63,29 @@ public final class EnrichCache {
this(maxSize, System::nanoTime);
}

EnrichCache(ByteSizeValue maxByteSize) {
this(maxByteSize, System::nanoTime);
}

// non-private for unit testing only
EnrichCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
this.relativeNanoTimeProvider = relativeNanoTimeProvider;
this.cache = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxSize).removalListener(notification -> {
this.cache = createCache(maxSize, null);
}

EnrichCache(ByteSizeValue maxByteSize, LongSupplier relativeNanoTimeProvider) {
this.relativeNanoTimeProvider = relativeNanoTimeProvider;
this.cache = createCache(maxByteSize.getBytes(), (key, value) -> value.sizeInBytes);
}
Contributor Author:
I didn't add any explicit tests for the default case (i.e. where we configure a number of bytes instead of a flat document count) because I think that's pretty much covered by all test clusters that don't explicitly configure an enrich cache size. Let me know if people have any other thoughts.
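
As a companion to the weigher-based `createCache` change below, here is a minimal standalone sketch (editor's illustration, not part of the PR) of how a maximum weight plus a per-entry weigher turns the cache into a memory-bounded one; it uses the same `CacheBuilder` API the change relies on, and the class, record, and budget value are assumptions for illustration:

import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;

// Hedged sketch: entries are weighed by their pre-computed serialized size in bytes,
// so the cache evicts once the summed sizes exceed the configured budget, rather
// than after a fixed number of entries.
public class WeightedCacheSketch {
    record Entry(String key, long sizeInBytes) {}

    public static void main(String[] args) {
        long maxBytes = 1024; // illustrative 1 KiB budget
        Cache<String, Entry> cache = CacheBuilder.<String, Entry>builder()
            .setMaximumWeight(maxBytes)
            .weigher((key, value) -> value.sizeInBytes())
            .build();

        cache.put("a", new Entry("a", 600));
        cache.put("b", new Entry("b", 600)); // total weight 1200 > 1024, so an entry is evicted
        System.out.println("cached entries: " + cache.count());
    }
}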


private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction<CacheKey, CacheValue> weigher) {
var builder = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxWeight).removalListener(notification -> {
sizeInBytes.getAndAdd(-1 * notification.getValue().sizeInBytes);
}).build();
});
if (weigher != null) {
builder.weigher(weigher);
}
return builder.build();
}

/**
@@ -18,11 +18,15 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -121,14 +125,29 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
}, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);

public static final Setting<Long> CACHE_SIZE = Setting.longSetting("enrich.cache_size", 1000, 0, Setting.Property.NodeScope);
public static final String CACHE_SIZE_SETTING_NAME = "enrich.cache.size";
public static final Setting<FlatNumberOrByteSizeValue> CACHE_SIZE = new Setting<>(
"enrich.cache.size",
(String) null,
(String s) -> FlatNumberOrByteSizeValue.parse(
s,
CACHE_SIZE_SETTING_NAME,
new FlatNumberOrByteSizeValue(ByteSizeValue.ofBytes((long) (0.01 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize())))
),
Setting.Property.NodeScope
);

private final Settings settings;
private final EnrichCache enrichCache;

public EnrichPlugin(final Settings settings) {
this.settings = settings;
this.enrichCache = new EnrichCache(CACHE_SIZE.get(settings));
FlatNumberOrByteSizeValue maxSize = CACHE_SIZE.get(settings);
if (maxSize.byteSizeValue() != null) {
this.enrichCache = new EnrichCache(maxSize.byteSizeValue());
} else {
this.enrichCache = new EnrichCache(maxSize.flatNumber());
}
}

@Override
@@ -265,4 +284,45 @@ public String getFeatureName() {
public String getFeatureDescription() {
return "Manages data related to Enrich policies";
}

/**
* A class that specifies either a flat (unit-less) number or a byte size value.
*/
public static class FlatNumberOrByteSizeValue {
Contributor Author:

Any suggestions for this name are welcome 😅 (also its location, if people think it should be a separate class and/or in a different package).

Member:
I think it's okay here. I can't think of a better name either


@Nullable
private final Long flatNumber;
@Nullable
private final ByteSizeValue byteSizeValue;

public FlatNumberOrByteSizeValue(ByteSizeValue byteSizeValue) {
this.byteSizeValue = byteSizeValue;
this.flatNumber = null;
}

public FlatNumberOrByteSizeValue(Long flatNumber) {
this.flatNumber = flatNumber;
this.byteSizeValue = null;
}

public static FlatNumberOrByteSizeValue parse(String value, String settingName, FlatNumberOrByteSizeValue defaultValue) {
if (value == null) {
return defaultValue;
}
if (value.endsWith("%") || value.endsWith("b") | value.endsWith("B")) {
return new FlatNumberOrByteSizeValue(MemorySizeValue.parseBytesSizeValueOrHeapRatio(value, settingName));
}
return new FlatNumberOrByteSizeValue(Long.parseLong(value));
}

@Nullable
public ByteSizeValue byteSizeValue() {
return byteSizeValue;
}

@Nullable
public Long flatNumber() {
return flatNumber;
}
}
}
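
Here is a minimal sketch (editor's illustration, not part of the PR) of how the three accepted forms of the new cache-size setting resolve through `FlatNumberOrByteSizeValue.parse`; the class name `CacheSizeSettingSketch` and the `main` harness are assumptions for illustration only:

import org.elasticsearch.xpack.enrich.EnrichPlugin.FlatNumberOrByteSizeValue;

// Hedged sketch of the three value forms accepted by enrich.cache.size.
public class CacheSizeSettingSketch {
    public static void main(String[] args) {
        // Flat (unit-less) number: interpreted as a maximum number of cached searches.
        FlatNumberOrByteSizeValue flat = FlatNumberOrByteSizeValue.parse("1000", "enrich.cache.size", null);
        assert flat.flatNumber() == 1000L && flat.byteSizeValue() == null;

        // Absolute byte size: any ByteSizeValue suffix selects the memory-based cache.
        FlatNumberOrByteSizeValue bytes = FlatNumberOrByteSizeValue.parse("100mb", "enrich.cache.size", null);
        assert bytes.byteSizeValue() != null && bytes.flatNumber() == null;

        // Heap percentage: resolved against the node's configured maximum heap at parse time.
        FlatNumberOrByteSizeValue heapShare = FlatNumberOrByteSizeValue.parse("1%", "enrich.cache.size", null);
        assert heapShare.byteSizeValue() != null;
    }
}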
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.test.ESTestCase;

import static org.elasticsearch.xpack.enrich.EnrichPlugin.FlatNumberOrByteSizeValue;

public class FlatNumberOrByteSizeValueTests extends ESTestCase {

private static final String SETTING_NAME = "test.setting";

public void testParse() {
Contributor Author:

I currently only have tests for parsing. Should we also have tests that actually try to verify the behavior of using a byte size value in the cache instead of a flat document count?

Member:
I think it'd be good to add those; it's fine to just expose that in EnrichCache with a getMaxSize() method and test that configuring it to an absolute byte size value sets it appropriately.

assertEquals(new FlatNumberOrByteSizeValue(7L), FlatNumberOrByteSizeValue.parse("7", SETTING_NAME, null));
assertEquals(new FlatNumberOrByteSizeValue(ByteSizeValue.ofGb(2)), FlatNumberOrByteSizeValue.parse("2GB", SETTING_NAME, null));
assertEquals(
new FlatNumberOrByteSizeValue(ByteSizeValue.ofBytes((long) (0.05 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize()))),
FlatNumberOrByteSizeValue.parse("5%", SETTING_NAME, null)
);
assertEquals(
new FlatNumberOrByteSizeValue(3L),
FlatNumberOrByteSizeValue.parse(null, SETTING_NAME, new FlatNumberOrByteSizeValue(3L))
);
assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5GB%", SETTING_NAME, null));
assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5%GB", SETTING_NAME, null));
assertThrows(NumberFormatException.class, () -> FlatNumberOrByteSizeValue.parse("5GBX", SETTING_NAME, null));
}

private void assertEquals(FlatNumberOrByteSizeValue expected, FlatNumberOrByteSizeValue actual) {
assertEquals(expected.byteSizeValue(), actual.byteSizeValue());
assertEquals(expected.flatNumber(), actual.flatNumber());
}
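
Following the Member's suggestion above, a minimal sketch (editor's illustration, not part of the PR) of what such a cache-size test could look like; it assumes a hypothetical `getMaxSize()` accessor on `EnrichCache`, which the PR would need to expose, and the test class name is illustrative:

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ESTestCase;

// Hedged sketch: exercises the two EnrichCache constructors and asserts the
// configured maximum via a hypothetical getMaxSize() accessor (not in this diff).
public class EnrichCacheMaxSizeSketchTests extends ESTestCase {

    public void testAbsoluteByteSizeSetsMaximumWeight() {
        EnrichCache cache = new EnrichCache(ByteSizeValue.ofMb(100));
        assertEquals(ByteSizeValue.ofMb(100).getBytes(), cache.getMaxSize()); // hypothetical accessor
    }

    public void testFlatNumberSetsMaximumEntryCount() {
        EnrichCache cache = new EnrichCache(1000L);
        assertEquals(1000L, cache.getMaxSize()); // hypothetical accessor
    }
}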
}