File source improvements and change to event model (#601)
Refactored the file source, added record_type and format configuration options for json and plaintext, and added support for both Event and String records

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Nov 18, 2021
1 parent 8dd106b commit 515fa4d
Showing 13 changed files with 486 additions and 250 deletions.
30 changes: 27 additions & 3 deletions data-prepper-plugins/common/README.md
@@ -8,9 +8,33 @@ A prepper plugin to generate new string records with upper or lower case conversion

## `file` (source)

A source plugin to read input data from the specified file path. The file source creates a new Record for each line of data in the file.

* `path` (String): absolute path of the input data file. This option is required.

* `format` (String): The format of each line in the file. Valid options are `json` or `plain`. Default is `plain`.

  * `plain`: Reads plaintext data from the file. Internally, a plaintext line is given a key of `message`, so that

    ```
    Example log line in file
    ```

    becomes

    ```
    { "message": "Example log line in file" }
    ```

  * `json`: Reads data that is in the form of a JSON string from the file. If a line cannot be parsed as JSON, the file source treats it as a plaintext line. Expects JSON lines as follows:

    ```
    { "key1": "val1" }
    { "key2": "val2" }
    { "key3": "val3" }
    ```

* `record_type` (String): The Event type that will be stored in the metadata of the Event. Default is `string`. Temporarily, `record_type` can be either `event` or `string`. If you would like to use the file source for log analytics use cases such as grok, set this to `event`. See the example pipeline below.
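For example, a pipeline that reads JSON lines from a file and hands them to the pipeline as Events could be configured as follows. This is a minimal sketch: the pipeline name, file path, and `stdout` sink are illustrative, not part of this change.

```
log-file-pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      format: "json"
      record_type: "event"
  sink:
    - stdout:
```
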
## `file` (sink)
2 changes: 2 additions & 0 deletions data-prepper-plugins/common/build.gradle
@@ -23,8 +23,10 @@ dependencies {
implementation "org.bouncycastle:bcprov-jdk15on:1.69"
implementation "org.bouncycastle:bcpkix-jdk15on:1.69"
implementation 'org.reflections:reflections:0.10.2'
testImplementation project(':data-prepper-plugins:blocking-buffer')
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
}

jacocoTestCoverageVerification {


@@ -0,0 +1,33 @@ FileFormat.java (new file)
package com.amazon.dataprepper.plugins.source.file;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* An enum to represent the file formats supported in Data Prepper's file source.
* @since 1.2
*/
public enum FileFormat {

PLAIN("plain"),
JSON("json");

private static final Map<String, FileFormat> NAMES_MAP = Arrays.stream(FileFormat.values())
.collect(Collectors.toMap(FileFormat::toString, Function.identity()));

private final String name;

FileFormat(final String name) {
this.name = name;
}

@Override
public String toString() {
return this.name;
}

public static FileFormat getByName(final String name) {
return NAMES_MAP.get(name.toLowerCase());
}
}
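As a quick illustration of the lookup behavior: `getByName` lower-cases its argument before the map lookup, so mixed-case configuration values still resolve, while unknown names return `null`. The sketch below is hypothetical usage, not part of this commit.

```
import com.amazon.dataprepper.plugins.source.file.FileFormat;

// Hypothetical usage sketch for FileFormat.getByName (not part of this commit).
public class FileFormatDemo {
    public static void main(String[] args) {
        System.out.println(FileFormat.getByName("JSON"));  // prints "json" (FileFormat.JSON)
        System.out.println(FileFormat.getByName("plain")); // prints "plain" (FileFormat.PLAIN)
        System.out.println(FileFormat.getByName("csv"));   // prints "null" -- unknown formats return null
    }
}
```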
@@ -0,0 +1,121 @@ FileSource.java (new file)
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.plugins.source.file;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.plugin.PluginFactory;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

@DataPrepperPlugin(name = "file", pluginType = Source.class, pluginConfigurationType = FileSourceConfig.class)
public class FileSource implements Source<Record<Object>> {

static final String MESSAGE_KEY = "message";
private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final FileSourceConfig fileSourceConfig;

private boolean isStopRequested;
private final int writeTimeout;

@DataPrepperPluginConstructor
public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
fileSourceConfig.validate();
this.fileSourceConfig = fileSourceConfig;
this.isStopRequested = false;
this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;
}


@Override
public void start(final Buffer<Record<Object>> buffer) {
checkNotNull(buffer, "Buffer cannot be null for file source to start");
try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
String line;
while ((line = reader.readLine()) != null && !isStopRequested) {
writeLineAsEventOrString(line, buffer);
}
} catch (IOException | TimeoutException | IllegalArgumentException ex) {
LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
throw new RuntimeException(format("Error processing the input file %s",
fileSourceConfig.getFilePathToRead()), ex);
}
}

@Override
public void stop() {
isStopRequested = true;
}

private Record<Object> getEventRecordFromLine(final String line) {
Map<String, Object> structuredLine = new HashMap<>();

switch(fileSourceConfig.getFormat()) {
case JSON:
structuredLine = parseJson(line);
break;
case PLAIN:
structuredLine.put(MESSAGE_KEY, line);
break;
}

return new Record<>(JacksonEvent
.builder()
.withEventType(fileSourceConfig.getRecordType())
.withData(structuredLine)
.build());
}

private Map<String, Object> parseJson(final String jsonString) {
try {
return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
} catch (JsonProcessingException e) {
LOG.error("Unable to parse json data [{}], assuming plain text", jsonString, e);
final Map<String, Object> plainMap = new HashMap<>();
plainMap.put(MESSAGE_KEY, jsonString);
return plainMap;
}
}

// Temporary function to support both trace and log ingestion pipelines.
// TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
private void writeLineAsEventOrString(final String line, final Buffer<Record<Object>> buffer) throws TimeoutException, IllegalArgumentException {
if (fileSourceConfig.getRecordType().equals(FileSourceConfig.EVENT_TYPE)) {
buffer.write(getEventRecordFromLine(line), writeTimeout);
} else if (fileSourceConfig.getRecordType().equals(FileSourceConfig.DEFAULT_TYPE)) {
buffer.write(new Record<>(line), writeTimeout);
}
}
}
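The parse-or-fallback behavior of `parseJson` can be exercised in isolation. The sketch below is illustrative only, using the same Jackson calls as the source above; `ParseOrFallbackDemo` is a hypothetical class name.

```
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;
import java.util.Map;

// Illustrative sketch of FileSource#parseJson: well-formed JSON lines become
// structured maps; anything else falls back to a plaintext "message" entry.
public class ParseOrFallbackDemo {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static Map<String, Object> parseOrFallback(final String line) {
        try {
            return OBJECT_MAPPER.readValue(line, new TypeReference<Map<String, Object>>() {});
        } catch (JsonProcessingException e) {
            return Collections.singletonMap("message", line);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrFallback("{ \"key1\": \"val1\" }"));   // {key1=val1}
        System.out.println(parseOrFallback("Example log line in file")); // {message=Example log line in file}
    }
}
```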
@@ -0,0 +1,46 @@ FileSourceConfig.java (new file)
package com.amazon.dataprepper.plugins.source.file;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import java.util.Objects;

public class FileSourceConfig {
static final String ATTRIBUTE_PATH = "path";
static final String ATTRIBUTE_TYPE = "record_type";
static final String ATTRIBUTE_FORMAT = "format";
static final int DEFAULT_TIMEOUT = 5_000;
static final String DEFAULT_TYPE = "string";
static final String DEFAULT_FORMAT = "plain";
static final String EVENT_TYPE = "event";


@JsonProperty(ATTRIBUTE_PATH)
private String filePathToRead;

@JsonProperty(ATTRIBUTE_FORMAT)
private String format = DEFAULT_FORMAT;

@JsonProperty(ATTRIBUTE_TYPE)
private String recordType = DEFAULT_TYPE;

public String getFilePathToRead() {
return filePathToRead;
}

@JsonIgnore
public FileFormat getFormat() {
return FileFormat.getByName(format);
}

public String getRecordType() {
return recordType;
}

void validate() {
Objects.requireNonNull(filePathToRead, "File path is required");
Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");
Preconditions.checkArgument(format.equals(DEFAULT_FORMAT) || format.equals("json"), "Invalid file format. Options are [json] and [plain]");
}
}
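Because the fields are bound with Jackson annotations, pipeline settings map onto this class directly. The sketch below is illustrative and assumes it lives in the same package, since `validate()` is package-private; the path value is hypothetical.

```
package com.amazon.dataprepper.plugins.source.file;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of how pipeline settings bind to FileSourceConfig via Jackson.
public class FileSourceConfigDemo {
    public static void main(String[] args) {
        final Map<String, Object> pipelineSettings = new HashMap<>();
        pipelineSettings.put("path", "/full/path/to/logs_json.log"); // hypothetical path
        pipelineSettings.put("format", "json");
        pipelineSettings.put("record_type", "event");

        final FileSourceConfig config = new ObjectMapper().convertValue(pipelineSettings, FileSourceConfig.class);
        config.validate(); // throws if path is missing or record_type/format is invalid

        System.out.println(config.getFormat());     // json
        System.out.println(config.getRecordType()); // event
    }
}
```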
