diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/Codec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java similarity index 72% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/Codec.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java index a29aacf6a1..296a809a83 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/Codec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/InputCodec.java @@ -3,26 +3,22 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.model.codec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; - import java.io.IOException; import java.io.InputStream; import java.util.function.Consumer; -/** - * A codec parsing data through an input stream. Each implementation of this class should - * support parsing a specific type or format of data. See sub-classes for examples. - */ -public interface Codec { +public interface InputCodec { /** * Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each * {@link Record} loaded from the {@link InputStream}. * * @param inputStream The input stream for the S3 object * @param eventConsumer The consumer which handles each event from the stream + * @throws IOException throws IOException when invalid input is received or incorrect codec name is provided */ void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodec.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java similarity index 93% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodec.java rename to data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java index bac4f0731f..02176c3325 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodec.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodec.java @@ -3,18 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.csv; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.model.record.Record; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvReadException; import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,15 +30,15 @@ import java.util.function.Consumer; /** - * An implementation of {@link Codec} which parses CSV records into fields. + * An implementation of {@link InputCodec} which parses CSV records into fields. */ -@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class) -public class CsvCodec implements Codec { - private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class); - private final CsvCodecConfig config; +@DataPrepperPlugin(name = "csv", pluginType = InputCodec.class, pluginConfigurationType = CsvInputCodecConfig.class) +public class CsvInputCodec implements InputCodec { + private static final Logger LOG = LoggerFactory.getLogger(CsvInputCodec.class); + private final CsvInputCodecConfig config; @DataPrepperPluginConstructor - public CsvCodec(final CsvCodecConfig config) { + public CsvInputCodec(CsvInputCodecConfig config) { Objects.requireNonNull(config); this.config = config; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecConfig.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodecConfig.java similarity index 94% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecConfig.java rename to data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodecConfig.java index ee3fd3953c..986fc29c2f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/csv/CsvInputCodecConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.csv; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; @@ -12,9 +12,9 @@ import java.util.Objects; /** - * Configuration class for {@link CsvCodec}. + * Configuration class for {@link CsvInputCodec}. */ -public class CsvCodecConfig { +public class CsvInputCodecConfig { static final String DEFAULT_DELIMITER = ","; static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote static final Boolean DEFAULT_DETECT_HEADER = true; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecTest.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecTest.java similarity index 99% rename from data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecTest.java rename to data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecTest.java index cc215e854d..ddc2644c4c 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/CsvCodecTest.java +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/csv/CsvCodecTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.csv; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.MappingIterator; @@ -50,17 +50,17 @@ @ExtendWith(MockitoExtension.class) public class CsvCodecTest { @Mock - private CsvCodecConfig config; + private CsvInputCodecConfig config; @Mock private Consumer> eventConsumer; - private CsvCodec csvCodec; - private CsvCodec createObjectUnderTest() { - return new CsvCodec(config); + private CsvInputCodec csvCodec; + private CsvInputCodec createObjectUnderTest() { + return new CsvInputCodec(config); } @BeforeEach void setup() { - CsvCodecConfig defaultCsvCodecConfig = new CsvCodecConfig(); + CsvInputCodecConfig defaultCsvCodecConfig = new CsvInputCodecConfig(); lenient().when(config.getDelimiter()).thenReturn(defaultCsvCodecConfig.getDelimiter()); lenient().when(config.getQuoteCharacter()).thenReturn(defaultCsvCodecConfig.getQuoteCharacter()); lenient().when(config.getHeader()).thenReturn(defaultCsvCodecConfig.getHeader()); diff --git a/data-prepper-plugins/newline-codecs/build.gradle b/data-prepper-plugins/newline-codecs/build.gradle new file mode 100644 index 0000000000..ccc3d83a32 --- /dev/null +++ b/data-prepper-plugins/newline-codecs/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'java' +} + + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java similarity index 87% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java rename to data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java index f99a9e58ac..12d4f3962e 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodec.java +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputCodec.java @@ -3,10 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.newline; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; @@ -20,14 +21,14 @@ import java.util.Objects; import java.util.function.Consumer; -@DataPrepperPlugin(name = "newline", pluginType = Codec.class, pluginConfigurationType = NewlineDelimitedConfig.class) -public class NewlineDelimitedCodec implements Codec { +@DataPrepperPlugin(name = "newline", pluginType = InputCodec.class, pluginConfigurationType = NewlineDelimitedInputConfig.class) +public class NewlineDelimitedInputCodec implements InputCodec { private static final String MESSAGE_FIELD_NAME = "message"; private final int skipLines; private final String headerDestination; @DataPrepperPluginConstructor - public NewlineDelimitedCodec(final NewlineDelimitedConfig config) { + public NewlineDelimitedInputCodec(final NewlineDelimitedInputConfig config) { Objects.requireNonNull(config); skipLines = config.getSkipLines(); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedConfig.java b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputConfig.java similarity index 92% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedConfig.java rename to data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputConfig.java index 6fc63372b5..579b825ba1 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedConfig.java +++ b/data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedInputConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.newline; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; @@ -13,7 +13,7 @@ /** * Configuration class for the newline delimited codec. */ -public class NewlineDelimitedConfig { +public class NewlineDelimitedInputConfig { private int skipLines = 0; @JsonProperty("header_destination") diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodecTest.java b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedCodecTest.java similarity index 94% rename from data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodecTest.java rename to data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedCodecTest.java index cb91e08879..00e56dbf65 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/NewlineDelimitedCodecTest.java +++ b/data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedCodecTest.java @@ -3,17 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.newline; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventType; -import org.opensearch.dataprepper.model.record.Record; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.record.Record; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -32,17 +32,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verifyNoInteractions; + @ExtendWith(MockitoExtension.class) class NewlineDelimitedCodecTest { @Mock - private NewlineDelimitedConfig config; + private NewlineDelimitedInputConfig config; - private NewlineDelimitedCodec createObjectUnderTest() { - return new NewlineDelimitedCodec(config); + private NewlineDelimitedInputCodec createObjectUnderTest() { + return new NewlineDelimitedInputCodec(config); } @Test @@ -54,7 +55,7 @@ void constructor_throws_if_config_is_null() { @Test void constructor_throws_if_header_destination_is_empty() throws NoSuchFieldException, IllegalAccessException { - final NewlineDelimitedConfig objectUnderTest = new NewlineDelimitedConfig(); + final NewlineDelimitedInputConfig objectUnderTest = new NewlineDelimitedInputConfig(); reflectivelySetField(objectUnderTest, "headerDestination", ""); assertThat(objectUnderTest.isValidHeaderDestination(), equalTo(false)); } @@ -242,12 +243,12 @@ private String generateMultilineString(final List numberOfLines) { return stringWriter.toString(); } - private void reflectivelySetField(final NewlineDelimitedConfig newlineDelimitedConfig, final String fieldName, final Object value) + private void reflectivelySetField(final NewlineDelimitedInputConfig newlineDelimitedInputConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { - final Field field = NewlineDelimitedConfig.class.getDeclaredField(fieldName); + final Field field = NewlineDelimitedInputConfig.class.getDeclaredField(fieldName); try { field.setAccessible(true); - field.set(newlineDelimitedConfig, value); + field.set(newlineDelimitedInputConfig, value); } finally { field.setAccessible(false); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java similarity index 86% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodec.java rename to data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java index f52d84141f..921f274bd1 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java @@ -3,17 +3,18 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.json; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.record.Record; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.InputStream; @@ -22,10 +23,11 @@ import java.util.function.Consumer; /** - * An implementation of {@link Codec} which parses JSON objects for arrays. + * An implementation of {@link InputCodec} which parses JSON Objects for arrays. */ -@DataPrepperPlugin(name = "json", pluginType = Codec.class) -public class JsonCodec implements Codec { +@DataPrepperPlugin(name = "json", pluginType = InputCodec.class) +public class JsonInputCodec implements InputCodec { + private final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = new JsonFactory(); @@ -42,6 +44,7 @@ public void parse(final InputStream inputStream, final Consumer> e parseRecordsArray(jsonParser, eventConsumer); } } + } private void parseRecordsArray(final JsonParser jsonParser, final Consumer> eventConsumer) throws IOException { @@ -60,4 +63,5 @@ private Record createRecord(final Map json) { return new Record<>(event); } + } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java similarity index 97% rename from data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodecTest.java rename to data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java index 875cbbbd7c..a7dcde8ab6 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/codec/JsonCodecTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java @@ -3,11 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.codec; +package org.opensearch.dataprepper.plugins.codec.json; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.EventType; -import org.opensearch.dataprepper.model.record.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; @@ -19,33 +16,39 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.record.Record; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; + import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; +import java.util.LinkedHashMap; +import java.util.ArrayList; +import java.util.Arrays; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.params.provider.Arguments.arguments; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.mock; + + -class JsonCodecTest { +class JsonInputCodecTest { private ObjectMapper objectMapper; private Consumer> eventConsumer; @@ -57,13 +60,13 @@ void setUp() { eventConsumer = mock(Consumer.class); } - private JsonCodec createObjectUnderTest() { - return new JsonCodec(); + private JsonInputCodec createObjectUnderTest() { + return new JsonInputCodec(); } @Test void parse_with_null_InputStream_throws() { - final JsonCodec objectUnderTest = createObjectUnderTest(); + final JsonInputCodec objectUnderTest = createObjectUnderTest(); assertThrows(NullPointerException.class, () -> objectUnderTest.parse(null, eventConsumer)); @@ -73,7 +76,7 @@ void parse_with_null_InputStream_throws() { @Test void parse_with_null_Consumer_throws() { - final JsonCodec objectUnderTest = createObjectUnderTest(); + final JsonInputCodec objectUnderTest = createObjectUnderTest(); final InputStream inputStream = mock(InputStream.class); assertThrows(NullPointerException.class, () -> diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index b71f76576d..feec08b161 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -31,6 +31,9 @@ dependencies { testImplementation 'com.github.tomakehurst:wiremock:3.0.0-beta-5' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-plugins:csv-processor') + testImplementation project(':data-prepper-plugins:parse-json-processor') + testImplementation project(':data-prepper-plugins:newline-codecs') } test { diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/CsvRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/CsvRecordsGenerator.java index c67c7f4134..1e5746f360 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/CsvRecordsGenerator.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/CsvRecordsGenerator.java @@ -5,10 +5,10 @@ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.source.codec.Codec; -import org.opensearch.dataprepper.plugins.source.codec.CsvCodec; -import org.opensearch.dataprepper.plugins.source.codec.CsvCodecConfig; +import org.opensearch.dataprepper.plugins.codec.csv.CsvInputCodecConfig; +import org.opensearch.dataprepper.plugins.codec.csv.CsvInputCodec; import java.io.OutputStream; import java.io.PrintWriter; @@ -37,19 +37,19 @@ public void write(final int numberOfRecords, final OutputStream outputStream) { } @Override - public Codec getCodec() { - CsvCodecConfig config = csvCodecConfigWithAutogenerateHeader(); - return new CsvCodec(config); + public InputCodec getCodec() { + CsvInputCodecConfig config = csvCodecConfigWithAutogenerateHeader(); + return new CsvInputCodec(config); } /** * For easy testing, we will autogenerate all column names (which requires setting detectHeader = false) * @return CsvCodecConfig for testing */ - private CsvCodecConfig csvCodecConfigWithAutogenerateHeader() { - CsvCodecConfig csvCodecConfig = new CsvCodecConfig(); + private CsvInputCodecConfig csvCodecConfigWithAutogenerateHeader() { + CsvInputCodecConfig csvCodecConfig = new CsvInputCodecConfig(); try { - setField(CsvCodecConfig.class, csvCodecConfig, "detectHeader", false); + setField(CsvInputCodecConfig.class, csvCodecConfig, "detectHeader", false); } catch (NoSuchFieldException | IllegalAccessException e) { throw new RuntimeException(e); } diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/JsonRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/JsonRecordsGenerator.java index c2b729319e..0eacd1e50a 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/JsonRecordsGenerator.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/JsonRecordsGenerator.java @@ -5,9 +5,9 @@ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.source.codec.Codec; -import org.opensearch.dataprepper.plugins.source.codec.JsonCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -48,8 +48,8 @@ public void write(final int numberOfRecords, final OutputStream outputStream) th } @Override - public Codec getCodec() { - return new JsonCodec(); + public InputCodec getCodec() { + return new JsonInputCodec(); } @Override diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/NewlineDelimitedRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/NewlineDelimitedRecordsGenerator.java index cf327075b7..975946fbb8 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/NewlineDelimitedRecordsGenerator.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/NewlineDelimitedRecordsGenerator.java @@ -5,10 +5,10 @@ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.source.codec.Codec; -import org.opensearch.dataprepper.plugins.source.codec.NewlineDelimitedCodec; -import org.opensearch.dataprepper.plugins.source.codec.NewlineDelimitedConfig; +import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedInputCodec; +import org.opensearch.dataprepper.plugins.codec.newline.NewlineDelimitedInputConfig; import java.io.OutputStream; import java.io.PrintWriter; @@ -38,8 +38,8 @@ public void write(final int numberOfRecords, final OutputStream outputStream) { } @Override - public Codec getCodec() { - return new NewlineDelimitedCodec(new NewlineDelimitedConfig()); + public InputCodec getCodec() { + return new NewlineDelimitedInputCodec(new NewlineDelimitedInputConfig()); } @Override diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/RecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/RecordsGenerator.java index 4b1e0b9ef7..c2b231b3bf 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/RecordsGenerator.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/RecordsGenerator.java @@ -5,8 +5,8 @@ package org.opensearch.dataprepper.plugins.source; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.source.codec.Codec; import java.io.IOException; import java.io.OutputStream; @@ -14,7 +14,7 @@ interface RecordsGenerator { void write(int numberOfRecords, OutputStream outputStream) throws IOException; - Codec getCodec(); + InputCodec getCodec(); String getFileExtension(); diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java index 524877ff1f..29b87e952b 100644 --- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java +++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java @@ -7,9 +7,9 @@ import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.codec.Codec; import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.source.compression.GZipCompressionEngine; import org.opensearch.dataprepper.plugins.source.compression.NoneCompressionEngine; @@ -109,7 +109,7 @@ private void stubBufferWriter(final Consumer additionalEventAssertions, f .when(buffer).writeAll(anyCollection(), anyInt()); } - private S3ObjectWorker createObjectUnderTest(final Codec codec, final int numberOfRecordsToAccumulate, final CompressionEngine compressionEngine) { + private S3ObjectWorker createObjectUnderTest(final InputCodec codec, final int numberOfRecordsToAccumulate, final CompressionEngine compressionEngine) { final S3ObjectRequest request = new S3ObjectRequest.Builder(buffer, numberOfRecordsToAccumulate, Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics).bucketOwnerProvider(bucketOwnerProvider) .eventConsumer(eventMetadataModifier).codec(codec).s3Client(s3Client) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequest.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequest.java index 1eb6c5a3cf..5f96806f3b 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequest.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequest.java @@ -7,7 +7,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.codec.Codec; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption; @@ -30,7 +30,7 @@ public class S3ObjectRequest { private final S3SelectResponseHandlerFactory s3SelectResponseHandlerFactory; private final CompressionEngine compressionEngine; private final BucketOwnerProvider bucketOwnerProvider; - private final Codec codec; + private final InputCodec codec; private final BiConsumer eventConsumer; private final S3Client s3Client; private final CompressionType compressionType; @@ -98,7 +98,7 @@ public BucketOwnerProvider getBucketOwnerProvider() { return bucketOwnerProvider; } - public Codec getCodec() { + public InputCodec getCodec() { return codec; } @@ -137,7 +137,7 @@ public static class Builder { private S3AsyncClient s3AsyncClient; private S3SelectResponseHandlerFactory s3SelectResponseHandlerFactory; private CompressionEngine compressionEngine; - private Codec codec; + private InputCodec codec; private BiConsumer eventConsumer; private S3Client s3Client; private CompressionType compressionType; @@ -184,7 +184,7 @@ public Builder compressionEngine(CompressionEngine compressionEngine) { this.compressionEngine = compressionEngine; return this; } - public Builder codec(Codec codec) { + public Builder codec(InputCodec codec) { this.codec = codec; return this; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java index dd233e6895..510e2f3da5 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java @@ -7,10 +7,10 @@ import org.apache.commons.compress.utils.CountingInputStream; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.plugins.source.codec.Codec; import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.slf4j.Logger; @@ -37,7 +37,7 @@ class S3ObjectWorker implements S3ObjectHandler { private final S3Client s3Client; private final Buffer> buffer; private final CompressionEngine compressionEngine; - private final Codec codec; + private final InputCodec codec; private final BucketOwnerProvider bucketOwnerProvider; private final Duration bufferTimeout; private final int numberOfRecordsToAccumulate; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java index 568171e90e..38f87e4e69 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3Source.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; @@ -16,7 +17,6 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; -import org.opensearch.dataprepper.plugins.source.codec.Codec; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.opensearch.dataprepper.plugins.source.ownership.ConfigBucketOwnerProviderFactory; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectOptions; @@ -85,7 +85,7 @@ public void start(Buffer> buffer) { } else { final PluginModel codecConfiguration = s3SourceConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - final Codec codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + final InputCodec codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); final S3ObjectRequest s3ObjectRequest = s3ObjectRequestBuilder .bucketOwnerProvider(bucketOwnerProvider) .codec(codec) diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequestTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequestTest.java index 58bade2a26..b5c1222211 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequestTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectRequestTest.java @@ -9,7 +9,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.codec.Codec; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption; import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption; @@ -41,7 +41,7 @@ public class S3ObjectRequestTest { BiConsumer eventConsumer; @Mock - private Codec codec; + private InputCodec codec; @Mock private S3AsyncClient s3AsyncClient; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java index 78b6e7e1c5..94af7bbe03 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java @@ -19,10 +19,10 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; -import org.opensearch.dataprepper.plugins.source.codec.Codec; import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.ResponseInputStream; @@ -74,7 +74,7 @@ class S3ObjectWorkerTest { private CompressionEngine compressionEngine; @Mock - private Codec codec; + private InputCodec codec; @Mock private BucketOwnerProvider bucketOwnerProvider; diff --git a/settings.gradle b/settings.gradle index 645ea966c5..bcfaeb08a0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -107,3 +107,4 @@ include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' include 'data-prepper-plugins:failures-common' +include 'data-prepper-plugins:newline-codecs' \ No newline at end of file