Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scripting: Unlimited compilation rate for ingest #59267

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.script;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Map;
Expand All @@ -33,7 +32,7 @@ public abstract class IngestConditionalScript {

/** The context used to compile {@link IngestConditionalScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("processor_conditional", Factory.class,
200, TimeValue.timeValueMillis(0), new Tuple<>(375, TimeValue.timeValueMinutes(5)));
200, TimeValue.timeValueMillis(0), ScriptCache.UNLIMITED_COMPILATION_RATE.asTuple());

/** The generic runtime parameters for the script. */
private final Map<String, Object> params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

package org.elasticsearch.script;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Map;
Expand All @@ -34,7 +33,7 @@ public abstract class IngestScript {

/** The context used to compile {@link IngestScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("ingest", Factory.class,
200, TimeValue.timeValueMillis(0), new Tuple<>(375, TimeValue.timeValueMinutes(5)));
200, TimeValue.timeValueMillis(0), ScriptCache.UNLIMITED_COMPILATION_RATE.asTuple());

/** The generic runtime parameters for the script. */
private final Map<String, Object> params;
Expand Down
94 changes: 83 additions & 11 deletions server/src/main/java/org/elasticsearch/script/ScriptCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ScriptCache {

private static final Logger logger = LogManager.getLogger(ScriptService.class);

static final Tuple<Integer, TimeValue> UNLIMITED_COMPILATION_RATE = new Tuple<>(0, TimeValue.ZERO);
static final CompilationRate UNLIMITED_COMPILATION_RATE = new CompilationRate(0, TimeValue.ZERO);

private final Cache<CacheKey, Object> cache;
private final ScriptMetrics scriptMetrics;
Expand All @@ -51,15 +51,15 @@ public class ScriptCache {
// Cache settings or derived from settings
final int cacheSize;
final TimeValue cacheExpire;
final Tuple<Integer, TimeValue> rate;
final CompilationRate rate;
private final double compilesAllowedPerNano;
private final String contextRateSetting;

ScriptCache(
int cacheMaxSize,
TimeValue cacheExpire,
Tuple<Integer, TimeValue> maxCompilationRate,
String contextRateSetting
int cacheMaxSize,
stu-elastic marked this conversation as resolved.
Show resolved Hide resolved
TimeValue cacheExpire,
CompilationRate maxCompilationRate,
String contextRateSetting
) {
this.cacheSize = cacheMaxSize;
this.cacheExpire = cacheExpire;
Expand All @@ -78,9 +78,9 @@ public class ScriptCache {
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();

this.rate = maxCompilationRate;
this.compilesAllowedPerNano = ((double) rate.v1()) / rate.v2().nanos();
this.compilesAllowedPerNano = ((double) rate.count) / rate.time.nanos();
this.scriptMetrics = new ScriptMetrics();
this.tokenBucketState = new AtomicReference<TokenBucketState>(new TokenBucketState(this.rate.v1()));
this.tokenBucketState = new AtomicReference<TokenBucketState>(new TokenBucketState(this.rate.count));
}

<FactoryType> FactoryType compile(
Expand Down Expand Up @@ -156,8 +156,8 @@ void checkCompilationLimit() {
double scriptsPerTimeWindow = current.availableTokens + (timePassed) * compilesAllowedPerNano;

// It's been over the time limit anyway, readjust the bucket to be level
if (scriptsPerTimeWindow > rate.v1()) {
scriptsPerTimeWindow = rate.v1();
if (scriptsPerTimeWindow > rate.count) {
scriptsPerTimeWindow = rate.count;
}

// If there is enough tokens in the bucket, allow the request and decrease the tokens by 1
Expand All @@ -173,7 +173,7 @@ void checkCompilationLimit() {
scriptMetrics.onCompilationLimit();
// Otherwise reject the request
throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" +
rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " +
rate + "]; please use indexed, or scripts with parameters instead; " +
"this limit can be changed by the [" + contextRateSetting + "] setting",
CircuitBreaker.Durability.TRANSIENT);
}
Expand Down Expand Up @@ -243,4 +243,76 @@ private TokenBucketState(long lastInlineCompileTime, double availableTokens, boo
this.tokenSuccessfullyTaken = tokenSuccessfullyTaken;
}
}

public static class CompilationRate {
public final int count;
public final TimeValue time;
private final String source;

public CompilationRate(Integer count, TimeValue time) {
this.count = count;
this.time = time;
this.source = null;
}

public CompilationRate(Tuple<Integer,TimeValue> rate) {
this(rate.v1(), rate.v2());
}

/**
* Parses a string as a non-negative int count and a {@code TimeValue} as arguments split by a slash
*/
public CompilationRate(String value) {
if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) {
throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" +
value + "]");
}
int idx = value.indexOf("/");
String count = value.substring(0, idx);
String time = value.substring(idx + 1);
try {
int rate = Integer.parseInt(count);
if (rate < 0) {
throw new IllegalArgumentException("rate [" + rate + "] must be positive");
}
TimeValue timeValue = TimeValue.parseTimeValue(time, "script.max_compilations_rate");
if (timeValue.nanos() <= 0) {
throw new IllegalArgumentException("time value [" + time + "] must be positive");
}
// protect against a too hard to check limit, like less than a minute
if (timeValue.seconds() < 60) {
throw new IllegalArgumentException("time value [" + time + "] must be at least on a one minute resolution");
}
this.count = rate;
this.time = timeValue;
this.source = value;
} catch (NumberFormatException e) {
// the number format exception message is so confusing, that it makes more sense to wrap it with a useful one
throw new IllegalArgumentException("could not parse [" + count + "] as integer in value [" + value + "]", e);
}
}

public Tuple<Integer,TimeValue> asTuple() {
return new Tuple<>(this.count, this.time);
}

@Override
public String toString() {
return source != null ? source : count + "/" + time.toHumanReadableString(0);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CompilationRate that = (CompilationRate) o;
return count == that.count &&
Objects.equals(time, that.time);
}

@Override
public int hashCode() {
return Objects.hash(count, time);
}
}
}
56 changes: 12 additions & 44 deletions server/src/main/java/org/elasticsearch/script/ScriptService.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -66,50 +65,18 @@ public class ScriptService implements Closeable, ClusterStateApplier {

// Special setting value for SCRIPT_GENERAL_MAX_COMPILATIONS_RATE to indicate the script service should use context
// specific caches
static final Tuple<Integer, TimeValue> USE_CONTEXT_RATE_VALUE = new Tuple<>(-1, TimeValue.MINUS_ONE);
static final ScriptCache.CompilationRate USE_CONTEXT_RATE_VALUE = new ScriptCache.CompilationRate(-1, TimeValue.MINUS_ONE);
static final String USE_CONTEXT_RATE_KEY = "use-context";

// a parsing function that requires a non negative int and a timevalue as arguments split by a slash
// this allows you to easily define rates
static final Function<String, Tuple<Integer, TimeValue>> MAX_COMPILATION_RATE_FUNCTION =
(String value) -> {
if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) {
throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" +
value + "]");
}
int idx = value.indexOf("/");
String count = value.substring(0, idx);
String time = value.substring(idx + 1);
try {

int rate = Integer.parseInt(count);
if (rate < 0) {
throw new IllegalArgumentException("rate [" + rate + "] must be positive");
}
TimeValue timeValue = TimeValue.parseTimeValue(time, "script.max_compilations_rate");
if (timeValue.nanos() <= 0) {
throw new IllegalArgumentException("time value [" + time + "] must be positive");
}
// protect against a too hard to check limit, like less than a minute
if (timeValue.seconds() < 60) {
throw new IllegalArgumentException("time value [" + time + "] must be at least on a one minute resolution");
}
return Tuple.tuple(rate, timeValue);
} catch (NumberFormatException e) {
// the number format exception message is so confusing, that it makes more sense to wrap it with a useful one
throw new IllegalArgumentException("could not parse [" + count + "] as integer in value [" + value + "]", e);
}
};

public static final Setting<Integer> SCRIPT_GENERAL_CACHE_SIZE_SETTING =
Setting.intSetting("script.cache.max_size", 100, 0, Property.NodeScope, Property.Deprecated);
public static final Setting<TimeValue> SCRIPT_GENERAL_CACHE_EXPIRE_SETTING =
Setting.positiveTimeSetting("script.cache.expire", TimeValue.timeValueMillis(0), Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> SCRIPT_MAX_SIZE_IN_BYTES =
Setting.intSetting("script.max_size_in_bytes", 65535, 0, Property.Dynamic, Property.NodeScope);
public static final Setting<Tuple<Integer, TimeValue>> SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING =
public static final Setting<ScriptCache.CompilationRate> SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING =
new Setting<>("script.max_compilations_rate", USE_CONTEXT_RATE_KEY,
(String value) -> value.equals(USE_CONTEXT_RATE_KEY) ? USE_CONTEXT_RATE_VALUE: MAX_COMPILATION_RATE_FUNCTION.apply(value),
(String value) -> value.equals(USE_CONTEXT_RATE_KEY) ? USE_CONTEXT_RATE_VALUE: new ScriptCache.CompilationRate(value),
Property.Dynamic, Property.NodeScope, Property.Deprecated);

// Per-context settings
Expand All @@ -128,15 +95,15 @@ public class ScriptService implements Closeable, ClusterStateApplier {
// Unlimited compilation rate for context-specific script caches
static final String UNLIMITED_COMPILATION_RATE_KEY = "unlimited";

public static final Setting.AffixSetting<Tuple<Integer, TimeValue>> SCRIPT_MAX_COMPILATIONS_RATE_SETTING =
public static final Setting.AffixSetting<ScriptCache.CompilationRate> SCRIPT_MAX_COMPILATIONS_RATE_SETTING =
Setting.affixKeySetting(CONTEXT_PREFIX,
"max_compilations_rate",
key -> new Setting<>(key, "75/5m",
key -> new Setting<ScriptCache.CompilationRate>(key, "75/5m",
(String value) -> value.equals(UNLIMITED_COMPILATION_RATE_KEY) ? ScriptCache.UNLIMITED_COMPILATION_RATE:
MAX_COMPILATION_RATE_FUNCTION.apply(value),
new ScriptCache.CompilationRate(value),
Property.NodeScope, Property.Dynamic));

private static final Tuple<Integer, TimeValue> SCRIPT_COMPILATION_RATE_ZERO = new Tuple<>(0, TimeValue.ZERO);
private static final ScriptCache.CompilationRate SCRIPT_COMPILATION_RATE_ZERO = new ScriptCache.CompilationRate(0, TimeValue.ZERO);

public static final Setting<Boolean> SCRIPT_DISABLE_MAX_COMPILATIONS_RATE_SETTING =
Setting.boolSetting("script.disable_max_compilations_rate", false, Property.NodeScope);
Expand Down Expand Up @@ -632,14 +599,15 @@ ScriptCache contextCache(Settings settings, ScriptContext<?> context) {
TimeValue cacheExpire = cacheExpireSetting.existsOrFallbackExists(settings) ?
cacheExpireSetting.get(settings) : context.cacheExpireDefault;

Setting<Tuple<Integer, TimeValue>> rateSetting = SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context.name);
Tuple<Integer, TimeValue> rate = null;
Setting<ScriptCache.CompilationRate> rateSetting =
SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context.name);
ScriptCache.CompilationRate rate = null;
if (SCRIPT_DISABLE_MAX_COMPILATIONS_RATE_SETTING.get(settings) || compilationLimitsEnabled() == false) {
rate = SCRIPT_COMPILATION_RATE_ZERO;
} else if (rateSetting.existsOrFallbackExists(settings)) {
rate = rateSetting.get(settings);
} else {
rate = context.maxCompilationRateDefault;
rate = new ScriptCache.CompilationRate(context.maxCompilationRateDefault);
}

return new ScriptCache(cacheSize, cacheExpire, rate, rateSetting.getKey());
Expand All @@ -654,7 +622,7 @@ static class CacheHolder {
final ScriptCache general;
final Map<String, AtomicReference<ScriptCache>> contextCache;

CacheHolder(int cacheMaxSize, TimeValue cacheExpire, Tuple<Integer, TimeValue> maxCompilationRate, String contextRateSetting) {
CacheHolder(int cacheMaxSize, TimeValue cacheExpire, ScriptCache.CompilationRate maxCompilationRate, String contextRateSetting) {
contextCache = null;
general = new ScriptCache(cacheMaxSize, cacheExpire, maxCompilationRate, contextRateSetting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.script;

import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -30,24 +29,24 @@ public class ScriptCacheTests extends ESTestCase {
public void testCompilationCircuitBreaking() throws Exception {
final TimeValue expire = ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING.get(Settings.EMPTY);
final Integer size = ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING.get(Settings.EMPTY);
Tuple<Integer, TimeValue> rate = ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING.get(Settings.EMPTY);
String settingName = ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING.getKey();
ScriptCache cache = new ScriptCache(size, expire, Tuple.tuple(1, TimeValue.timeValueMinutes(1)), settingName);
ScriptCache cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(1, TimeValue.timeValueMinutes(1)), settingName);
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(2, TimeValue.timeValueMinutes(1))), settingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(2, TimeValue.timeValueMinutes(1)), settingName);
cache.checkCompilationLimit(); // should pass
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
int count = randomIntBetween(5, 50);
cache = new ScriptCache(size, expire, (Tuple.tuple(count, TimeValue.timeValueMinutes(1))), settingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(count, TimeValue.timeValueMinutes(1)), settingName);
for (int i = 0; i < count; i++) {
cache.checkCompilationLimit(); // should pass
}
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(0, TimeValue.timeValueMinutes(1))), settingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(0, TimeValue.timeValueMinutes(1)), settingName);
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1))), settingName);
cache = new ScriptCache(size, expire,
new ScriptCache.CompilationRate(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1)), settingName);
int largeLimit = randomIntBetween(1000, 10000);
for (int i = 0; i < largeLimit; i++) {
cache.checkCompilationLimit();
Expand Down
Loading