File source improvements and change to event model #601

Merged

Changes from 7 commits
30 changes: 27 additions & 3 deletions data-prepper-plugins/common/README.md
@@ -8,9 +8,33 @@ A prepper plugin to generate new string records with upper or lower case conversion

## `file` (source)

A source plugin to read input data from the specified file path.

- path (String): absolute input data file path
A source plugin to read input data from the specified file path. The file source creates a new Record for each line of data in the file.

* `path` (String): absolute input data file path. Required.

* `format` (String): The format of each line of the file. Valid options are `json` or `plain`. Default is `plain`.
* `plain`: Reads plaintext data from files. Internally, a plain text line from a file will be given a key of `message` as shown below.
```
Example log line in file
```
becomes
```
{ "message": "Example log line in file" }
```

* `json`: Reads data in the form of JSON strings from a file. If a line cannot be parsed as JSON, the file source treats it as a plaintext line.
Collaborator:
From the code change, maybe we should specify that this format expects json lines as follows

{ "key1": "val1" }
{ "key2": "val2" }
{ "key3": "val3" }

instead of a single json blob

Expects json lines as follows:
```
{ "key1": "val1" }
{ "key2": "val2" }
{ "key3": "val3" }
```


* `record_type` (String): The Event type that will be stored in the metadata of the Event. Default is `event`.
Member:
Thanks for adding this, even though it is temporary.

I'd generally prefer to keep the default as string since that is the default behavior. But, since this is more for test and non-Trace Analytics pipelines, it could be a reasonable breaking change. If the default is event, the release notes should state this and how to get past it. #618.

Member Author (@graytaylor0, Nov 17, 2021):

Hmm, I can make the default string, but I'll have to temporarily change some grok documentation, which is less painful in the end than making it a breaking change.

Temporarily, `record_type` can be either `event` or `string`. If you would like to use the file source for Trace Analytics use cases,
change this to `string`.
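For reference, the options above would be wired together in a pipeline definition along these lines. This is a minimal sketch assuming the standard Data Prepper pipeline YAML layout; the pipeline name, file path, and `stdout` sink are illustrative:

```yaml
log-file-pipeline:
  source:
    file:
      path: /full/path/to/input.log   # required
      format: json                    # or "plain" (the default)
      record_type: event              # or "string" for Trace Analytics pipelines
  sink:
    - stdout:
```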

## `file` (sink)

2 changes: 2 additions & 0 deletions data-prepper-plugins/common/build.gradle
@@ -14,6 +14,7 @@ plugins {
}
dependencies {
api project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
Member:
I think you only need this for the tests. It can be made testImplementation and should sit with the other testImplementation dependencies.

implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "commons-io:commons-io:2.11.0"
@@ -25,6 +26,7 @@ dependencies {
implementation 'org.reflections:reflections:0.10.2'
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation "org.mockito:mockito-inline:${versionMap.mockito}"
}

jacocoTestCoverageVerification {

This file was deleted.

@@ -0,0 +1,33 @@
package com.amazon.dataprepper.plugins.source.file;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* An enum to represent the file formats supported in Data Prepper's file source.
* @since 1.2
*/
public enum FileFormat {

PLAIN("plain"),
JSON("json");

private static final Map<String, FileFormat> NAMES_MAP = Arrays.stream(FileFormat.values())
.collect(Collectors.toMap(FileFormat::toString, Function.identity()));

private final String name;

FileFormat(final String name) {
this.name = name;
}

public String toString() {
return this.name;
}

public static FileFormat getByName(final String name) {
return NAMES_MAP.get(name.toLowerCase());
}
}
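A standalone sketch of how this name lookup behaves. The enum is re-declared here for illustration; note that `getByName` lower-cases its argument before the map lookup, so matching is case-insensitive, and unknown names return `null`:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Re-declaration of the enum for a self-contained demo.
enum FileFormat {
    PLAIN("plain"),
    JSON("json");

    // Map from lowercase name ("plain", "json") to the enum constant.
    private static final Map<String, FileFormat> NAMES_MAP = Arrays.stream(FileFormat.values())
            .collect(Collectors.toMap(FileFormat::toString, Function.identity()));

    private final String name;

    FileFormat(final String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return this.name;
    }

    static FileFormat getByName(final String name) {
        // Lower-casing makes the lookup case-insensitive; misses return null.
        return NAMES_MAP.get(name.toLowerCase());
    }
}

public class FileFormatLookupDemo {
    public static void main(String[] args) {
        System.out.println(FileFormat.getByName("JSON"));  // json
        System.out.println(FileFormat.getByName("plain")); // plain
        System.out.println(FileFormat.getByName("avro"));  // null
    }
}
```

Because unrecognized names come back as `null` rather than throwing, the config layer (see `FileSourceConfig.validate()` below in this PR) is responsible for rejecting invalid formats up front.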
@@ -0,0 +1,121 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package com.amazon.dataprepper.plugins.source.file;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.plugin.PluginFactory;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

@DataPrepperPlugin(name = "file", pluginType = Source.class, pluginConfigurationType = FileSourceConfig.class)
public class FileSource implements Source<Record<Object>> {

static final String MESSAGE_KEY = "message";
private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {};

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final FileSourceConfig fileSourceConfig;

private boolean isStopRequested;
private final int writeTimeout;

@DataPrepperPluginConstructor
public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
fileSourceConfig.validate();
this.fileSourceConfig = fileSourceConfig;
this.isStopRequested = false;
this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;
}


@Override
public void start(final Buffer<Record<Object>> buffer) {
checkNotNull(buffer, "Buffer cannot be null for file source to start");
try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
String line;
while ((line = reader.readLine()) != null && !isStopRequested) {
writeLineAsEventOrString(line, buffer);
}
} catch (IOException | TimeoutException | IllegalArgumentException ex) {
LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex);
throw new RuntimeException(format("Error processing the input file %s",
fileSourceConfig.getFilePathToRead()), ex);
}
}

@Override
public void stop() {
isStopRequested = true;
}

private Record<Object> getEventRecordFromLine(final String line) {
Map<String, Object> structuredLine = new HashMap<>();
Contributor:
final

Member Author:
It can't be final; it's getting reassigned in the JSON case. I could pass it as an argument, but I like how it is now better.


switch(fileSourceConfig.getFormat()) {
case JSON:
structuredLine = parseJson(line);
break;
case PLAIN:
structuredLine.put(MESSAGE_KEY, line);
break;
}

return new Record<>(JacksonEvent
.builder()
.withEventType(fileSourceConfig.getType())
.withData(structuredLine)
.build());
}

private Map<String, Object> parseJson(final String jsonString) {
try {
return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
} catch (JsonProcessingException e) {
LOG.error("Unable to parse json data [{}], assuming plain text", jsonString, e);
final Map<String, Object> plainMap = new HashMap<>();
plainMap.put(MESSAGE_KEY, jsonString);
return plainMap;
}
}

// Temporary function to support both trace and log ingestion pipelines.
// TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
private void writeLineAsEventOrString(final String line, final Buffer<Record<Object>> buffer) throws TimeoutException, IllegalArgumentException {
if (fileSourceConfig.getType().equals(FileSourceConfig.DEFAULT_TYPE)) {
buffer.write(getEventRecordFromLine(line), writeTimeout);
} else if (fileSourceConfig.getType().equals(FileSourceConfig.STRING_TYPE)) {
buffer.write(new Record<>(line), writeTimeout);
}
}
}
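The temporary record-type branching in `writeLineAsEventOrString` can be sketched standalone. This is a simplified model: the buffer, `Record`, and `JacksonEvent` types are stood in by plain objects, JSON parsing is omitted, and `toRecord` is a hypothetical helper, not part of the plugin:

```java
import java.util.HashMap;
import java.util.Map;

public class LineToRecordDemo {
    // Simplified stand-in for the event-vs-string branch:
    // "event" wraps the line in a map under the "message" key (the plain-format
    // path of getEventRecordFromLine); "string" passes the raw line through,
    // matching what the Trace Analytics pipelines currently expect.
    static Object toRecord(final String line, final String recordType) {
        if (recordType.equals("event")) {
            final Map<String, Object> structured = new HashMap<>();
            structured.put("message", line);
            return structured;
        }
        return line; // "string" type: raw line
    }

    public static void main(String[] args) {
        System.out.println(toRecord("hello", "event"));  // {message=hello}
        System.out.println(toRecord("hello", "string")); // hello
    }
}
```

As the TODO notes, this dual path exists only until issue #546 moves everything onto the Event model.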
@@ -0,0 +1,46 @@
package com.amazon.dataprepper.plugins.source.file;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import java.util.Objects;

public class FileSourceConfig {
static final String ATTRIBUTE_PATH = "path";
static final String ATTRIBUTE_TYPE = "record_type";
static final String ATTRIBUTE_FORMAT = "format";
static final int DEFAULT_TIMEOUT = 5_000;
static final String DEFAULT_TYPE = "event";
static final String DEFAULT_FORMAT = "plain";
static final String STRING_TYPE = "string";


@JsonProperty(ATTRIBUTE_PATH)
private String filePathToRead;

@JsonProperty(ATTRIBUTE_FORMAT)
private String format = DEFAULT_FORMAT;

@JsonProperty(ATTRIBUTE_TYPE)
private String type = DEFAULT_TYPE;

public String getFilePathToRead() {
return filePathToRead;
}

@JsonIgnore
public FileFormat getFormat() {
return FileFormat.getByName(format);
}

public String getType() {
return type;
}

void validate() {
Objects.requireNonNull(filePathToRead, "File path is required");
Preconditions.checkArgument(type.equals(STRING_TYPE) || type.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");
Preconditions.checkArgument(format.equals(DEFAULT_FORMAT) || format.equals("json"), "Invalid file format. Options are [json] and [plain]");
}
}
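The `validate()` logic above can be exercised standalone. A sketch under stated assumptions: `checkArgument` is hand-rolled here to avoid the Guava dependency, and the string literals mirror the config defaults shown in the diff:

```java
import java.util.Objects;

public class ConfigValidationDemo {
    // Hand-rolled stand-in for Guava's Preconditions.checkArgument.
    static void checkArgument(final boolean condition, final String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Mirrors FileSourceConfig.validate(): path is mandatory, record_type must
    // be "event" or "string", and format must be "plain" or "json".
    static void validate(final String path, final String type, final String format) {
        Objects.requireNonNull(path, "File path is required");
        checkArgument(type.equals("event") || type.equals("string"),
                "Invalid type: must be either [event] or [string]");
        checkArgument(format.equals("plain") || format.equals("json"),
                "Invalid file format. Options are [json] and [plain]");
    }

    public static void main(String[] args) {
        validate("/tmp/input.log", "event", "json"); // passes silently
        try {
            validate("/tmp/input.log", "avro", "json");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```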
Contributor:
Let's introduce validation to this config. We should be able to do this while we are waiting on JSR-303. Check out the HttpSourceConfig for an example.
