Grok Prepper Matching Functionality #324

Merged
merged 10 commits into from
Oct 1, 2021
@@ -254,13 +254,13 @@ public Long getLongOrDefault(final String attribute, final long defaultValue) {
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}

- private <T> void checkObjectType(String attribute, Object object, Class<T> type) {
private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {
if (!(type.isAssignableFrom(object.getClass()))){
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}
}

- private <T> void checkObjectForListType(String attribute, Object object, Class<T> type) {
private <T> void checkObjectForListType(final String attribute, final Object object, final Class<T> type) {
checkObjectType(attribute, object, List.class);

((List<?>) object).forEach(o -> {
35 changes: 35 additions & 0 deletions data-prepper-plugins/grok-prepper/README.md
@@ -1 +1,36 @@
# Grok Prepper

This is a prepper that takes unstructured data and utilizes pattern matching
to structure and extract important fields for easier and more insightful aggregation and analysis.

## Usages

Example `.yaml` configuration

```
prepper:
- grok:
match:
message: [ "%{COMMONAPACHELOG}" ]
```

## Configuration

* `match` (Optional): A `Map<String, List<String>>` that specifies which fields of a Record to match which patterns against. Default value is `{}`

* `keep_empty_captures` (Optional): A `boolean` that specifies whether `null` captures should be kept. Default value is `false`

* `named_captures_only` (Optional): A `boolean` that specifies whether to only keep named captures. Default value is `true`

## Notes on Patterns

The Grok Prepper internally uses the [java-grok Library](https://github.com/thekrakken/java-grok), so any patterns compatible there are compatible with Grok Prepper.
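As background for the note above: a grok expression such as `%{WORD:verb}` is, conceptually, a named regular-expression capture. The sketch below is not java-grok and does not use its API. It is a simplified stand-in built on `java.util.regex`, with a hand-written regex and a hypothetical class name `NamedCaptureSketch`, meant only to illustrate the kind of field map a successful pattern match produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NamedCaptureSketch {

    // Hand-written, simplified stand-in for a grok expression (assumption for
    // illustration only; the default patterns in java-grok are stricter).
    private static final Pattern LINE =
            Pattern.compile("(?<clientip>\\S+) (?<verb>[A-Z]+) (?<request>\\S+)");

    // Group names are listed explicitly so the sketch also runs on older JDKs.
    private static final String[] GROUP_NAMES = {"clientip", "verb", "request"};

    // Returns the named captures for a line, or an empty map if it does not match.
    static Map<String, Object> capture(final String line) {
        final Map<String, Object> captures = new LinkedHashMap<>();
        final Matcher matcher = LINE.matcher(line);
        if (matcher.matches()) {
            for (final String name : GROUP_NAMES) {
                captures.put(name, matcher.group(name));
            }
        }
        return captures;
    }

    public static void main(final String[] args) {
        // prints {clientip=127.0.0.1, verb=GET, request=/index.html}
        System.out.println(capture("127.0.0.1 GET /index.html"));
        // prints {}
        System.out.println(capture("not a request line"));
    }
}
```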
Review comment (Contributor):

A few comments here, but let's defer that to a separate PR where we can focus on the documentation for our users later. I don't want to block this functionality right now.

  • "so any patterns compatible there are compatible with Grok Prepper." - this is passive and "there" refers to the object of the sentence instead of the subject which is a little confusing. Might I recommend:
    "The Grok Prepper uses the java-grok Library internally and supports all java-grok Library compatible patterns."

  • We should link to the default patterns as well.


## Metrics

TBD

## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/readme/monitoring.md)
1 change: 1 addition & 0 deletions data-prepper-plugins/grok-prepper/build.gradle
@@ -18,6 +18,7 @@ dependencies {
implementation "io.krakens:java-grok:0.1.9"
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
}

jacocoTestCoverageVerification {
@@ -19,22 +19,45 @@
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.prepper.AbstractPrepper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


@DataPrepperPlugin(name = "grok", type = PluginType.PREPPER)
public class GrokPrepper extends AbstractPrepper<Record<String>, Record<String>> {

private static final Logger LOG = LoggerFactory.getLogger(GrokPrepper.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};

private final GrokCompiler grokCompiler;
private final Map<String, List<Grok>> fieldToGrok;
private final GrokPrepperConfig grokPrepperConfig;

public GrokPrepper(final PluginSetting pluginSetting) {
super(pluginSetting);
grokPrepperConfig = GrokPrepperConfig.buildConfig(pluginSetting);
grokCompiler = GrokCompiler.newInstance();
fieldToGrok = new LinkedHashMap<>();

registerPatterns();
compileMatchPatterns();
}

/**
@@ -43,10 +66,35 @@ public GrokPrepper(final PluginSetting pluginSetting) {
*
* @param records Input records that will be modified/processed
* @return Record modified output records
- */
*/
@Override
- public Collection<Record<String>> doExecute(Collection<Record<String>> records) {
- return records;
public Collection<Record<String>> doExecute(final Collection<Record<String>> records) {
final List<Record<String>> recordsOut = new LinkedList<>();

for (Record<String> record : records) {
Review comment (Contributor):
nit: final

try {
final Map<String, Object> recordMap = OBJECT_MAPPER.readValue(record.getData(), MAP_TYPE_REFERENCE);

for (final Map.Entry<String, List<Grok>> entry : fieldToGrok.entrySet()) {
for (final Grok grok : entry.getValue()) {
if (recordMap.containsKey(entry.getKey())) {
Match match = grok.match(recordMap.get(entry.getKey()).toString());
Review comment (Contributor):
nit: final

match.setKeepEmptyCaptures(grokPrepperConfig.isKeepEmptyCaptures());

mergeCaptures(recordMap, match.capture());
}
}
}

final Record<String> grokkedRecord = new Record<>(OBJECT_MAPPER.writeValueAsString(recordMap), record.getMetadata());
recordsOut.add(grokkedRecord);

} catch (JsonProcessingException e) {
LOG.error("Failed to parse the record [{}]", record.getData());
recordsOut.add(record);
}
}
return recordsOut;
}

@Override
@@ -63,4 +111,42 @@ public boolean isReadyForShutdown() {
public void shutdown() {

}
- }

private void registerPatterns() {
grokCompiler.registerDefaultPatterns();
}

private void compileMatchPatterns() {
for (final Map.Entry<String, List<String>> entry : grokPrepperConfig.getMatch().entrySet()) {
fieldToGrok.put(entry.getKey(), entry.getValue()
.stream()
.map(item -> grokCompiler.compile(item, grokPrepperConfig.isNamedCapturesOnly()))
.collect(Collectors.toList()));
}
}

private void mergeCaptures(final Map<String, Object> original, final Map<String, Object> updates) {
for (final Map.Entry<String, Object> updateEntry : updates.entrySet()) {
if (!(original.containsKey(updateEntry.getKey()))) {
original.put(updateEntry.getKey(), updateEntry.getValue());
continue;
}

if (original.get(updateEntry.getKey()) instanceof List) {
mergeValueWithValues(updateEntry.getValue(), (List<Object>) original.get(updateEntry.getKey()));
} else {
final List<Object> values = new ArrayList<>(Collections.singletonList(original.get(updateEntry.getKey())));
mergeValueWithValues(updateEntry.getValue(), values);
original.put(updateEntry.getKey(), values);
}
}
}

private void mergeValueWithValues(final Object value, final List<Object> values) {
if (value instanceof List) {
values.addAll((List<Object>) value);
} else {
values.add(value);
}
}
}
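
The merge behavior of `mergeCaptures` and `mergeValueWithValues` above can be exercised in isolation. The following is a minimal, self-contained sketch that assumes only the JDK; the class name `MergeCapturesSketch` and the sample keys are hypothetical and not part of this PR. It illustrates that a capture never overwrites an existing field: a new key is added as-is, while a colliding key becomes a list holding the original value followed by the captured value(s).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeCapturesSketch {

    // Merges captured values into the record map without overwriting existing keys.
    @SuppressWarnings("unchecked")
    static void mergeCaptures(final Map<String, Object> original, final Map<String, Object> updates) {
        for (final Map.Entry<String, Object> update : updates.entrySet()) {
            final String key = update.getKey();
            if (!original.containsKey(key)) {
                // No collision: the capture becomes a new field.
                original.put(key, update.getValue());
                continue;
            }

            final Object existing = original.get(key);
            final List<Object> values;
            if (existing instanceof List) {
                // Existing list is extended in place (assumed to be mutable).
                values = (List<Object>) existing;
            } else {
                // Existing scalar is promoted to a list so both values are kept.
                values = new ArrayList<>();
                values.add(existing);
                original.put(key, values);
            }

            if (update.getValue() instanceof List) {
                values.addAll((List<Object>) update.getValue());
            } else {
                values.add(update.getValue());
            }
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> record = new LinkedHashMap<>();
        record.put("message", "GET /index.html");
        record.put("verb", "POST");

        final Map<String, Object> captures = new LinkedHashMap<>();
        captures.put("verb", "GET");
        captures.put("request", "/index.html");

        mergeCaptures(record, captures);
        // prints {message=GET /index.html, verb=[POST, GET], request=/index.html}
        System.out.println(record);
    }
}
```

One consequence of this design is that a field's type can change from a scalar to a list after grokking, so downstream consumers of the record may need to handle both shapes.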