Skip to content

Commit

Permalink
[ML] Interrupt Grok in file structure finder timeout (#36588)
Browse files Browse the repository at this point in the history
The file structure finder has timeout functionality,
but prior to this change it would not interrupt a
single long-running Grok match attempt.

This commit hooks into the ThreadWatchdog facility
provided by the Grok library to interrupt individual
Grok matches that may be running at the time the
file structure finder timeout expires.
  • Loading branch information
droberts195 authored Dec 14, 2018
1 parent a4b32f1 commit 690b10a
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public final class FileStructureUtils {

private static final int NUM_TOP_HITS = 10;
// NUMBER Grok pattern doesn't support scientific notation, so we extend it
private static final Grok NUMBER_GROK = new Grok(Grok.getBuiltinPatterns(), "^%{NUMBER}(?:[eE][+-]?[0-3]?[0-9]{1,2})?$");
private static final Grok IP_GROK = new Grok(Grok.getBuiltinPatterns(), "^%{IP}$");
private static final Grok NUMBER_GROK = new Grok(Grok.getBuiltinPatterns(), "^%{NUMBER}(?:[eE][+-]?[0-3]?[0-9]{1,2})?$",
TimeoutChecker.watchdog);
private static final Grok IP_GROK = new Grok(Grok.getBuiltinPatterns(), "^%{IP}$", TimeoutChecker.watchdog);
private static final int KEYWORD_MAX_LEN = 256;
private static final int KEYWORD_MAX_SPACES = 5;

Expand Down Expand Up @@ -69,7 +70,7 @@ static Tuple<String, TimestampMatch> guessTimestampField(List<String> explanatio
}

// Accept the first match from the first sample that is compatible with all the other samples
for (Tuple<String, TimestampMatch> candidate : findCandidates(explanation, sampleRecords, overrides)) {
for (Tuple<String, TimestampMatch> candidate : findCandidates(explanation, sampleRecords, overrides, timeoutChecker)) {

boolean allGood = true;
for (Map<String, ?> sampleRecord : sampleRecords.subList(1, sampleRecords.size())) {
Expand All @@ -87,7 +88,8 @@ static Tuple<String, TimestampMatch> guessTimestampField(List<String> explanatio

timeoutChecker.check("timestamp field determination");

TimestampMatch match = TimestampFormatFinder.findFirstFullMatch(fieldValue.toString(), overrides.getTimestampFormat());
TimestampMatch match = TimestampFormatFinder.findFirstFullMatch(fieldValue.toString(), overrides.getTimestampFormat(),
timeoutChecker);
if (match == null || match.candidateIndex != candidate.v2().candidateIndex) {
if (overrides.getTimestampFormat() != null) {
throw new IllegalArgumentException("Specified timestamp format [" + overrides.getTimestampFormat() +
Expand All @@ -111,7 +113,7 @@ static Tuple<String, TimestampMatch> guessTimestampField(List<String> explanatio
}

private static List<Tuple<String, TimestampMatch>> findCandidates(List<String> explanation, List<Map<String, ?>> sampleRecords,
FileStructureOverrides overrides) {
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) {

assert sampleRecords.isEmpty() == false;
Map<String, ?> firstRecord = sampleRecords.get(0);
Expand All @@ -130,7 +132,8 @@ private static List<Tuple<String, TimestampMatch>> findCandidates(List<String> e
if (onlyConsiderField == null || onlyConsiderField.equals(fieldName)) {
Object value = field.getValue();
if (value != null) {
TimestampMatch match = TimestampFormatFinder.findFirstFullMatch(value.toString(), overrides.getTimestampFormat());
TimestampMatch match = TimestampFormatFinder.findFirstFullMatch(value.toString(), overrides.getTimestampFormat(),
timeoutChecker);
if (match != null) {
Tuple<String, TimestampMatch> candidate = new Tuple<>(fieldName, match);
candidates.add(candidate);
Expand Down Expand Up @@ -211,7 +214,7 @@ static Tuple<Map<String, String>, FieldStats> guessMappingAndCalculateFieldStats
}

Collection<String> fieldValuesAsStrings = fieldValues.stream().map(Object::toString).collect(Collectors.toList());
Map<String, String> mapping = guessScalarMapping(explanation, fieldName, fieldValuesAsStrings);
Map<String, String> mapping = guessScalarMapping(explanation, fieldName, fieldValuesAsStrings, timeoutChecker);
timeoutChecker.check("mapping determination");
return new Tuple<>(mapping, calculateFieldStats(fieldValuesAsStrings, timeoutChecker));
}
Expand All @@ -238,10 +241,12 @@ private static Stream<Object> flatten(Object value) {
* @param fieldValues Values of the field for which mappings are to be guessed. The guessed
* mapping will be compatible with all the provided values. Must not be
* empty.
* @param timeoutChecker Will abort the operation if its timeout is exceeded.
* @return The sub-section of the index mappings most appropriate for the field,
* for example <code>{ "type" : "keyword" }</code>.
*/
static Map<String, String> guessScalarMapping(List<String> explanation, String fieldName, Collection<String> fieldValues) {
static Map<String, String> guessScalarMapping(List<String> explanation, String fieldName, Collection<String> fieldValues,
TimeoutChecker timeoutChecker) {

assert fieldValues.isEmpty() == false;

Expand All @@ -251,11 +256,12 @@ static Map<String, String> guessScalarMapping(List<String> explanation, String f

// This checks if a date mapping would be appropriate, and, if so, finds the correct format
Iterator<String> iter = fieldValues.iterator();
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(iter.next());
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(iter.next(), timeoutChecker);
while (timestampMatch != null && iter.hasNext()) {
// To be mapped as type date all the values must match the same timestamp format - it is
// not acceptable for all values to be dates, but with different formats
if (timestampMatch.equals(TimestampFormatFinder.findFirstFullMatch(iter.next(), timestampMatch.candidateIndex)) == false) {
if (timestampMatch.equals(TimestampFormatFinder.findFirstFullMatch(iter.next(), timestampMatch.candidateIndex,
timeoutChecker)) == false) {
timestampMatch = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ static class ValueOnlyGrokPatternCandidate implements GrokPatternCandidate {
this.fieldName = fieldName;
// The (?m) here has the Ruby meaning, which is equivalent to (?s) in Java
grok = new Grok(Grok.getBuiltinPatterns(), "(?m)%{DATA:" + PREFACE + "}" + preBreak +
"%{" + grokPatternName + ":" + VALUE + "}" + postBreak + "%{GREEDYDATA:" + EPILOGUE + "}");
"%{" + grokPatternName + ":" + VALUE + "}" + postBreak + "%{GREEDYDATA:" + EPILOGUE + "}", TimeoutChecker.watchdog);
}

@Override
Expand All @@ -472,22 +472,21 @@ public String processCaptures(Map<String, Integer> fieldNameCountStore, Collecti
TimeoutChecker timeoutChecker) {
Collection<String> values = new ArrayList<>();
for (String snippet : snippets) {
Map<String, Object> captures = grok.captures(snippet);
Map<String, Object> captures = timeoutChecker.grokCaptures(grok, snippet, "full message Grok pattern field extraction");
// If the pattern doesn't match then captures will be null
if (captures == null) {
throw new IllegalStateException("[%{" + grokPatternName + "}] does not match snippet [" + snippet + "]");
}
prefaces.add(captures.getOrDefault(PREFACE, "").toString());
values.add(captures.getOrDefault(VALUE, "").toString());
epilogues.add(captures.getOrDefault(EPILOGUE, "").toString());
timeoutChecker.check("full message Grok pattern field extraction");
}
String adjustedFieldName = buildFieldName(fieldNameCountStore, fieldName);
if (mappings != null) {
Map<String, String> fullMappingType = Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType);
if ("date".equals(mappingType)) {
assert values.isEmpty() == false;
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(values.iterator().next());
TimestampMatch timestampMatch = TimestampFormatFinder.findFirstFullMatch(values.iterator().next(), timeoutChecker);
if (timestampMatch != null) {
fullMappingType = timestampMatch.getEsDateMappingTypeWithFormat();
}
Expand Down Expand Up @@ -548,7 +547,7 @@ public String processCaptures(Map<String, Integer> fieldNameCountStore, Collecti
throw new IllegalStateException("Cannot process KV matches until a field name has been determined");
}
Grok grok = new Grok(Grok.getBuiltinPatterns(), "(?m)%{DATA:" + PREFACE + "}\\b" +
fieldName + "=%{USER:" + VALUE + "}%{GREEDYDATA:" + EPILOGUE + "}");
fieldName + "=%{USER:" + VALUE + "}%{GREEDYDATA:" + EPILOGUE + "}", TimeoutChecker.watchdog);
Collection<String> values = new ArrayList<>();
for (String snippet : snippets) {
Map<String, Object> captures = grok.captures(snippet);
Expand All @@ -563,7 +562,8 @@ public String processCaptures(Map<String, Integer> fieldNameCountStore, Collecti
}
String adjustedFieldName = buildFieldName(fieldNameCountStore, fieldName);
if (mappings != null) {
mappings.put(adjustedFieldName, FileStructureUtils.guessScalarMapping(explanation, adjustedFieldName, values));
mappings.put(adjustedFieldName,
FileStructureUtils.guessScalarMapping(explanation, adjustedFieldName, values, timeoutChecker));
timeoutChecker.check("mapping determination");
}
if (fieldStats != null) {
Expand Down Expand Up @@ -610,7 +610,7 @@ static FullMatchGrokPatternCandidate fromGrokPattern(String grokPattern, String
private FullMatchGrokPatternCandidate(String grokPattern, String timeField) {
this.grokPattern = grokPattern;
this.timeField = timeField;
grok = new Grok(Grok.getBuiltinPatterns(), grokPattern);
grok = new Grok(Grok.getBuiltinPatterns(), grokPattern, TimeoutChecker.watchdog);
}

public String getTimeField() {
Expand Down Expand Up @@ -640,7 +640,8 @@ public Tuple<String, String> processMatch(List<String> explanation, Collection<S
Map<String, Collection<String>> valuesPerField = new HashMap<>();

for (String sampleMessage : sampleMessages) {
Map<String, Object> captures = grok.captures(sampleMessage);
Map<String, Object> captures = timeoutChecker.grokCaptures(grok, sampleMessage,
"full message Grok pattern field extraction");
// If the pattern doesn't match then captures will be null
if (captures == null) {
throw new IllegalStateException("[" + grokPattern + "] does not match snippet [" + sampleMessage + "]");
Expand All @@ -658,7 +659,6 @@ public Tuple<String, String> processMatch(List<String> explanation, Collection<S
}
});
}
timeoutChecker.check("full message Grok pattern field extraction");
}

for (Map.Entry<String, Collection<String>> valuesForField : valuesPerField.entrySet()) {
Expand All @@ -667,7 +667,7 @@ public Tuple<String, String> processMatch(List<String> explanation, Collection<S
// Exclude the time field because that will be dropped and replaced with @timestamp
if (fieldName.equals(timeField) == false) {
mappings.put(fieldName,
FileStructureUtils.guessScalarMapping(explanation, fieldName, valuesForField.getValue()));
FileStructureUtils.guessScalarMapping(explanation, fieldName, valuesForField.getValue(), timeoutChecker));
timeoutChecker.check("mapping determination");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static Tuple<TimestampMatch, Set<String>> mostLikelyTimestamp(String[] sampleLin
int remainingLines = sampleLines.length;
double differenceBetweenTwoHighestWeights = 0.0;
for (String sampleLine : sampleLines) {
TimestampMatch match = TimestampFormatFinder.findFirstMatch(sampleLine, overrides.getTimestampFormat());
TimestampMatch match = TimestampFormatFinder.findFirstMatch(sampleLine, overrides.getTimestampFormat(), timeoutChecker);
if (match != null) {
TimestampMatch pureMatch = new TimestampMatch(match.candidateIndex, "", match.jodaTimestampFormats,
match.javaTimestampFormats, match.simplePattern, match.grokPatternName, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
package org.elasticsearch.xpack.ml.filestructurefinder;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class can be used to keep track of when a long running operation started and
Expand All @@ -32,9 +37,13 @@
*/
public class TimeoutChecker implements Closeable {

private static final TimeoutCheckerWatchdog timeoutCheckerWatchdog = new TimeoutCheckerWatchdog();
public static final ThreadWatchdog watchdog = timeoutCheckerWatchdog;

private final String operation;
private final ScheduledFuture<?> future;
private final TimeValue timeout;
private final Thread checkedThread;
private final ScheduledFuture<?> future;
private volatile boolean timeoutExceeded;

/**
Expand All @@ -48,6 +57,8 @@ public class TimeoutChecker implements Closeable {
public TimeoutChecker(String operation, TimeValue timeout, ScheduledExecutorService scheduler) {
this.operation = operation;
this.timeout = timeout;
this.checkedThread = Thread.currentThread();
timeoutCheckerWatchdog.add(checkedThread, timeout);
this.future = (timeout != null) ? scheduler.schedule(this::setTimeoutExceeded, timeout.nanos(), TimeUnit.NANOSECONDS) : null;
}

Expand All @@ -57,6 +68,7 @@ public TimeoutChecker(String operation, TimeValue timeout, ScheduledExecutorServ
@Override
public void close() {
// Cancel the pending timeout task. The future is null when no timeout was supplied to the
// constructor - presumably FutureUtils.cancel tolerates that (TODO confirm).
FutureUtils.cancel(future);
// Remove the thread that was added to the shared watchdog registry by the constructor.
// NOTE(review): if the timeout task has already started running when it is cancelled it may
// still consult the watchdog after this removal - confirm the watchdog copes with that.
timeoutCheckerWatchdog.remove(checkedThread);
}

/**
Expand All @@ -72,7 +84,80 @@ public void check(String where) {
}
}

/**
 * Runs {@link Grok#captures} on behalf of the caller and then performs this class's own
 * {@link #check}, so that a timeout is always reported in the style used by this class
 * rather than in the style used by the Grok library.
 * @param grok The grok pattern from which captures are to be extracted.
 * @param text The text to match and extract values from.
 * @param where Which stage of the operation is currently in progress?
 * @return Whatever {@link Grok#captures} returned for the supplied text, i.e. the captured
 *         field names mapped to their values.
 * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
 */
public Map<String, Object> grokCaptures(Grok grok, String text, String where) {

    final Map<String, Object> extracted;
    try {
        extracted = grok.captures(text);
    } finally {
        // Deliberately run the check even when Grok.captures() itself threw: if the timeout has
        // expired then the exception from check() replaces the one from Grok, because it makes
        // more sense in the context of the find file structure API
        check(where);
    }
    return extracted;
}

/**
 * Callback passed to the scheduler by the constructor; it is scheduled to run once the
 * timeout period has elapsed.
 */
private void setTimeoutExceeded() {
// Record the timeout before interrupting anything, so that the (volatile) flag is already
// set by the time the checked thread reacts to the interrupt below.
timeoutExceeded = true;
// Ask the watchdog to interrupt the checked thread; it only does so if that thread is
// currently flagged as registered with the watchdog.
timeoutCheckerWatchdog.interruptLongRunningThreadIfRegistered(checkedThread);
}

/**
 * An implementation of the type of watchdog used by the {@link Grok} class to interrupt
 * matching operations that take too long. Rather than have a timeout per match operation
 * like the {@link ThreadWatchdog.Default} implementation, the interruption is governed by
 * a {@link TimeoutChecker} associated with the thread doing the matching.
 *
 * Each thread known to the watchdog maps to a tuple of a flag, which is <code>true</code>
 * only between calls to {@link #register} and {@link #unregister} made on that thread, and
 * the timeout that was supplied when the thread was added (which may be <code>null</code>).
 */
static class TimeoutCheckerWatchdog implements ThreadWatchdog {

    final ConcurrentHashMap<Thread, Tuple<AtomicBoolean, TimeValue>> registry = new ConcurrentHashMap<>();

    /**
     * Make a thread known to the watchdog. Must not be called for a thread that is already known.
     * @param thread The thread to track.
     * @param timeout The timeout associated with the thread; may be <code>null</code>.
     */
    void add(Thread thread, TimeValue timeout) {
        Tuple<AtomicBoolean, TimeValue> previousValue = registry.put(thread, new Tuple<>(new AtomicBoolean(false), timeout));
        assert previousValue == null;
    }

    /**
     * Flags the current thread as being inside a matching operation.
     * This is a no-op for threads that were never added to the watchdog.
     */
    @Override
    public void register() {
        Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
        if (value != null) {
            boolean wasFalse = value.v1().compareAndSet(false, true);
            assert wasFalse;
        }
    }

    /**
     * @return The timeout in milliseconds associated with the current thread, or
     *         {@link Long#MAX_VALUE} if the current thread is unknown to the watchdog
     *         or was added without a timeout.
     */
    @Override
    public long maxExecutionTimeInMillis() {
        Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
        // A thread may legitimately have been added with a null timeout, so guard
        // against that as well as against the thread being completely unknown
        if (value == null || value.v2() == null) {
            return Long.MAX_VALUE;
        }
        return value.v2().getMillis();
    }

    /**
     * Flags the current thread as no longer being inside a matching operation.
     * This is a no-op for threads that were never added to the watchdog.
     */
    @Override
    public void unregister() {
        Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
        if (value != null) {
            boolean wasTrue = value.v1().compareAndSet(true, false);
            assert wasTrue;
        }
    }

    /**
     * Stop tracking a thread that was previously passed to {@link #add}.
     * @param thread The thread to stop tracking.
     */
    void remove(Thread thread) {
        Tuple<AtomicBoolean, TimeValue> previousValue = registry.remove(thread);
        assert previousValue != null;
    }

    /**
     * Interrupt the supplied thread, but only if it is currently flagged as being inside
     * a matching operation.
     * @param thread The thread to interrupt if appropriate.
     */
    void interruptLongRunningThreadIfRegistered(Thread thread) {
        Tuple<AtomicBoolean, TimeValue> value = registry.get(thread);
        // The entry can be absent if the thread was removed while the timeout task that
        // calls this method was already running, as cancelling a task that has started does
        // not stop it. In that case there is nothing left to interrupt.
        if (value != null && value.v1().get()) {
            thread.interrupt();
        }
    }
}
}
Loading

0 comments on commit 690b10a

Please sign in to comment.