diff --git a/data-prepper-plugins/common/README.md b/data-prepper-plugins/common/README.md index 86da8ea331..9be29adec5 100644 --- a/data-prepper-plugins/common/README.md +++ b/data-prepper-plugins/common/README.md @@ -8,9 +8,33 @@ A prepper plugin to generate new string records with upper or lower case convers ## `file` (source) -A source plugin to read input data from the specified file path. - -- path (String): absolute input data file path +A source plugin to read input data from the specified file path. The file source creates a new Record for each line of data in the file. + +* `path` (String): absolute input data file path. Required. + +* `format` (String): The format of each line of the file. Valid options are `json` or `plain`. Default is `plain`. +

+ * `plain`: Reads plaintext data from files. Internally, a plain text line from a file will be given a key of `message` as shown below. + ``` + Example log line in file + ``` + becomes + ``` + { "message": "Example log line in file" } + ``` + + * `json`: Reads data that is in the form of a JSON string from a file. If a line cannot be parsed as JSON, the file source will treat it as a plaintext line. + Expects json lines as follows: + ``` + { "key1": "val1" } + { "key2": "val2" } + { "key3": "val3" } + ``` + + +* `record_type` (String): The Event type that will be stored in the metadata of the Event. Default is `string`. +Temporarily, `record_type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok, + change this to `event`. ## `file` (sink) diff --git a/data-prepper-plugins/common/build.gradle b/data-prepper-plugins/common/build.gradle index 22e498359c..7d07857328 100644 --- a/data-prepper-plugins/common/build.gradle +++ b/data-prepper-plugins/common/build.gradle @@ -23,8 +23,10 @@ dependencies { implementation "org.bouncycastle:bcprov-jdk15on:1.69" implementation "org.bouncycastle:bcpkix-jdk15on:1.69" implementation 'org.reflections:reflections:0.10.2' + testImplementation project(':data-prepper-plugins:blocking-buffer') testImplementation 'commons-io:commons-io:2.11.0' testImplementation "org.hamcrest:hamcrest:2.2" + testImplementation "org.mockito:mockito-inline:${versionMap.mockito}" } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/FileSource.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/FileSource.java deleted file mode 100644 index a2b75aed09..0000000000 --- a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/FileSource.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require 
contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package com.amazon.dataprepper.plugins.source; - -import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; -import com.amazon.dataprepper.model.buffer.Buffer; -import com.amazon.dataprepper.model.configuration.PluginSetting; -import com.amazon.dataprepper.model.record.Record; -import com.amazon.dataprepper.model.source.Source; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.concurrent.TimeoutException; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; - -@DataPrepperPlugin(name = "file", pluginType = Source.class) -public class FileSource implements Source> { - private static final Logger LOG = LoggerFactory.getLogger(FileSource.class); - private static final String ATTRIBUTE_PATH = "path"; - private static final String ATTRIBUTE_TIMEOUT = "write_timeout"; - private static final int WRITE_TIMEOUT = 5_000; - - private final String filePathToRead; - private final int writeTimeout; - private final String pipelineName; - private boolean isStopRequested; - - - /** - * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper - * runtime engine to construct an instance of {@link FileSource} using an instance of {@link PluginSetting} which - * has access to pluginSetting metadata from pipeline - * pluginSetting file. - * - * @param pluginSetting instance with metadata information from pipeline pluginSetting file. 
- */ - public FileSource(final PluginSetting pluginSetting) { - this((String) checkNotNull(pluginSetting, "PluginSetting cannot be null") - .getAttributeFromSettings(ATTRIBUTE_PATH), - pluginSetting.getIntegerOrDefault(ATTRIBUTE_TIMEOUT, WRITE_TIMEOUT), - pluginSetting.getPipelineName()); - } - - public FileSource(final String filePath, final int writeTimeout, final String pipelineName) { - if (filePath == null || filePath.isEmpty()) { - throw new RuntimeException(format("Pipeline [%s] - path is a required attribute for file source", - pipelineName)); - } - this.filePathToRead = filePath; - this.writeTimeout = writeTimeout; - this.pipelineName = checkNotNull(pipelineName, "Pipeline name cannot be null"); - isStopRequested = false; - } - - - @Override - public void start(final Buffer> buffer) { - checkNotNull(buffer, format("Pipeline [%s] - buffer cannot be null for file source to start", pipelineName)); - try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePathToRead), StandardCharsets.UTF_8)) { - String line; - while ((line = reader.readLine()) != null && !isStopRequested) { - buffer.write(new Record<>(line), writeTimeout); - } - } catch (IOException | TimeoutException ex) { - LOG.error("Pipeline [{}] - Error processing the input file [{}]", pipelineName, filePathToRead, ex); - throw new RuntimeException(format("Pipeline [%s] - Error processing the input file %s", pipelineName, - filePathToRead), ex); - } - } - - @Override - public void stop() { - isStopRequested = true; - } -} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileFormat.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileFormat.java new file mode 100644 index 0000000000..a969ea7a31 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileFormat.java @@ -0,0 +1,33 @@ +package 
com.amazon.dataprepper.plugins.source.file; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * An enum to represent the file formats supported in Data Prepper's file source. + * @since 1.2 + */ +public enum FileFormat { + + PLAIN("plain"), + JSON("json"); + + private static final Map NAMES_MAP = Arrays.stream(FileFormat.values()) + .collect(Collectors.toMap(FileFormat::toString, Function.identity())); + + private final String name; + + FileFormat(final String name) { + this.name = name; + } + + public String toString() { + return this.name; + } + + public static FileFormat getByName(final String name) { + return NAMES_MAP.get(name.toLowerCase()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSource.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSource.java new file mode 100644 index 0000000000..0023069451 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSource.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package com.amazon.dataprepper.plugins.source.file; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.buffer.Buffer; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.plugin.PluginFactory; +import com.amazon.dataprepper.model.record.Record; +import com.amazon.dataprepper.model.source.Source; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +@DataPrepperPlugin(name = "file", pluginType = Source.class, pluginConfigurationType = FileSourceConfig.class) +public class FileSource implements Source> { + + static final String MESSAGE_KEY = "message"; + private static final Logger LOG = LoggerFactory.getLogger(FileSource.class); + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() {}; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final FileSourceConfig fileSourceConfig; + + private boolean isStopRequested; + private final int writeTimeout; + + @DataPrepperPluginConstructor + public FileSource(final FileSourceConfig fileSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { + fileSourceConfig.validate(); + this.fileSourceConfig = fileSourceConfig; + this.isStopRequested = false; + this.writeTimeout = 
FileSourceConfig.DEFAULT_TIMEOUT; + } + + + @Override + public void start(final Buffer> buffer) { + checkNotNull(buffer, "Buffer cannot be null for file source to start"); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) { + String line; + while ((line = reader.readLine()) != null && !isStopRequested) { + writeLineAsEventOrString(line, buffer); + } + } catch (IOException | TimeoutException | IllegalArgumentException ex) { + LOG.error("Error processing the input file path [{}]", fileSourceConfig.getFilePathToRead(), ex); + throw new RuntimeException(format("Error processing the input file %s", + fileSourceConfig.getFilePathToRead()), ex); + } + } + + @Override + public void stop() { + isStopRequested = true; + } + + private Record getEventRecordFromLine(final String line) { + Map structuredLine = new HashMap<>(); + + switch(fileSourceConfig.getFormat()) { + case JSON: + structuredLine = parseJson(line); + break; + case PLAIN: + structuredLine.put(MESSAGE_KEY, line); + break; + } + + return new Record<>(JacksonEvent + .builder() + .withEventType(fileSourceConfig.getRecordType()) + .withData(structuredLine) + .build()); + } + + private Map parseJson(final String jsonString) { + try { + return OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE); + } catch (JsonProcessingException e) { + LOG.error("Unable to parse json data [{}], assuming plain text", jsonString, e); + final Map plainMap = new HashMap<>(); + plainMap.put(MESSAGE_KEY, jsonString); + return plainMap; + } + } + + // Temporary function to support both trace and log ingestion pipelines. 
+ // TODO: This function should be removed with the completion of: https://github.com/opensearch-project/data-prepper/issues/546 + private void writeLineAsEventOrString(final String line, final Buffer> buffer) throws TimeoutException, IllegalArgumentException { + if (fileSourceConfig.getRecordType().equals(FileSourceConfig.EVENT_TYPE)) { + buffer.write(getEventRecordFromLine(line), writeTimeout); + } else if (fileSourceConfig.getRecordType().equals(FileSourceConfig.DEFAULT_TYPE)) { + buffer.write(new Record<>(line), writeTimeout); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSourceConfig.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSourceConfig.java new file mode 100644 index 0000000000..2d0e846e07 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/file/FileSourceConfig.java @@ -0,0 +1,46 @@ +package com.amazon.dataprepper.plugins.source.file; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Objects; + +public class FileSourceConfig { + static final String ATTRIBUTE_PATH = "path"; + static final String ATTRIBUTE_TYPE = "record_type"; + static final String ATTRIBUTE_FORMAT = "format"; + static final int DEFAULT_TIMEOUT = 5_000; + static final String DEFAULT_TYPE = "string"; + static final String DEFAULT_FORMAT = "plain"; + static final String EVENT_TYPE = "event"; + + + @JsonProperty(ATTRIBUTE_PATH) + private String filePathToRead; + + @JsonProperty(ATTRIBUTE_FORMAT) + private String format = DEFAULT_FORMAT; + + @JsonProperty(ATTRIBUTE_TYPE) + private String recordType = DEFAULT_TYPE; + + public String getFilePathToRead() { + return filePathToRead; + } + + @JsonIgnore + public FileFormat getFormat() { + return FileFormat.getByName(format); + } + + 
public String getRecordType() { + return recordType; + } + + void validate() { + Objects.requireNonNull(filePathToRead, "File path is required"); + Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]"); + Preconditions.checkArgument(format.equals(DEFAULT_FORMAT) || format.equals("json"), "Invalid file format. Options are [json] and [plain]"); + } +} diff --git a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/FileSourceTests.java deleted file mode 100644 index 42981af9b3..0000000000 --- a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/FileSourceTests.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. 
- */ - -package com.amazon.dataprepper.plugins.source; - -import com.amazon.dataprepper.model.CheckpointState; -import com.amazon.dataprepper.model.configuration.PluginSetting; -import com.amazon.dataprepper.model.record.Record; -import com.amazon.dataprepper.plugins.buffer.TestBuffer; -import com.google.common.collect.ImmutableMap; -import org.junit.Test; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; - -import static java.lang.String.format; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; - -public class FileSourceTests { - private static final String TEST_PIPELINE_NAME = "test-pipeline"; - private static final int TEST_WRITE_TIMEOUT = 100; - private static final String TEST_FILE_PATH = "src/test/resources/test-file-source.tst"; - private static final String FILE_DOES_NOT_EXIST = "file_does_not_exist"; - private static final String FILE_CONTENT = "THIS IS A TEST"; - - - @Test - public void testFileSourceWithEmptyFilePath() { - try { - new FileSource("", TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - } catch (RuntimeException ex) { - assertThat(ex.getMessage(), is(equalTo(format("Pipeline [%s] - path is a required attribute for file " + - "source", TEST_PIPELINE_NAME)))); - } - } - - @Test - public void testFileSourceWithNullFilePath() { - try { - new FileSource(null, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - } catch (RuntimeException ex) { - assertThat(ex.getMessage(), is(equalTo(format("Pipeline [%s] - path is a required attribute for file " + - "source", TEST_PIPELINE_NAME)))); - } - } - - @Test - public void testFileSourceCreationWithValues() { - final FileSource fileSource = new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - assertThat(fileSource, notNullValue()); - } - - @Test - public void 
testFileSourceCreationWithNullPipelineName() { - try { - new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, null); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo("Pipeline name cannot be null"))); - } - } - - @Test - public void testFileSourceCreationUsingPluginSettings() { - final Map settingMap = ImmutableMap.of( - "path", TEST_FILE_PATH); - final PluginSetting pluginSetting = new PluginSetting("file", settingMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); - final FileSource fileSource = new FileSource(pluginSetting); - assertThat(fileSource, notNullValue()); - } - - @Test - public void testFileSourceCreationWithNullPluginSetting() { - try { - new FileSource(null); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo("PluginSetting cannot be null"))); - } - } - - @Test - public void testFileSourceWithNonExistentFile() { - final Queue> bufferQueue = new LinkedList<>(); - final FileSource fileSource = new FileSource(FILE_DOES_NOT_EXIST, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - try { - fileSource.start(new TestBuffer(bufferQueue, 1, true)); - } catch (RuntimeException ex) { - assertThat(ex.getMessage(), is(equalTo(format("Pipeline [%s] - Error processing the input file %s", - TEST_PIPELINE_NAME, FILE_DOES_NOT_EXIST)))); - } - } - - @Test - public void testFileSourceWriteTimeoutException() { - final Queue> bufferQueue = new LinkedList<>(); - final TestBuffer buffer = new TestBuffer(bufferQueue, 1, true); - final FileSource fileSource = new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - try { - fileSource.start(buffer); - } catch (RuntimeException ex) { - assertThat(ex.getMessage(), is(equalTo(format("Pipeline [%s] - Error processing the input file %s", - TEST_PIPELINE_NAME, TEST_FILE_PATH)))); - } - } - - @Test - public void testFileSourceWritingToBuffer() { - final Queue> bufferQueue = new LinkedList<>(); - final TestBuffer buffer = new TestBuffer(bufferQueue, 1); - final 
FileSource fileSource = new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - assertThat(buffer.size(), is(equalTo(0))); - fileSource.start(buffer); - assertThat(buffer.size(), is(equalTo(1))); - final Map.Entry>, CheckpointState> readResult = buffer.read(TEST_WRITE_TIMEOUT); - final Collection> recordsFromBuffer = readResult.getKey(); - assertThat(recordsFromBuffer.size(), is(equalTo(1))); - recordsFromBuffer.forEach(actualRecord -> assertThat(actualRecord.getData(), is(equalTo(FILE_CONTENT)))); - } - - @Test - public void testFileSourceStartAfterStop() { - final Queue> bufferQueue = new LinkedList<>(); - final TestBuffer buffer = new TestBuffer(bufferQueue, 1); - final FileSource fileSource = new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - assertThat(buffer.size(), is(equalTo(0))); - fileSource.stop(); - fileSource.start(buffer); //should not write any records to buffer - assertThat(buffer.size(), is(equalTo(0))); - } - - @Test - public void testFileSourceWithNullBuffer() { - final FileSource fileSource = new FileSource(TEST_FILE_PATH, TEST_WRITE_TIMEOUT, TEST_PIPELINE_NAME); - try { - fileSource.start(null); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), is(equalTo(format("Pipeline [%s] - buffer cannot be null for file source to start", - TEST_PIPELINE_NAME)))); - } - } - -} diff --git a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/file/FileSourceTests.java new file mode 100644 index 0000000000..b5e5459fe6 --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/file/FileSourceTests.java @@ -0,0 +1,249 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package com.amazon.dataprepper.plugins.source.file; + +import com.amazon.dataprepper.metrics.PluginMetrics; +import com.amazon.dataprepper.model.buffer.Buffer; +import com.amazon.dataprepper.model.configuration.PluginSetting; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; +import com.amazon.dataprepper.model.plugin.PluginFactory; +import com.amazon.dataprepper.model.record.Record; +import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ExtendWith(MockitoExtension.class) +public class FileSourceTests { + private static final Logger LOG = LoggerFactory.getLogger(FileSourceTests.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() {}; + + private static final String TEST_PIPELINE_NAME = "pipeline"; + private static final String TEST_FILE_PATH_PLAIN = "src/test/resources/test-file-source-plain.tst"; + private static final String TEST_FILE_PATH_JSON = 
"src/test/resources/test-file-source-json.tst"; + private static final String TEST_FILE_PATH_INVALID_JSON = "src/test/resources/test-file-source-invalid-json.tst"; + private static final String FILE_DOES_NOT_EXIST = "file_does_not_exist"; + + private FileSourceConfig fileSourceConfig; + private FileSource fileSource; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private PluginFactory pluginFactory; + + private Buffer> buffer; + + private Map pluginSettings; + + private List> expectedEventsPlain; + private List> expectedEventsJson; + private List> expectedEventsInvalidJson; + + + @BeforeEach + public void setup() { + pluginSettings = new HashMap<>(); + expectedEventsPlain = new ArrayList<>(); + expectedEventsJson = new ArrayList<>(); + expectedEventsInvalidJson = new ArrayList<>(); + + pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, FileSourceConfig.EVENT_TYPE); + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_PLAIN); + + // plain + final String expectedPlainFirstLine = "THIS IS A PLAINTEXT LINE"; + final String expectedPlainSecondLine = "THIS IS ANOTHER PLAINTEXT LINE"; + + + final Record firstEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainFirstLine); + final Record secondEventPlain = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedPlainSecondLine); + + expectedEventsPlain.add(firstEventPlain); + expectedEventsPlain.add(secondEventPlain); + + //json + final String expectedJsonFirstLine = "{\"test_key: \"test_value\"}"; + final String expectedJsonSecondLine = "{\"second_test_key\": \"second_test_value\"}"; + + final Record firstEventJson = createRecordEventWithKeyValuePair("test_key", "test_value"); + final Record secondEventJson = createRecordEventWithKeyValuePair("second_test_key", "second_test_value"); + + expectedEventsJson.add(firstEventJson); + expectedEventsJson.add(secondEventJson); + + // invalid json + final String expectedInvalidJsonFirstLine = "{\"test_key: 
test_value\"}"; + final String expectedInvalidJsonSecondLine = "{\"second_test_key\": \"second_test_value\""; + + + final Record firstEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonFirstLine); + final Record secondEventInvalidJson = createRecordEventWithKeyValuePair(FileSource.MESSAGE_KEY, expectedInvalidJsonSecondLine); + + expectedEventsInvalidJson.add(firstEventInvalidJson); + expectedEventsInvalidJson.add(secondEventInvalidJson); + + + + buffer = getBuffer(); + } + + private FileSource createObjectUnderTest() { + fileSourceConfig = OBJECT_MAPPER.convertValue(pluginSettings, FileSourceConfig.class); + return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory); + } + + private BlockingBuffer> getBuffer() { + final HashMap integerHashMap = new HashMap<>(); + integerHashMap.put("buffer_size", 2); + integerHashMap.put("batch_size", 2); + final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); + pluginSetting.setPipelineName(TEST_PIPELINE_NAME); + return new BlockingBuffer<>(pluginSetting); + } + + @Test + public void testFileSourceWithEmptyFilePathThrowsRuntimeException() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, ""); + fileSource = createObjectUnderTest(); + assertThrows(RuntimeException.class, () -> fileSource.start(buffer)); + } + + @Test + public void testFileSourceWithNonexistentFilePathThrowsRuntimeException() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, FILE_DOES_NOT_EXIST); + fileSource = createObjectUnderTest(); + assertThrows(RuntimeException.class, () -> fileSource.start(buffer)); + } + + @Test + public void testFileSourceWithNullFilePathThrowsNullPointerException() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, null); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + public void testFileWithPlainTextAddsEventsToBufferCorrectly() { + fileSource = createObjectUnderTest(); + 
fileSource.start(buffer); + + final List> bufferEvents = new ArrayList<>(buffer.read(1000).getKey()); + + assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size())); + assertExpectedRecordsAreEqual(expectedEventsPlain, bufferEvents); + } + + @Test + public void testFileWithJSONAddsEventsToBufferCorrectly() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_JSON); + pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json"); + + fileSource = createObjectUnderTest(); + fileSource.start(buffer); + + final List> bufferEvents = new ArrayList<>(buffer.read(1000).getKey()); + + assertThat(bufferEvents.size(), equalTo(expectedEventsJson.size())); + assertExpectedRecordsAreEqual(expectedEventsJson, bufferEvents); + } + + @Test + public void testFileWithInvalidJSONAddsEventsToBufferAsPlainText() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, TEST_FILE_PATH_INVALID_JSON); + pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "json"); + fileSource = createObjectUnderTest(); + fileSource.start(buffer); + + final List> bufferEvents = new ArrayList<>(buffer.read(1000).getKey()); + + assertThat(bufferEvents.size(), equalTo(expectedEventsInvalidJson.size())); + assertExpectedRecordsAreEqual(expectedEventsInvalidJson, bufferEvents); + } + + @Test + public void testStringTypeAddsStringsToBufferCorrectly() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, FileSourceConfig.DEFAULT_TYPE); + fileSource = createObjectUnderTest(); + fileSource.start(buffer); + + final List> bufferEvents = new ArrayList<>(buffer.read(1000).getKey()); + + assertThat(bufferEvents.size(), equalTo(expectedEventsPlain.size())); + assertThat(bufferEvents.get(0).getData(), equalTo("THIS IS A PLAINTEXT LINE")); + assertThat(bufferEvents.get(1).getData(), equalTo("THIS IS ANOTHER PLAINTEXT LINE")); + + } + + @Test + public void testNonSupportedFileFormatThrowsIllegalArgumentException() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_FORMAT, "unsupported"); + 
assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + @Test + public void testNonSupportedFileTypeThrowsIllegalArgumentException() { + pluginSettings.put(FileSourceConfig.ATTRIBUTE_TYPE, "bad_type"); + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + static void assertExpectedRecordsAreEqual(final List> expectedEvents, final List> actualEvents) { + for (int i = 0; i < expectedEvents.size(); i++) { + assertThat(actualEvents.get(i), notNullValue()); + assertThat(actualEvents.get(i).getData(), notNullValue()); + assertEventRecordsAreEqual(actualEvents.get(i), expectedEvents.get(i)); + } + } + + static void assertEventRecordsAreEqual(final Record first, final Record second) { + try { + final Event firstEvent = (Event) first.getData(); + final Event secondEvent = (Event) second.getData(); + final Map recordMapFirst = OBJECT_MAPPER.readValue(firstEvent.toJsonString(), MAP_TYPE_REFERENCE); + final Map recordMapSecond = OBJECT_MAPPER.readValue(secondEvent.toJsonString(), MAP_TYPE_REFERENCE); + assertThat(recordMapFirst, is(equalTo(recordMapSecond))); + } catch (JsonProcessingException e) { + LOG.error("Unable to parse Event as JSON"); + } + } + + private Record createRecordEventWithKeyValuePair(final String key, final String value) { + final Map eventData = new HashMap<>(); + eventData.put(key, value); + + return new Record<>(JacksonEvent + .builder() + .withEventType("event") + .withData(eventData) + .build()); + } +} diff --git a/data-prepper-plugins/common/src/test/resources/test-file-source-invalid-json.tst b/data-prepper-plugins/common/src/test/resources/test-file-source-invalid-json.tst new file mode 100644 index 0000000000..5cdfe97552 --- /dev/null +++ b/data-prepper-plugins/common/src/test/resources/test-file-source-invalid-json.tst @@ -0,0 +1,2 @@ +{"test_key: test_value"} +{"second_test_key": "second_test_value" \ No newline at end of file diff --git 
a/data-prepper-plugins/common/src/test/resources/test-file-source-json.tst b/data-prepper-plugins/common/src/test/resources/test-file-source-json.tst new file mode 100644 index 0000000000..0d0d8037ab --- /dev/null +++ b/data-prepper-plugins/common/src/test/resources/test-file-source-json.tst @@ -0,0 +1,2 @@ +{"test_key": "test_value"} +{"second_test_key": "second_test_value"} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/resources/test-file-source-plain.tst b/data-prepper-plugins/common/src/test/resources/test-file-source-plain.tst new file mode 100644 index 0000000000..226beb530d --- /dev/null +++ b/data-prepper-plugins/common/src/test/resources/test-file-source-plain.tst @@ -0,0 +1,2 @@ +THIS IS A PLAINTEXT LINE +THIS IS ANOTHER PLAINTEXT LINE \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/resources/test-file-source.tst b/data-prepper-plugins/common/src/test/resources/test-file-source.tst deleted file mode 100644 index ca80c316c9..0000000000 --- a/data-prepper-plugins/common/src/test/resources/test-file-source.tst +++ /dev/null @@ -1 +0,0 @@ -THIS IS A TEST \ No newline at end of file diff --git a/data-prepper-plugins/grok-prepper/README.md b/data-prepper-plugins/grok-prepper/README.md index 7e71d9ba6b..86999decb5 100644 --- a/data-prepper-plugins/grok-prepper/README.md +++ b/data-prepper-plugins/grok-prepper/README.md @@ -17,6 +17,8 @@ grok-pipeline: source: file: path: "/full/path/to/grok_logs_json.log" + record_type: "event" + format: "json" prepper: - grok: match: