From 168d471f6268a6fa583d9881662dfda25aa615c1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 1 Jun 2018 09:34:17 +0200 Subject: [PATCH 1/5] [INGEST] Interrupt the current thread if evaluation grok expressions take too long This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search() This method can hang forever if the regex expression is too complex. The thread interrupter in the background checks every 3 seconds whether there are threads execution the org.joni.Matcher#search() method for longer than 5 seconds and if so interrupts these threads. Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and if so returns org.joni.Matcher#INTERRUPTED Closes #28731 --- .../java/org/elasticsearch/grok/Grok.java | 53 +++++-- .../elasticsearch/grok/ThreadInterrupter.java | 134 ++++++++++++++++++ .../org/elasticsearch/grok/GrokTests.java | 38 +++-- .../grok/ThreadInterrupterTests.java | 66 +++++++++ .../ingest/common/GrokProcessor.java | 12 +- .../ingest/common/IngestCommonPlugin.java | 24 +++- .../common/GrokProcessorFactoryTests.java | 17 +-- .../ingest/common/GrokProcessorTests.java | 37 ++--- .../elasticsearch/ingest/IngestService.java | 2 +- .../org/elasticsearch/ingest/Processor.java | 12 +- 10 files changed, 342 insertions(+), 53 deletions(-) create mode 100644 libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java create mode 100644 libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index 3800c7711a2fd..cd20ddcf42252 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -76,15 +76,24 @@ public final class Grok { private final Map patternBank; private final boolean namedCaptures; private final Regex compiledExpression; + private final ThreadInterrupter threadInterrupter; public Grok(Map patternBank, String grokPattern) { - this(patternBank, grokPattern, true); + this(patternBank, grokPattern, true, ThreadInterrupter.noop()); } - - @SuppressWarnings("unchecked") + + public Grok(Map patternBank, String grokPattern, ThreadInterrupter threadInterrupter) { + this(patternBank, grokPattern, true, threadInterrupter); + } + Grok(Map patternBank, String grokPattern, boolean namedCaptures) { + this(patternBank, grokPattern, namedCaptures, ThreadInterrupter.noop()); + } + + private Grok(Map patternBank, String grokPattern, boolean namedCaptures, ThreadInterrupter threadInterrupter) { this.patternBank = patternBank; this.namedCaptures = namedCaptures; + this.threadInterrupter = threadInterrupter; for (Map.Entry entry : patternBank.entrySet()) { String name = entry.getKey(); @@ -163,7 +172,13 @@ public String toRegex(String grokPattern) { byte[] grokPatternBytes = grokPattern.getBytes(StandardCharsets.UTF_8); Matcher matcher = GROK_PATTERN_REGEX.matcher(grokPatternBytes); - int result = matcher.search(0, grokPatternBytes.length, Option.NONE); + int result; + try { + threadInterrupter.register(); + result = matcher.search(0, grokPatternBytes.length, Option.NONE); + } finally { + threadInterrupter.deregister(); + } if (result != -1) { Region region = matcher.getEagerRegion(); String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern); @@ -205,7 +220,13 @@ public String toRegex(String grokPattern) { */ public boolean match(String text) { Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8)); - int result = matcher.search(0, text.length(), Option.DEFAULT); + int result; + try { + threadInterrupter.register(); + result = matcher.search(0, text.length(), Option.DEFAULT); + } finally { + threadInterrupter.deregister(); + } return (result != -1); } @@ -220,8 +241,19 @@ public Map captures(String text) { byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8); Map fields = new HashMap<>(); Matcher matcher = compiledExpression.matcher(textAsBytes); - int result = matcher.search(0, textAsBytes.length, Option.DEFAULT); - if (result != -1 && compiledExpression.numberOfNames() > 0) { + int result; + try { + threadInterrupter.register(); + result = matcher.search(0, textAsBytes.length, Option.DEFAULT); + } finally { + threadInterrupter.deregister(); + } + if (result == Matcher.INTERRUPTED) { + throw new IllegalArgumentException("grok pattern matching is too complex and takes too long to execute"); + } else if (result == Matcher.FAILED) { + // I think we should throw an error here? + return null; + } else if (compiledExpression.numberOfNames() > 0) { Region region = matcher.getEagerRegion(); for (Iterator entry = compiledExpression.namedBackrefIterator(); entry.hasNext();) { NameEntry e = entry.next(); @@ -235,13 +267,10 @@ public Map captures(String text) { break; } } - + } - return fields; - } else if (result != -1) { - return fields; } - return null; + return fields; } public static Map getBuiltinPatterns() { diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java new file mode 100644 index 0000000000000..94bb600a9222b --- /dev/null +++ b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java @@ -0,0 +1,134 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.grok; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; +import java.util.function.LongSupplier; + +/** + * Protects against long running operations that happen between the register and de-register invocations. + * Threads that invoke {@link #register()}, but take too long to invoke the {@link #deregister()} method + * will be interrupted. + * + * This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because + * it can end up spinning endlessly if the regular expression is too complex. Joni has checks + * that that for every 30k iterations it checks if the current thread is interrupted and if so + * returns {@link org.joni.Matcher#INTERRUPTED}. + */ +public interface ThreadInterrupter { + + /** + * Registers the current thread and interrupts the current thread + * if the takes too long for this thread to invoke {@link #deregister()}. + */ + void register(); + + /** + * De-registers the current thread and prevents it from being interrupted. + */ + void deregister(); + + /** + * Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()} + * and not {@link #deregister()} and have been in this state for longer than the specified max execution interval and + * then interrupts these threads. + * + * @param interval The fixed interval to check if there are threads to interrupt + * @param maxExecutionTime The time a thread has the execute an operation. + * @param relativeTimeSupplier A supplier that returns relative time + * @param scheduler A scheduler that is able to execute a command for each fixed interval + */ + static ThreadInterrupter newInstance(long interval, + long maxExecutionTime, + LongSupplier relativeTimeSupplier, + BiFunction> scheduler) { + return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); + } + + /** + * @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. + */ + static ThreadInterrupter noop() { + return new Noop(); + } + + class Noop implements ThreadInterrupter { + + private Noop() { + } + + @Override + public void register() { + } + + @Override + public void deregister() { + } + } + + class Default implements ThreadInterrupter { + + private final long interval; + private final long maxExecutionTime; + private final LongSupplier relativeTimeSupplier; + private final BiFunction> scheduler; + final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + + private Default(long interval, + long maxExecutionTime, + LongSupplier relativeTimeSupplier, + BiFunction> scheduler) { + this.interval = interval; + this.maxExecutionTime = maxExecutionTime; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; + scheduler.apply(interval, this::interruptLongRunningExecutions); + } + + public void register() { + Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); + assert previousValue == null; + } + + public void deregister() { + Long previousValue = registry.remove(Thread.currentThread()); + assert previousValue != null; + } + + private void interruptLongRunningExecutions() { + try { + final long currentRelativeTime = relativeTimeSupplier.getAsLong(); + for (Map.Entry entry : registry.entrySet()) { + long threadTime = entry.getValue(); + if ((currentRelativeTime - threadTime) > maxExecutionTime) { + entry.getKey().interrupt(); + // not removing the entry here, this happens in the deregister() method. + } + } + } finally { + scheduler.apply(interval, this::interruptLongRunningExecutions); + } + } + + } + +} diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index eb8d0e9548753..d66b601eace2d 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -20,15 +20,16 @@ package org.elasticsearch.grok; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -36,12 +37,7 @@ public class GrokTests extends ESTestCase { - private Map basePatterns; - - @Before - public void setup() { - basePatterns = Grok.getBuiltinPatterns(); - } + private static final Map basePatterns = Grok.getBuiltinPatterns(); public void testMatchWithoutCaptures() { String line = "value"; @@ -416,4 +412,30 @@ public void testMultipleNamedCapturesWithSameName() { expected.put("num", "1"); assertThat(grok.captures("12"), equalTo(expected)); } + + public void testExponentialExpressions() { + AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed + + String grokPattern = "Bonsuche mit folgender Anfrage: Belegart->\\[%{WORD:param2},(?(\\s*%{NOTSPACE})*)\\] " + + "Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}"; + String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " + + "Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018"; + BiFunction> scheduler = (delay, command) -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + Thread t = new Thread(() -> { + if (run.get()) { + command.run(); + } + }); + t.start(); + return null; + }; + Grok grok = new Grok(basePatterns, grokPattern, ThreadInterrupter.newInstance(10, 200, System::currentTimeMillis, scheduler)); + Exception e = expectThrows(IllegalArgumentException.class, () -> grok.captures(logLine)); + run.set(false); + assertThat(e.getMessage(), equalTo("grok pattern matching is too complex and takes too long to execute")); + } } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java new file mode 100644 index 0000000000000..f4cd455af46e4 --- /dev/null +++ b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.grok; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.is; + +public class ThreadInterrupterTests extends ESTestCase { + + public void testInterrupt() throws Exception { + AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed + ThreadInterrupter guard = ThreadInterrupter.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + Thread thread = new Thread(() -> { + if (run.get()) { + command.run(); + } + }); + thread.start(); + return null; + }); + + Map registry = ((ThreadInterrupter.Default) guard).registry; + assertThat(registry.size(), is(0)); + // need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted + Thread thread = new Thread(() -> { + guard.register(); + while (run.get()) { + } + guard.deregister(); + }); + thread.start(); + assertBusy(() -> { + assertThat(thread.isInterrupted(), is(true)); + assertThat(registry.size(), is(1)); + }); + run.set(false); + assertBusy(() -> { + assertThat(registry.size(), is(0)); + }); + } + +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 8d1d2127e7213..982653162cadf 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.grok.Grok; +import org.elasticsearch.grok.ThreadInterrupter; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -43,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor { private final boolean ignoreMissing; GrokProcessor(String tag, Map patternBank, List matchPatterns, String matchField, - boolean traceMatch, boolean ignoreMissing) { + boolean traceMatch, boolean ignoreMissing, ThreadInterrupter threadInterrupter) { super(tag); this.matchField = matchField; this.matchPatterns = matchPatterns; - this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch)); + this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadInterrupter); this.traceMatch = traceMatch; this.ignoreMissing = ignoreMissing; } @@ -132,9 +133,11 @@ static String combinePatterns(List patterns, boolean traceMatch) { public static final class Factory implements Processor.Factory { private final Map builtinPatterns; + private final ThreadInterrupter threadInterrupter; - public Factory(Map builtinPatterns) { + public Factory(Map builtinPatterns, ThreadInterrupter threadInterrupter) { this.builtinPatterns = builtinPatterns; + this.threadInterrupter = threadInterrupter; } @Override @@ -155,7 +158,8 @@ public GrokProcessor create(Map registry, String proc } try { - return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing); + return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing, + threadInterrupter); } catch (Exception e) { throw newConfigurationException(TYPE, processorTag, "patterns", "Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage()); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index a29c994f10d37..a12d9c133a69e 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -25,15 +25,19 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; +import org.elasticsearch.grok.ThreadInterrupter; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -45,6 +49,10 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { static final Map GROK_PATTERNS = Grok.getBuiltinPatterns(); + static final Setting GUARD_INTERVAL = + Setting.timeSetting("ingest.grok.guard.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); + static final Setting GUARD_MAX_EXECUTION_TIME = + Setting.timeSetting("ingest.grok.guard.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); public IngestCommonPlugin() { } @@ -68,7 +76,8 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory()); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory()); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); - processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS)); + ThreadInterrupter threadInterrupter = createGrokThreadInterrupter(parameters); + processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, threadInterrupter)); processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)); processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()); processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory()); @@ -89,5 +98,18 @@ public List getRestHandlers(Settings settings, RestController restC Supplier nodesInCluster) { return Arrays.asList(new GrokProcessorGetAction.RestAction(settings, restController)); } + + @Override + public List> getSettings() { + return Arrays.asList(GUARD_INTERVAL, GUARD_MAX_EXECUTION_TIME); + } + + private static ThreadInterrupter createGrokThreadInterrupter(Processor.Parameters parameters) { + long intervalMillis = GUARD_INTERVAL.get(parameters.env.settings()).getMillis(); + long maxExecutionTimeMillis = GUARD_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); + ThreadPool threadPool = parameters.threadPool; + return ThreadInterrupter.newInstance(intervalMillis, maxExecutionTimeMillis, threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command)); + } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java index 4cac94cd5b571..59db6e9d83166 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.grok.ThreadInterrupter; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -33,7 +34,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { public void testBuild() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -47,7 +48,7 @@ public void testBuild() throws Exception { } public void testBuildWithIgnoreMissing() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -62,7 +63,7 @@ public void testBuildWithIgnoreMissing() throws Exception { } public void testBuildMissingField() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("patterns", Collections.singletonList("(?\\w+)")); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -70,7 +71,7 @@ public void testBuildMissingField() throws Exception { } public void testBuildMissingPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "foo"); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -78,7 +79,7 @@ public void testBuildMissingPatterns() throws Exception { } public void testBuildEmptyPatternsList() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "foo"); config.put("patterns", Collections.emptyList()); @@ -87,7 +88,7 @@ public void testBuildEmptyPatternsList() throws Exception { } public void testCreateWithCustomPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -100,7 +101,7 @@ public void testCreateWithCustomPatterns() throws Exception { } public void testCreateWithInvalidPattern() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("[")); @@ -109,7 +110,7 @@ public void testCreateWithInvalidPattern() throws Exception { } public void testCreateWithInvalidPatternDefinition() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java index 37c26db4b74f3..bece3edda7c54 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.grok.ThreadInterrupter; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -39,7 +40,7 @@ public void testMatch() throws Exception { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.getFieldValue("one", String.class), equalTo("1")); } @@ -49,7 +50,7 @@ public void testNoMatch() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "23"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]")); } @@ -60,7 +61,7 @@ public void testNoMatchingPatternName() { doc.setFieldValue(fieldName, "23"); Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName, - false, false)); + false, false, ThreadInterrupter.noop())); assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary")); } @@ -70,7 +71,7 @@ public void testMatchWithoutCaptures() throws Exception { originalDoc.setFieldValue(fieldName, fieldName); IngestDocument doc = new IngestDocument(originalDoc); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(), - Collections.singletonList(fieldName), fieldName, false, false); + Collections.singletonList(fieldName), fieldName, false, false, ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc, equalTo(originalDoc)); } @@ -80,7 +81,7 @@ public void testNullField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, null); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it.")); } @@ -91,7 +92,7 @@ public void testNullFieldWithIgnoreMissing() throws Exception { originalIngestDocument.setFieldValue(fieldName, null); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -101,7 +102,7 @@ public void testNotStringField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -111,7 +112,7 @@ public void testNotStringFieldWithIgnoreMissing() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -120,7 +121,7 @@ public void testMissingField() { String fieldName = "foo.bar"; IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]")); } @@ -130,7 +131,7 @@ public void testMissingFieldWithIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -144,7 +145,7 @@ public void testMultiplePatternsWithMatchReturn() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -160,7 +161,7 @@ public void testSetMetadata() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -175,7 +176,7 @@ public void testTraceWithOnePattern() throws Exception { Map patternBank = new HashMap<>(); patternBank.put("ONE", "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}"), fieldName, true, false); + Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(true)); assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0")); @@ -205,8 +206,8 @@ public void testCombineSamePatternNameAcrossPatterns() throws Exception { patternBank.put("ONE", "1"); patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); - GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:first}-%{TWO:second}", "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean()); + GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}", + "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); @@ -219,7 +220,8 @@ public void testFirstWinNamedCapture() throws Exception { Map patternBank = new HashMap<>(); patternBank.put("ONETWO", "1|2"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean()); + Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(), + ThreadInterrupter.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); } @@ -232,7 +234,8 @@ public void testUnmatchedNamesNotIncludedInDocument() throws Exception { patternBank.put("ONETWO", "1|2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean()); + Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), + ThreadInterrupter.noop()); processor.execute(doc); assertFalse(doc.hasField("first")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ad2b8643f7ae3..25bb50cd793bb 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -43,7 +43,7 @@ public IngestService(Settings settings, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { Processor.Parameters parameters = new Processor.Parameters(env, scriptService, - analysisRegistry, threadPool.getThreadContext()); + analysisRegistry, threadPool); Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 39d74fb09a945..ce3d87b2b167e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -23,6 +23,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; @@ -94,13 +95,20 @@ class Parameters { * instances that have run prior to in ingest. */ public final ThreadContext threadContext; + + /** + * Provides a thread pool + */ + // TODO: do we really want to expose ThreadPool here? Or a BiFunction> to just handle scheduling? + public final ThreadPool threadPool; public Parameters(Environment env, ScriptService scriptService, - AnalysisRegistry analysisRegistry, ThreadContext threadContext) { + AnalysisRegistry analysisRegistry, ThreadPool threadPool) { this.env = env; this.scriptService = scriptService; - this.threadContext = threadContext; + this.threadContext = threadPool.getThreadContext(); this.analysisRegistry = analysisRegistry; + this.threadPool = threadPool; } } From d90f38b45b1d03344b15337d6cab9450059ea142 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 4 Jun 2018 13:12:48 +0200 Subject: [PATCH 2/5] iter --- .../java/org/elasticsearch/grok/Grok.java | 32 ++++---- ...adInterrupter.java => ThreadWatchdog.java} | 74 +++++++++++-------- .../org/elasticsearch/grok/GrokTests.java | 5 +- ...terTests.java => ThreadWatchdogTests.java} | 20 +++-- .../ingest/common/GrokProcessor.java | 14 ++-- .../ingest/common/IngestCommonPlugin.java | 26 +++---- .../common/GrokProcessorFactoryTests.java | 18 ++--- .../ingest/common/GrokProcessorTests.java | 34 ++++----- .../elasticsearch/ingest/IngestService.java | 9 ++- .../org/elasticsearch/ingest/Processor.java | 20 +++-- 10 files changed, 138 insertions(+), 114 deletions(-) rename libs/grok/src/main/java/org/elasticsearch/grok/{ThreadInterrupter.java => ThreadWatchdog.java} (66%) rename libs/grok/src/test/java/org/elasticsearch/grok/{ThreadInterrupterTests.java => ThreadWatchdogTests.java} (73%) diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index cd20ddcf42252..ae89013fde7d3 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -76,24 +76,24 @@ public final class Grok { private final Map patternBank; private final boolean namedCaptures; private final Regex compiledExpression; - private final ThreadInterrupter threadInterrupter; + private final ThreadWatchdog threadWatchdog; public Grok(Map patternBank, String grokPattern) { - this(patternBank, grokPattern, true, ThreadInterrupter.noop()); + this(patternBank, grokPattern, true, ThreadWatchdog.noop()); } - public Grok(Map patternBank, String grokPattern, ThreadInterrupter threadInterrupter) { - this(patternBank, grokPattern, true, threadInterrupter); + public Grok(Map patternBank, String grokPattern, ThreadWatchdog threadWatchdog) { + this(patternBank, grokPattern, true, threadWatchdog); } Grok(Map patternBank, String grokPattern, boolean namedCaptures) { - this(patternBank, grokPattern, namedCaptures, ThreadInterrupter.noop()); + this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop()); } - private Grok(Map patternBank, String grokPattern, boolean namedCaptures, ThreadInterrupter threadInterrupter) { + private Grok(Map patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) { this.patternBank = patternBank; this.namedCaptures = namedCaptures; - this.threadInterrupter = threadInterrupter; + this.threadWatchdog = threadWatchdog; for (Map.Entry entry : patternBank.entrySet()) { String name = entry.getKey(); @@ -174,10 +174,10 @@ public String toRegex(String grokPattern) { int result; try { - threadInterrupter.register(); + threadWatchdog.register(); result = matcher.search(0, grokPatternBytes.length, Option.NONE); } finally { - threadInterrupter.deregister(); + threadWatchdog.unregister(); } if (result != -1) { Region region = matcher.getEagerRegion(); @@ -222,10 +222,10 @@ public boolean match(String text) { Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8)); int result; try { - threadInterrupter.register(); + threadWatchdog.register(); result = matcher.search(0, text.length(), Option.DEFAULT); } finally { - threadInterrupter.deregister(); + threadWatchdog.unregister(); } return (result != -1); } @@ -243,15 +243,16 @@ public Map captures(String text) { Matcher matcher = compiledExpression.matcher(textAsBytes); int result; try { - threadInterrupter.register(); + threadWatchdog.register(); result = matcher.search(0, textAsBytes.length, Option.DEFAULT); } finally { - threadInterrupter.deregister(); + threadWatchdog.unregister(); } if (result == Matcher.INTERRUPTED) { - throw new IllegalArgumentException("grok pattern matching is too complex and takes too long to execute"); + throw new IllegalArgumentException("grok pattern matching was interrupted after [" + + threadWatchdog.maxExecutionTime() + "] ms"); } else if (result == Matcher.FAILED) { - // I think we should throw an error here? + // TODO: I think we should throw an error here? return null; } else if (compiledExpression.numberOfNames() > 0) { Region region = matcher.getEagerRegion(); @@ -267,7 +268,6 @@ public Map captures(String text) { break; } } - } } return fields; diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java similarity index 66% rename from libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java rename to libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java index 94bb600a9222b..c06e81ddb63c8 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadInterrupter.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java @@ -25,31 +25,37 @@ import java.util.function.LongSupplier; /** - * Protects against long running operations that happen between the register and de-register invocations. - * Threads that invoke {@link #register()}, but take too long to invoke the {@link #deregister()} method + * Protects against long running operations that happen between the register and unregister invocations. + * Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method * will be interrupted. * * This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because * it can end up spinning endlessly if the regular expression is too complex. Joni has checks - * that that for every 30k iterations it checks if the current thread is interrupted and if so + * that for every 30k iterations it checks if the current thread is interrupted and if so * returns {@link org.joni.Matcher#INTERRUPTED}. */ -public interface ThreadInterrupter { +public interface ThreadWatchdog { /** * Registers the current thread and interrupts the current thread - * if the takes too long for this thread to invoke {@link #deregister()}. + * if the takes too long for this thread to invoke {@link #unregister()}. */ void register(); /** - * De-registers the current thread and prevents it from being interrupted. + * @return The maximum allowed time for a thread to invoke {@link #unregister()} after {@link #register()} + * has been invoked before this ThreadWatchDog starts to interrupting that thread. */ - void deregister(); + long maxExecutionTime(); + + /** + * Unregisters the current thread and prevents it from being interrupted. + */ + void unregister(); /** * Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()} - * and not {@link #deregister()} and have been in this state for longer than the specified max execution interval and + * and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and * then interrupts these threads. * * @param interval The fixed interval to check if there are threads to interrupt @@ -57,35 +63,42 @@ public interface ThreadInterrupter { * @param relativeTimeSupplier A supplier that returns relative time * @param scheduler A scheduler that is able to execute a command for each fixed interval */ - static ThreadInterrupter newInstance(long interval, - long maxExecutionTime, - LongSupplier relativeTimeSupplier, - BiFunction> scheduler) { + static ThreadWatchdog newInstance(long interval, + long maxExecutionTime, + LongSupplier relativeTimeSupplier, + BiFunction> scheduler) { return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); } /** * @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. */ - static ThreadInterrupter noop() { - return new Noop(); + static ThreadWatchdog noop() { + return Noop.INSTANCE; } - class Noop implements ThreadInterrupter { + class Noop implements ThreadWatchdog { + + private static final Noop INSTANCE = new Noop(); private Noop() { } - + @Override public void register() { } + + @Override + public long maxExecutionTime() { + return Long.MAX_VALUE; + } @Override - public void deregister() { + public void unregister() { } } - class Default implements ThreadInterrupter { + class Default implements ThreadWatchdog { private final long interval; private final long maxExecutionTime; @@ -108,25 +121,26 @@ public void register() { Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); assert previousValue == null; } - - public void deregister() { + + @Override + public long maxExecutionTime() { + return maxExecutionTime; + } + + public void unregister() { Long previousValue = registry.remove(Thread.currentThread()); assert previousValue != null; } private void interruptLongRunningExecutions() { - try { - final long currentRelativeTime = relativeTimeSupplier.getAsLong(); - for (Map.Entry entry : registry.entrySet()) { - long threadTime = entry.getValue(); - if ((currentRelativeTime - threadTime) > maxExecutionTime) { - entry.getKey().interrupt(); - // not removing the entry here, this happens in the deregister() method. - } + final long currentRelativeTime = relativeTimeSupplier.getAsLong(); + for (Map.Entry entry : registry.entrySet()) { + if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) { + entry.getKey().interrupt(); + // not removing the entry here, this happens in the unregister() method. } - } finally { - scheduler.apply(interval, this::interruptLongRunningExecutions); } + scheduler.apply(interval, this::interruptLongRunningExecutions); } } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index d66b601eace2d..c04be73ff6efa 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -424,6 +424,7 @@ public void testExponentialExpressions() { try { Thread.sleep(delay); } catch (InterruptedException e) { + throw new AssertionError(e); } Thread t = new Thread(() -> { if (run.get()) { @@ -433,9 +434,9 @@ public void testExponentialExpressions() { t.start(); return null; }; - Grok grok = new Grok(basePatterns, grokPattern, ThreadInterrupter.newInstance(10, 200, System::currentTimeMillis, scheduler)); + Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); Exception e = expectThrows(IllegalArgumentException.class, () -> grok.captures(logLine)); run.set(false); - assertThat(e.getMessage(), equalTo("grok pattern matching is too complex and takes too long to execute")); + assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms")); } } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java similarity index 73% rename from libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java rename to libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java index f4cd455af46e4..46faa4ae05d38 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadInterrupterTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java @@ -25,14 +25,15 @@ import static org.hamcrest.Matchers.is; -public class ThreadInterrupterTests extends ESTestCase { +public class ThreadWatchdogTests extends ESTestCase { public void testInterrupt() throws Exception { AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed - ThreadInterrupter guard = ThreadInterrupter.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { + ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { try { Thread.sleep(delay); } catch (InterruptedException e) { + throw new AssertionError(e); } Thread thread = new Thread(() -> { if (run.get()) { @@ -43,18 +44,21 @@ public void testInterrupt() throws Exception { return null; }); - Map registry = ((ThreadInterrupter.Default) guard).registry; + Map registry = ((ThreadWatchdog.Default) watchdog).registry; assertThat(registry.size(), is(0)); // need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted + AtomicBoolean interrupted = new AtomicBoolean(false); Thread thread = new Thread(() -> { - guard.register(); - while (run.get()) { - } - guard.deregister(); + Thread currentThread = Thread.currentThread(); + watchdog.register(); + while (currentThread.isInterrupted() == false) {} + interrupted.set(true); + while (run.get()) {} // wait here so that the size of the registry can be asserted + watchdog.unregister(); }); thread.start(); assertBusy(() -> { - assertThat(thread.isInterrupted(), is(true)); + assertThat(interrupted.get(), is(true)); assertThat(registry.size(), is(1)); }); run.set(false); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 982653162cadf..7bb3ebfba6e36 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.grok.Grok; -import org.elasticsearch.grok.ThreadInterrupter; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -44,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor { private final boolean ignoreMissing; GrokProcessor(String tag, Map patternBank, List matchPatterns, String matchField, - boolean traceMatch, boolean ignoreMissing, ThreadInterrupter threadInterrupter) { + boolean traceMatch, boolean ignoreMissing, ThreadWatchdog threadWatchdog) { super(tag); this.matchField = matchField; this.matchPatterns = matchPatterns; - this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadInterrupter); + this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadWatchdog); this.traceMatch = traceMatch; this.ignoreMissing = ignoreMissing; } @@ -133,11 +133,11 @@ static String combinePatterns(List patterns, boolean traceMatch) { public static final class Factory implements Processor.Factory { private final Map builtinPatterns; - private final ThreadInterrupter threadInterrupter; + private final ThreadWatchdog threadWatchdog; - public Factory(Map builtinPatterns, ThreadInterrupter threadInterrupter) { + public Factory(Map builtinPatterns, ThreadWatchdog threadWatchdog) { this.builtinPatterns = builtinPatterns; - this.threadInterrupter = threadInterrupter; + this.threadWatchdog = threadWatchdog; } @Override @@ -159,7 +159,7 @@ public GrokProcessor create(Map registry, String proc try { return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing, - threadInterrupter); + threadWatchdog); } catch (Exception e) { throw newConfigurationException(TYPE, processorTag, "patterns", "Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage()); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index a12d9c133a69e..3b323fdd68b04 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -30,14 +30,13 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; -import org.elasticsearch.grok.ThreadInterrupter; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.Collections; @@ -49,10 +48,10 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { static final Map GROK_PATTERNS = Grok.getBuiltinPatterns(); - static final Setting GUARD_INTERVAL = - Setting.timeSetting("ingest.grok.guard.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); - static final Setting GUARD_MAX_EXECUTION_TIME = - Setting.timeSetting("ingest.grok.guard.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + static final Setting WATCHDOG_INTERVAL = + Setting.timeSetting("ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); + static final Setting WATCHDOG_MAX_EXECUTION_TIME = + Setting.timeSetting("ingest.grok.watchdog.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); public IngestCommonPlugin() { } @@ -76,8 +75,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory()); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory()); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); - ThreadInterrupter threadInterrupter = createGrokThreadInterrupter(parameters); - processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, threadInterrupter)); + processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)); processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()); processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory()); @@ -101,15 +99,13 @@ public List getRestHandlers(Settings settings, RestController restC @Override public List> getSettings() { - return Arrays.asList(GUARD_INTERVAL, GUARD_MAX_EXECUTION_TIME); + return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME); } - private static ThreadInterrupter createGrokThreadInterrupter(Processor.Parameters parameters) { - long intervalMillis = GUARD_INTERVAL.get(parameters.env.settings()).getMillis(); - long maxExecutionTimeMillis = GUARD_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); - ThreadPool threadPool = parameters.threadPool; - return ThreadInterrupter.newInstance(intervalMillis, maxExecutionTimeMillis, threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command)); + private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { + long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis(); + long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); + return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java index 59db6e9d83166..f35fa34eec46d 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.grok.ThreadInterrupter; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -34,7 +34,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { public void testBuild() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -48,7 +48,7 @@ public void testBuild() throws Exception { } public void testBuildWithIgnoreMissing() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -63,7 +63,7 @@ public void testBuildWithIgnoreMissing() throws Exception { } public void testBuildMissingField() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("patterns", Collections.singletonList("(?\\w+)")); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -71,7 +71,7 @@ public void testBuildMissingField() throws Exception { } public void testBuildMissingPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -79,7 +79,7 @@ public void testBuildMissingPatterns() throws Exception { } public void testBuildEmptyPatternsList() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); config.put("patterns", Collections.emptyList()); @@ -88,7 +88,7 @@ public void testBuildEmptyPatternsList() throws Exception { } public void testCreateWithCustomPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -101,7 +101,7 @@ public void testCreateWithCustomPatterns() throws Exception { } public void testCreateWithInvalidPattern() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("[")); @@ -110,7 +110,7 @@ public void testCreateWithInvalidPattern() throws Exception { } public void testCreateWithInvalidPatternDefinition() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadInterrupter.noop()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java index bece3edda7c54..0eba79523aca2 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.grok.ThreadInterrupter; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -40,7 +40,7 @@ public void testMatch() throws Exception { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("one", String.class), equalTo("1")); } @@ -50,7 +50,7 @@ public void testNoMatch() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "23"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]")); } @@ -61,7 +61,7 @@ public void testNoMatchingPatternName() { doc.setFieldValue(fieldName, "23"); Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName, - false, false, ThreadInterrupter.noop())); + false, false, ThreadWatchdog.noop())); assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary")); } @@ -71,7 +71,7 @@ public void testMatchWithoutCaptures() throws Exception { originalDoc.setFieldValue(fieldName, fieldName); IngestDocument doc = new IngestDocument(originalDoc); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(), - Collections.singletonList(fieldName), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList(fieldName), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc, equalTo(originalDoc)); } @@ -81,7 +81,7 @@ public void testNullField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, null); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it.")); } @@ -92,7 +92,7 @@ public void testNullFieldWithIgnoreMissing() throws Exception { originalIngestDocument.setFieldValue(fieldName, null); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -102,7 +102,7 @@ public void testNotStringField() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -112,7 +112,7 @@ public void testNotStringFieldWithIgnoreMissing() { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -121,7 +121,7 @@ public void testMissingField() { String fieldName = "foo.bar"; IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]")); } @@ -131,7 +131,7 @@ public void testMissingFieldWithIgnoreMissing() throws Exception { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadInterrupter.noop()); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -145,7 +145,7 @@ public void testMultiplePatternsWithMatchReturn() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadInterrupter.noop()); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -161,7 +161,7 @@ public void testSetMetadata() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadInterrupter.noop()); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -176,7 +176,7 @@ public void testTraceWithOnePattern() throws Exception { Map patternBank = new HashMap<>(); patternBank.put("ONE", "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadInterrupter.noop()); + Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(true)); assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0")); @@ -207,7 +207,7 @@ public void testCombineSamePatternNameAcrossPatterns() throws Exception { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}", - "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadInterrupter.noop()); + "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); @@ -221,7 +221,7 @@ public void testFirstWinNamedCapture() throws Exception { patternBank.put("ONETWO", "1|2"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(), - ThreadInterrupter.noop()); + ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); } @@ -235,7 +235,7 @@ public void testUnmatchedNamesNotIncludedInDocument() throws Exception { patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), - ThreadInterrupter.noop()); + ThreadWatchdog.noop()); processor.execute(doc); assertFalse(doc.hasField("first")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 25bb50cd793bb..46b11f7ac141d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -24,8 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.plugins.IngestPlugin; @@ -42,8 +45,10 @@ public class IngestService { public IngestService(Settings settings, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { - Processor.Parameters parameters = new Processor.Parameters(env, scriptService, - analysisRegistry, threadPool); + BiFunction> scheduler = + (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command); + Processor.Parameters parameters = new Processor.Parameters(env, scriptService, analysisRegistry, + threadPool.getThreadContext(), threadPool::relativeTimeInMillis, scheduler); Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index ce3d87b2b167e..c318d478814de 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -23,9 +23,11 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; +import java.util.function.LongSupplier; /** * A processor implementation may modify the data belonging to a document. @@ -96,19 +98,21 @@ class Parameters { */ public final ThreadContext threadContext; + public final LongSupplier relativeTimeSupplier; + /** - * Provides a thread pool + * Provides scheduler support */ - // TODO: do we really want to expose ThreadPool here? Or a BiFunction> to just handle scheduling? - public final ThreadPool threadPool; + public final BiFunction> scheduler; - public Parameters(Environment env, ScriptService scriptService, - AnalysisRegistry analysisRegistry, ThreadPool threadPool) { + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, + LongSupplier relativeTimeSupplier, BiFunction> scheduler) { this.env = env; this.scriptService = scriptService; - this.threadContext = threadPool.getThreadContext(); + this.threadContext = threadContext; this.analysisRegistry = analysisRegistry; - this.threadPool = threadPool; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; } } From 281ca0e7c57362cff4817f9d8bdc52f3752c3a32 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 4 Jun 2018 13:24:41 +0200 Subject: [PATCH 3/5] changed defaults and added docs --- docs/reference/ingest/ingest-node.asciidoc | 18 ++++++++++++++++++ .../ingest/common/IngestCommonPlugin.java | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 8a7c33086abe8..d72a4dc8ee60f 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1512,6 +1512,24 @@ The above request will return a response body containing a key-value representat This can be useful to reference as the built-in patterns change across versions. +[[grok-watchdog]] +==== Grok watchdog + +Grok expression that take too long to execute are interrupted and +the the grok processor then fails with an exception. The grok +processor has a watchdog thread that determines when evaluation +a grok expression takes too long and is controlled by the following +settings: + +[[grok-watchdog-options]] +.Grok watchdog settings +[options="header"] +|====== +| Name | Default | Description +| `ingest.grok.watchdog.interval` | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time. +| `ingest.grok.watchdog.max_execution_time` | 1s | The maximum allowed execution of a grok expression evaluation. +|====== + [[gsub-processor]] === Gsub Processor Converts a string field by applying a regular expression and a replacement. diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 3b323fdd68b04..591060098b728 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -49,9 +49,9 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl static final Map GROK_PATTERNS = Grok.getBuiltinPatterns(); static final Setting WATCHDOG_INTERVAL = - Setting.timeSetting("ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); + Setting.timeSetting("ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope); static final Setting WATCHDOG_MAX_EXECUTION_TIME = - Setting.timeSetting("ingest.grok.watchdog.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + Setting.timeSetting("ingest.grok.watchdog.max_execution_time", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope); public IngestCommonPlugin() { } From 92737df414219e3ff921f5d38029cdd9398a63cc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 4 Jun 2018 20:46:31 +0200 Subject: [PATCH 4/5] iter2 --- docs/reference/ingest/ingest-node.asciidoc | 6 +++--- libs/grok/src/main/java/org/elasticsearch/grok/Grok.java | 2 +- .../src/test/java/org/elasticsearch/grok/GrokTests.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index d72a4dc8ee60f..3acb13950108e 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1515,9 +1515,9 @@ This can be useful to reference as the built-in patterns change across versions. [[grok-watchdog]] ==== Grok watchdog -Grok expression that take too long to execute are interrupted and -the the grok processor then fails with an exception. The grok -processor has a watchdog thread that determines when evaluation +Grok expressions that take too long to execute are interrupted and +the grok processor then fails with an exception. The grok +processor has a watchdog thread that determines when evaluation of a grok expression takes too long and is controlled by the following settings: diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index ae89013fde7d3..a3471550b2eba 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -249,7 +249,7 @@ public Map captures(String text) { threadWatchdog.unregister(); } if (result == Matcher.INTERRUPTED) { - throw new IllegalArgumentException("grok pattern matching was interrupted after [" + + throw new RuntimeException("grok pattern matching was interrupted after [" + threadWatchdog.maxExecutionTime() + "] ms"); } else if (result == Matcher.FAILED) { // TODO: I think we should throw an error here? diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index c04be73ff6efa..8d79aa290ebff 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -435,7 +435,7 @@ public void testExponentialExpressions() { return null; }; Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); - Exception e = expectThrows(IllegalArgumentException.class, () -> grok.captures(logLine)); + Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine)); run.set(false); assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms")); } From c21a40824e033493c191e2f0de1d2474b25b633a Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 5 Jun 2018 07:45:46 +0200 Subject: [PATCH 5/5] rename method --- .../src/main/java/org/elasticsearch/grok/Grok.java | 2 +- .../java/org/elasticsearch/grok/ThreadWatchdog.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index a3471550b2eba..02388d838bc2a 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -250,7 +250,7 @@ public Map captures(String text) { } if (result == Matcher.INTERRUPTED) { throw new RuntimeException("grok pattern matching was interrupted after [" + - threadWatchdog.maxExecutionTime() + "] ms"); + threadWatchdog.maxExecutionTimeInMillis() + "] ms"); } else if (result == Matcher.FAILED) { // TODO: I think we should throw an error here? return null; diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java index c06e81ddb63c8..d0de7637d2c08 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java @@ -43,10 +43,10 @@ public interface ThreadWatchdog { void register(); /** - * @return The maximum allowed time for a thread to invoke {@link #unregister()} after {@link #register()} - * has been invoked before this ThreadWatchDog starts to interrupting that thread. + * @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()} + * after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread. */ - long maxExecutionTime(); + long maxExecutionTimeInMillis(); /** * Unregisters the current thread and prevents it from being interrupted. @@ -89,7 +89,7 @@ public void register() { } @Override - public long maxExecutionTime() { + public long maxExecutionTimeInMillis() { return Long.MAX_VALUE; } @@ -123,7 +123,7 @@ public void register() { } @Override - public long maxExecutionTime() { + public long maxExecutionTimeInMillis() { return maxExecutionTime; }