Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for timeout millis, refactored unit tests to createObje… #449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion data-prepper-plugins/grok-prepper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ TODO: provide examples for using each configuration

* `target_key` (Optional): A `String` that will wrap all captures for a Record in an additional outer key value. Default value is `null`


* `timeout_millis` (Optional): An `int` that specifies the maximum amount of time, in milliseconds, that matching will be performed on an individual Record before it times out and moves on to the next Record.
Setting `timeout_millis = 0` disables the timeout, so matching a Record never times out. Default value is `30,000`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified: "Setting a timeout_millis = 0 will disable this feature. Default value is 30,000"


## Notes on Patterns

The Grok Prepper uses the [java-grok Library](https://github.com/thekrakken/java-grok) internally and supports all java-grok Library compatible patterns.
The Grok Prepper uses the [java-grok Library](https://github.com/thekrakken/java-grok) internally and supports all java-grok library compatible patterns. The java-grok library is built using the `java.util.regex` regular expression library.

[Default Patterns](https://github.com/thekrakken/java-grok/blob/master/src/main/resources/patterns/patterns)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

Expand All @@ -64,13 +70,19 @@ public class GrokPrepper extends AbstractPrepper<Record<String>, Record<String>>
private final Map<String, List<Grok>> fieldToGrok;
private final GrokPrepperConfig grokPrepperConfig;
private final Set<String> keysToOverwrite;
private final ExecutorService executorService;

/**
 * Creates a GrokPrepper from the given plugin settings.
 *
 * <p>Delegates to the three-argument constructor, supplying a new {@link GrokCompiler}
 * instance and a single-thread {@link ExecutorService}.
 *
 * @param pluginSetting the plugin settings passed on to the delegating constructor
 */
public GrokPrepper(final PluginSetting pluginSetting) {
this(pluginSetting, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor());
}

GrokPrepper(final PluginSetting pluginSetting, final GrokCompiler grokCompiler, final ExecutorService executorService) {
super(pluginSetting);
grokPrepperConfig = GrokPrepperConfig.buildConfig(pluginSetting);
keysToOverwrite = new HashSet<>(grokPrepperConfig.getkeysToOverwrite());
grokCompiler = GrokCompiler.newInstance();
fieldToGrok = new LinkedHashMap<>();
this.grokPrepperConfig = GrokPrepperConfig.buildConfig(pluginSetting);
this.keysToOverwrite = new HashSet<>(grokPrepperConfig.getkeysToOverwrite());
this.grokCompiler = grokCompiler;
this.fieldToGrok = new LinkedHashMap<>();
this.executorService = executorService;

registerPatterns();
compileMatchPatterns();
Expand All @@ -90,31 +102,11 @@ public Collection<Record<String>> doExecute(final Collection<Record<String>> rec
for (final Record<String> record : records) {
try {
final Map<String, Object> recordMap = OBJECT_MAPPER.readValue(record.getData(), MAP_TYPE_REFERENCE);
final Map<String, Object> grokkedCaptures = new HashMap<>();

for (final Map.Entry<String, List<Grok>> entry : fieldToGrok.entrySet()) {
for (final Grok grok : entry.getValue()) {
if (recordMap.containsKey(entry.getKey())) {
final Match match = grok.match(recordMap.get(entry.getKey()).toString());
match.setKeepEmptyCaptures(grokPrepperConfig.isKeepEmptyCaptures());

final Map<String, Object> captures = match.capture();
mergeCaptures(grokkedCaptures, captures);

if (shouldBreakOnMatch(grokkedCaptures)) {
break;
}
}
}
if (shouldBreakOnMatch(grokkedCaptures)) {
break;
}
}

if (grokPrepperConfig.getTargetKey() != null) {
recordMap.put(grokPrepperConfig.getTargetKey(), grokkedCaptures);
if (grokPrepperConfig.getTimeoutMillis() == 0) {
matchAndMerge(recordMap);
} else {
mergeCaptures(recordMap, grokkedCaptures);
runWithTimeout(() -> matchAndMerge(recordMap));
}

final Record<String> grokkedRecord = new Record<>(OBJECT_MAPPER.writeValueAsString(recordMap), record.getMetadata());
Expand All @@ -123,33 +115,52 @@ public Collection<Record<String>> doExecute(final Collection<Record<String>> rec
} catch (JsonProcessingException e) {
LOG.error("Failed to parse the record [{}]", record.getData());
recordsOut.add(record);
} catch (TimeoutException e) {
LOG.error("Matching on record [{}] took longer than [{}] and timed out", record.getData(), grokPrepperConfig.getTimeoutMillis());
recordsOut.add(record);
} catch (ExecutionException e) {
LOG.error("An exception occurred while matching on record [{}]", record.getData(), e);
recordsOut.add(record);
} catch (InterruptedException e) {
LOG.error("Matching on record [{}] was interrupted", record.getData(), e);
recordsOut.add(record);
} catch (RuntimeException e) {
LOG.error("Unknown exception occurred when matching record [{}]", record.getData(), e);
recordsOut.add(record);
}
}
}
return recordsOut;
}

/**
 * Begins shutting down the executor service by calling {@link ExecutorService#shutdown()},
 * which stops it from accepting new tasks while letting already-submitted tasks run.
 */
@Override
public void prepareForShutdown() {

executorService.shutdown();
}

/**
 * Waits up to 300 milliseconds for the executor service to terminate, then reports ready.
 *
 * <p>If the wait is interrupted, the error is logged and the calling thread's interrupt
 * flag is restored so that code further up the stack can still observe the interruption.
 *
 * @return always {@code true}, whether or not the executor terminated within the wait
 */
@Override
public boolean isReadyForShutdown() {
    try {
        if (executorService.awaitTermination(300, TimeUnit.MILLISECONDS)) {
            LOG.info("Successfully waited for running task to terminate");
        }
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag; set it again so the
        // interruption is not silently lost for the caller.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while waiting for running task to terminate", e);
    }
    return true;
}

/**
 * Shuts the executor service down immediately by calling {@link ExecutorService#shutdownNow()},
 * which attempts to stop running tasks and does not start any that are still waiting.
 */
@Override
public void shutdown() {

executorService.shutdownNow();
}

/**
 * Registers pattern sources with the grok compiler: the compiler's default patterns,
 * the pattern definitions from the plugin configuration, and then the patterns loaded
 * by the directory-registration helper(s) called below.
 */
private void registerPatterns() {
grokCompiler.registerDefaultPatterns();
grokCompiler.register(grokPrepperConfig.getPatternDefinitions());
// NOTE(review): the next two calls look like the before/after of a method rename
// (registerPatternsDir -> registerPatternsDirectories) — confirm only one should remain.
registerPatternsDir();
registerPatternsDirectories();
}

private void registerPatternsDir() {
private void registerPatternsDirectories() {
for (final String directory : grokPrepperConfig.getPatternsDirectories()) {
final Path path = FileSystems.getDefault().getPath(directory);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, grokPrepperConfig.getPatternsFilesGlob())) {
Expand Down Expand Up @@ -186,6 +197,35 @@ private void compileMatchPatterns() {
}
}

/**
 * Runs the configured grok patterns against a parsed record and merges the captures into it.
 *
 * <p>For every configured field that is present in {@code recordMap} with a non-null value,
 * each grok compiled for that field is matched against the value's string form and the
 * captures are accumulated via {@code mergeCaptures}. When break-on-match is enabled,
 * matching stops as soon as at least one capture has been collected. The accumulated
 * captures are then stored under the configured target key, or, when no target key is
 * configured, merged into {@code recordMap} itself via {@code mergeCaptures}.
 *
 * @param recordMap the parsed record; modified in place
 */
private void matchAndMerge(final Map<String, Object> recordMap) {
    final Map<String, Object> grokkedCaptures = new HashMap<>();

    for (final Map.Entry<String, List<Grok>> entry : fieldToGrok.entrySet()) {
        // Look the field up once per entry: the record is not modified while matching,
        // so the value is the same for every grok of this field. A key that is absent and
        // a key that maps to null are both skipped; calling toString() on a null value
        // would otherwise throw and abort matching for all remaining fields.
        final Object fieldValue = recordMap.get(entry.getKey());
        if (fieldValue != null) {
            final String text = fieldValue.toString();

            for (final Grok grok : entry.getValue()) {
                final Match match = grok.match(text);
                match.setKeepEmptyCaptures(grokPrepperConfig.isKeepEmptyCaptures());

                final Map<String, Object> captures = match.capture();
                mergeCaptures(grokkedCaptures, captures);

                if (shouldBreakOnMatch(grokkedCaptures)) {
                    break;
                }
            }
        }
        // Propagate the inner break so no further fields are processed after a match.
        if (shouldBreakOnMatch(grokkedCaptures)) {
            break;
        }
    }

    if (grokPrepperConfig.getTargetKey() != null) {
        recordMap.put(grokPrepperConfig.getTargetKey(), grokkedCaptures);
    } else {
        mergeCaptures(recordMap, grokkedCaptures);
    }
}

private void mergeCaptures(final Map<String, Object> original, final Map<String, Object> updates) {
for (final Map.Entry<String, Object> updateEntry : updates.entrySet()) {
if (!(original.containsKey(updateEntry.getKey())) || keysToOverwrite.contains(updateEntry.getKey())) {
Expand Down Expand Up @@ -214,4 +254,9 @@ private void mergeValueWithValues(final Object value, final List<Object> values)
/**
 * Tells whether matching should stop early.
 *
 * @param captures the captures collected so far
 * @return {@code true} only when {@code captures} is non-empty and break-on-match is enabled
 */
private boolean shouldBreakOnMatch(final Map<String, Object> captures) {
    // Nothing captured yet: keep matching regardless of the break-on-match setting.
    if (captures.size() <= 0) {
        return false;
    }
    return grokPrepperConfig.isBreakOnMatch();
}

private void runWithTimeout(final Runnable runnable) throws TimeoutException, ExecutionException, InterruptedException {
Future<?> task = executorService.submit(runnable);
task.get(grokPrepperConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update the README with this configuration option now that we are using it.

}
}
Loading