Skip to content

Commit

Permalink
Scripting: Unlimited compilation rate for ingest (#59267)
Browse files Browse the repository at this point in the history
* `ingest` and `processor_conditional` default to unlimited compilation rate

Fixes: #50152
  • Loading branch information
stu-elastic authored Jul 9, 2020
1 parent 83e78ac commit a8aecb8
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.script;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Map;
Expand All @@ -33,7 +32,7 @@ public abstract class IngestConditionalScript {

/** The context used to compile {@link IngestConditionalScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("processor_conditional", Factory.class,
200, TimeValue.timeValueMillis(0), new Tuple<>(375, TimeValue.timeValueMinutes(5)));
200, TimeValue.timeValueMillis(0), ScriptCache.UNLIMITED_COMPILATION_RATE.asTuple());

/** The generic runtime parameters for the script. */
private final Map<String, Object> params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

package org.elasticsearch.script;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Map;
Expand All @@ -34,7 +33,7 @@ public abstract class IngestScript {

/** The context used to compile {@link IngestScript} factories. */
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("ingest", Factory.class,
200, TimeValue.timeValueMillis(0), new Tuple<>(375, TimeValue.timeValueMinutes(5)));
200, TimeValue.timeValueMillis(0), ScriptCache.UNLIMITED_COMPILATION_RATE.asTuple());

/** The generic runtime parameters for the script. */
private final Map<String, Object> params;
Expand Down
88 changes: 80 additions & 8 deletions server/src/main/java/org/elasticsearch/script/ScriptCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ScriptCache {

private static final Logger logger = LogManager.getLogger(ScriptService.class);

static final Tuple<Integer, TimeValue> UNLIMITED_COMPILATION_RATE = new Tuple<>(0, TimeValue.ZERO);
static final CompilationRate UNLIMITED_COMPILATION_RATE = new CompilationRate(0, TimeValue.ZERO);

private final Cache<CacheKey, Object> cache;
private final ScriptMetrics scriptMetrics;
Expand All @@ -51,14 +51,14 @@ public class ScriptCache {
// Cache settings or derived from settings
final int cacheSize;
final TimeValue cacheExpire;
final Tuple<Integer, TimeValue> rate;
final CompilationRate rate;
private final double compilesAllowedPerNano;
private final String contextRateSetting;

ScriptCache(
int cacheMaxSize,
TimeValue cacheExpire,
Tuple<Integer, TimeValue> maxCompilationRate,
CompilationRate maxCompilationRate,
String contextRateSetting
) {
this.cacheSize = cacheMaxSize;
Expand All @@ -78,9 +78,9 @@ public class ScriptCache {
this.cache = cacheBuilder.removalListener(new ScriptCacheRemovalListener()).build();

this.rate = maxCompilationRate;
this.compilesAllowedPerNano = ((double) rate.v1()) / rate.v2().nanos();
this.compilesAllowedPerNano = ((double) rate.count) / rate.time.nanos();
this.scriptMetrics = new ScriptMetrics();
this.tokenBucketState = new AtomicReference<TokenBucketState>(new TokenBucketState(this.rate.v1()));
this.tokenBucketState = new AtomicReference<TokenBucketState>(new TokenBucketState(this.rate.count));
}

<FactoryType> FactoryType compile(
Expand Down Expand Up @@ -156,8 +156,8 @@ void checkCompilationLimit() {
double scriptsPerTimeWindow = current.availableTokens + (timePassed) * compilesAllowedPerNano;

// It's been over the time limit anyway, readjust the bucket to be level
if (scriptsPerTimeWindow > rate.v1()) {
scriptsPerTimeWindow = rate.v1();
if (scriptsPerTimeWindow > rate.count) {
scriptsPerTimeWindow = rate.count;
}

// If there is enough tokens in the bucket, allow the request and decrease the tokens by 1
Expand All @@ -173,7 +173,7 @@ void checkCompilationLimit() {
scriptMetrics.onCompilationLimit();
// Otherwise reject the request
throw new CircuitBreakingException("[script] Too many dynamic script compilations within, max: [" +
rate.v1() + "/" + rate.v2() +"]; please use indexed, or scripts with parameters instead; " +
rate + "]; please use indexed, or scripts with parameters instead; " +
"this limit can be changed by the [" + contextRateSetting + "] setting",
CircuitBreaker.Durability.TRANSIENT);
}
Expand Down Expand Up @@ -243,4 +243,76 @@ private TokenBucketState(long lastInlineCompileTime, double availableTokens, boo
this.tokenSuccessfullyTaken = tokenSuccessfullyTaken;
}
}

public static class CompilationRate {
public final int count;
public final TimeValue time;
private final String source;

public CompilationRate(Integer count, TimeValue time) {
this.count = count;
this.time = time;
this.source = null;
}

public CompilationRate(Tuple<Integer,TimeValue> rate) {
this(rate.v1(), rate.v2());
}

/**
* Parses a string as a non-negative int count and a {@code TimeValue} as arguments split by a slash
*/
public CompilationRate(String value) {
if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) {
throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" +
value + "]");
}
int idx = value.indexOf("/");
String count = value.substring(0, idx);
String time = value.substring(idx + 1);
try {
int rate = Integer.parseInt(count);
if (rate < 0) {
throw new IllegalArgumentException("rate [" + rate + "] must be positive");
}
TimeValue timeValue = TimeValue.parseTimeValue(time, "script.max_compilations_rate");
if (timeValue.nanos() <= 0) {
throw new IllegalArgumentException("time value [" + time + "] must be positive");
}
// protect against a too hard to check limit, like less than a minute
if (timeValue.seconds() < 60) {
throw new IllegalArgumentException("time value [" + time + "] must be at least on a one minute resolution");
}
this.count = rate;
this.time = timeValue;
this.source = value;
} catch (NumberFormatException e) {
// the number format exception message is so confusing, that it makes more sense to wrap it with a useful one
throw new IllegalArgumentException("could not parse [" + count + "] as integer in value [" + value + "]", e);
}
}

public Tuple<Integer,TimeValue> asTuple() {
return new Tuple<>(this.count, this.time);
}

@Override
public String toString() {
return source != null ? source : count + "/" + time.toHumanReadableString(0);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CompilationRate that = (CompilationRate) o;
return count == that.count &&
Objects.equals(time, that.time);
}

@Override
public int hashCode() {
return Objects.hash(count, time);
}
}
}
53 changes: 8 additions & 45 deletions server/src/main/java/org/elasticsearch/script/ScriptService.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -64,43 +63,6 @@ public class ScriptService implements Closeable, ClusterStateApplier {

static final String DISABLE_DYNAMIC_SCRIPTING_SETTING = "script.disable_dynamic";

// Special setting value for SCRIPT_GENERAL_MAX_COMPILATIONS_RATE to indicate the script service should use context
// specific caches
static final Tuple<Integer, TimeValue> USE_CONTEXT_RATE_VALUE = new Tuple<>(-1, TimeValue.MINUS_ONE);
static final String USE_CONTEXT_RATE_KEY = "use-context";

// a parsing function that requires a non negative int and a timevalue as arguments split by a slash
// this allows you to easily define rates
static final Function<String, Tuple<Integer, TimeValue>> MAX_COMPILATION_RATE_FUNCTION =
(String value) -> {
if (value.contains("/") == false || value.startsWith("/") || value.endsWith("/")) {
throw new IllegalArgumentException("parameter must contain a positive integer and a timevalue, i.e. 10/1m, but was [" +
value + "]");
}
int idx = value.indexOf("/");
String count = value.substring(0, idx);
String time = value.substring(idx + 1);
try {

int rate = Integer.parseInt(count);
if (rate < 0) {
throw new IllegalArgumentException("rate [" + rate + "] must be positive");
}
TimeValue timeValue = TimeValue.parseTimeValue(time, "script.context.$CONTEXT.max_compilations_rate");
if (timeValue.nanos() <= 0) {
throw new IllegalArgumentException("time value [" + time + "] must be positive");
}
// protect against a too hard to check limit, like less than a minute
if (timeValue.seconds() < 60) {
throw new IllegalArgumentException("time value [" + time + "] must be at least on a one minute resolution");
}
return Tuple.tuple(rate, timeValue);
} catch (NumberFormatException e) {
// the number format exception message is so confusing, that it makes more sense to wrap it with a useful one
throw new IllegalArgumentException("could not parse [" + count + "] as integer in value [" + value + "]", e);
}
};

public static final Setting<Integer> SCRIPT_MAX_SIZE_IN_BYTES =
Setting.intSetting("script.max_size_in_bytes", 65535, 0, Property.Dynamic, Property.NodeScope);

Expand All @@ -120,15 +82,15 @@ public class ScriptService implements Closeable, ClusterStateApplier {
// Unlimited compilation rate for context-specific script caches
static final String UNLIMITED_COMPILATION_RATE_KEY = "unlimited";

public static final Setting.AffixSetting<Tuple<Integer, TimeValue>> SCRIPT_MAX_COMPILATIONS_RATE_SETTING =
public static final Setting.AffixSetting<ScriptCache.CompilationRate> SCRIPT_MAX_COMPILATIONS_RATE_SETTING =
Setting.affixKeySetting(CONTEXT_PREFIX,
"max_compilations_rate",
key -> new Setting<>(key, "75/5m",
key -> new Setting<ScriptCache.CompilationRate>(key, "75/5m",
(String value) -> value.equals(UNLIMITED_COMPILATION_RATE_KEY) ? ScriptCache.UNLIMITED_COMPILATION_RATE:
MAX_COMPILATION_RATE_FUNCTION.apply(value),
new ScriptCache.CompilationRate(value),
Property.NodeScope, Property.Dynamic));

private static final Tuple<Integer, TimeValue> SCRIPT_COMPILATION_RATE_ZERO = new Tuple<>(0, TimeValue.ZERO);
private static final ScriptCache.CompilationRate SCRIPT_COMPILATION_RATE_ZERO = new ScriptCache.CompilationRate(0, TimeValue.ZERO);

public static final Setting<Boolean> SCRIPT_DISABLE_MAX_COMPILATIONS_RATE_SETTING =
Setting.boolSetting("script.disable_max_compilations_rate", false, Property.NodeScope);
Expand Down Expand Up @@ -560,14 +522,15 @@ ScriptCache contextCache(Settings settings, ScriptContext<?> context) {
TimeValue cacheExpire = cacheExpireSetting.existsOrFallbackExists(settings) ?
cacheExpireSetting.get(settings) : context.cacheExpireDefault;

Setting<Tuple<Integer, TimeValue>> rateSetting = SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context.name);
Tuple<Integer, TimeValue> rate = null;
Setting<ScriptCache.CompilationRate> rateSetting =
SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context.name);
ScriptCache.CompilationRate rate = null;
if (SCRIPT_DISABLE_MAX_COMPILATIONS_RATE_SETTING.get(settings) || compilationLimitsEnabled() == false) {
rate = SCRIPT_COMPILATION_RATE_ZERO;
} else if (rateSetting.existsOrFallbackExists(settings)) {
rate = rateSetting.get(settings);
} else {
rate = context.maxCompilationRateDefault;
rate = new ScriptCache.CompilationRate(context.maxCompilationRateDefault);
}

return new ScriptCache(cacheSize, cacheExpire, rate, rateSetting.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.script;

import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -38,27 +37,29 @@ public void testCompilationCircuitBreaking() throws Exception {
).name;
final TimeValue expire = ScriptService.SCRIPT_CACHE_EXPIRE_SETTING.getConcreteSettingForNamespace(context).get(Settings.EMPTY);
final Integer size = ScriptService.SCRIPT_CACHE_SIZE_SETTING.getConcreteSettingForNamespace(context).get(Settings.EMPTY);
Setting<Tuple<Integer, TimeValue>> rateSetting =
Setting<ScriptCache.CompilationRate> rateSetting =
ScriptService.SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context);
Tuple<Integer, TimeValue> rate =
ScriptCache.CompilationRate rate =
ScriptService.SCRIPT_MAX_COMPILATIONS_RATE_SETTING.getConcreteSettingForNamespace(context).get(Settings.EMPTY);
String rateSettingName = rateSetting.getKey();
ScriptCache cache = new ScriptCache(size, expire, Tuple.tuple(1, TimeValue.timeValueMinutes(1)), rateSettingName);
ScriptCache cache = new ScriptCache(size, expire,
new ScriptCache.CompilationRate(1, TimeValue.timeValueMinutes(1)), rateSettingName);
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(2, TimeValue.timeValueMinutes(1))), rateSettingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(2, TimeValue.timeValueMinutes(1)), rateSettingName);
cache.checkCompilationLimit(); // should pass
cache.checkCompilationLimit(); // should pass
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
int count = randomIntBetween(5, 50);
cache = new ScriptCache(size, expire, (Tuple.tuple(count, TimeValue.timeValueMinutes(1))), rateSettingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(count, TimeValue.timeValueMinutes(1)), rateSettingName);
for (int i = 0; i < count; i++) {
cache.checkCompilationLimit(); // should pass
}
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(0, TimeValue.timeValueMinutes(1))), rateSettingName);
cache = new ScriptCache(size, expire, new ScriptCache.CompilationRate(0, TimeValue.timeValueMinutes(1)), rateSettingName);
expectThrows(CircuitBreakingException.class, cache::checkCompilationLimit);
cache = new ScriptCache(size, expire, (Tuple.tuple(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1))), rateSettingName);
cache = new ScriptCache(size, expire,
new ScriptCache.CompilationRate(Integer.MAX_VALUE, TimeValue.timeValueMinutes(1)), rateSettingName);
int largeLimit = randomIntBetween(1000, 10000);
for (int i = 0; i < largeLimit; i++) {
cache.checkCompilationLimit();
Expand Down
Loading

0 comments on commit a8aecb8

Please sign in to comment.