Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grok Prepper Matching Functionality #324

Merged
merged 10 commits into from
Oct 1, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ public Long getLongOrDefault(final String attribute, final long defaultValue) {
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}

private <T> void checkObjectType(String attribute, Object object, Class<T> type) {
private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {
if (!(type.isAssignableFrom(object.getClass()))){
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}
}

private <T> void checkObjectForListType(String attribute, Object object, Class<T> type) {
private <T> void checkObjectForListType(final String attribute, final Object object, final Class<T> type) {
checkObjectType(attribute, object, List.class);

((List<?>) object).forEach(o -> {
Expand Down
37 changes: 37 additions & 0 deletions data-prepper-plugins/grok-prepper/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,38 @@
# Grok Prepper

This is a prepper that takes unstructured data and utilizes pattern matching
to structure and extract important fields for easier and more insightful aggregation and analysis.

## Usages

Example `.yaml` configuration

```
prepper:
- grok:
match:
message: [ "%{COMMONAPACHELOG}" ]
```

## Configuration

* `match` (Optional): A `Map<String, List<String>>` that specifies which fields of a Record to match which patterns against. Default value is `{}`

* `keep_empty_captures` (Optional): A `boolean` that specifies whether `null` captures should be kept. Default value is `false`

* `named_captures_only` (Optional): A `boolean` that specifies whether to only keep named captures. Default value is `true`

## Notes on Patterns

The Grok Prepper uses the [java-grok Library](https://github.com/thekrakken/java-grok) internally and supports all java-grok Library compatible patterns.

[Default Patterns](https://github.com/thekrakken/java-grok/blob/master/src/main/resources/patterns/patterns)

## Metrics

TBD

## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/readme/monitoring.md)
1 change: 1 addition & 0 deletions data-prepper-plugins/grok-prepper/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation "io.krakens:java-grok:0.1.9"
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,45 @@
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.prepper.AbstractPrepper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


@DataPrepperPlugin(name = "grok", type = PluginType.PREPPER)
public class GrokPrepper extends AbstractPrepper<Record<String>, Record<String>> {

private static final Logger LOG = LoggerFactory.getLogger(GrokPrepper.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};

private final GrokCompiler grokCompiler;
private final Map<String, List<Grok>> fieldToGrok;
private final GrokPrepperConfig grokPrepperConfig;

public GrokPrepper(final PluginSetting pluginSetting) {
super(pluginSetting);
grokPrepperConfig = GrokPrepperConfig.buildConfig(pluginSetting);
grokCompiler = GrokCompiler.newInstance();
fieldToGrok = new LinkedHashMap<>();

registerPatterns();
compileMatchPatterns();
}

/**
Expand All @@ -43,10 +66,35 @@ public GrokPrepper(final PluginSetting pluginSetting) {
*
* @param records Input records that will be modified/processed
* @return Record modified output records
*/
*/
@Override
public Collection<Record<String>> doExecute(Collection<Record<String>> records) {
return records;
public Collection<Record<String>> doExecute(final Collection<Record<String>> records) {
final List<Record<String>> recordsOut = new LinkedList<>();
dlvenable marked this conversation as resolved.
Show resolved Hide resolved

for (final Record<String> record : records) {
try {
final Map<String, Object> recordMap = OBJECT_MAPPER.readValue(record.getData(), MAP_TYPE_REFERENCE);

for (final Map.Entry<String, List<Grok>> entry : fieldToGrok.entrySet()) {
for (final Grok grok : entry.getValue()) {
if (recordMap.containsKey(entry.getKey())) {
final Match match = grok.match(recordMap.get(entry.getKey()).toString());
match.setKeepEmptyCaptures(grokPrepperConfig.isKeepEmptyCaptures());

mergeCaptures(recordMap, match.capture());
}
}
}

final Record<String> grokkedRecord = new Record<>(OBJECT_MAPPER.writeValueAsString(recordMap), record.getMetadata());
recordsOut.add(grokkedRecord);

} catch (JsonProcessingException e) {
LOG.error("Failed to parse the record [{}]", record.getData());
recordsOut.add(record);
}
}
return recordsOut;
}

@Override
Expand All @@ -63,4 +111,42 @@ public boolean isReadyForShutdown() {
public void shutdown() {

}
}

private void registerPatterns() {
grokCompiler.registerDefaultPatterns();
}

private void compileMatchPatterns() {
for (final Map.Entry<String, List<String>> entry : grokPrepperConfig.getMatch().entrySet()) {
fieldToGrok.put(entry.getKey(), entry.getValue()
.stream()
.map(item -> grokCompiler.compile(item, grokPrepperConfig.isNamedCapturesOnly()))
.collect(Collectors.toList()));
}
}

private void mergeCaptures(final Map<String, Object> original, final Map<String, Object> updates) {
for (final Map.Entry<String, Object> updateEntry : updates.entrySet()) {
if (!(original.containsKey(updateEntry.getKey()))) {
original.put(updateEntry.getKey(), updateEntry.getValue());
continue;
}

if (original.get(updateEntry.getKey()) instanceof List) {
mergeValueWithValues(updateEntry.getValue(), (List<Object>) original.get(updateEntry.getKey()));
} else {
final List<Object> values = new ArrayList<>(Collections.singletonList(original.get(updateEntry.getKey())));
mergeValueWithValues(updateEntry.getValue(), values);
original.put(updateEntry.getKey(), values);
}
}
}

private void mergeValueWithValues(final Object value, final List<Object> values) {
if (value instanceof List) {
values.addAll((List<Object>) value);
} else {
values.add(value);
}
}
}
Loading