Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

-Support for Source Codecs #2519

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,22 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.model.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

/**
* A codec parsing data through an input stream. Each implementation of this class should
* support parsing a specific type or format of data. See sub-classes for examples.
*/
public interface Codec {
public interface InputCodec {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
*
* @param inputStream The input stream for the S3 object
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException when invalid input is received or an incorrect codec name is provided
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,15 +30,15 @@
import java.util.function.Consumer;

/**
* An implementation of {@link Codec} which parses CSV records into fields.
* An implementation of {@link InputCodec} which parses CSV records into fields.
*/
@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class);
private final CsvCodecConfig config;
@DataPrepperPlugin(name = "csv", pluginType = InputCodec.class, pluginConfigurationType = CsvInputCodecConfig.class)
public class CsvInputCodec implements InputCodec {
private static final Logger LOG = LoggerFactory.getLogger(CsvInputCodec.class);
private final CsvInputCodecConfig config;

@DataPrepperPluginConstructor
public CsvCodec(final CsvCodecConfig config) {
public CsvInputCodec(CsvInputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -12,9 +12,9 @@
import java.util.Objects;

/**
* Configuration class for {@link CsvCodec}.
* Configuration class for {@link CsvInputCodec}.
*/
public class CsvCodecConfig {
public class CsvInputCodecConfig {
static final String DEFAULT_DELIMITER = ",";
static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote
static final Boolean DEFAULT_DETECT_HEADER = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.csv;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
Expand Down Expand Up @@ -50,17 +50,17 @@
@ExtendWith(MockitoExtension.class)
public class CsvCodecTest {
@Mock
private CsvCodecConfig config;
private CsvInputCodecConfig config;
@Mock
private Consumer<Record<Event>> eventConsumer;
private CsvCodec csvCodec;
private CsvCodec createObjectUnderTest() {
return new CsvCodec(config);
private CsvInputCodec csvCodec;
private CsvInputCodec createObjectUnderTest() {
return new CsvInputCodec(config);
}

@BeforeEach
void setup() {
CsvCodecConfig defaultCsvCodecConfig = new CsvCodecConfig();
CsvInputCodecConfig defaultCsvCodecConfig = new CsvInputCodecConfig();
lenient().when(config.getDelimiter()).thenReturn(defaultCsvCodecConfig.getDelimiter());
lenient().when(config.getQuoteCharacter()).thenReturn(defaultCsvCodecConfig.getQuoteCharacter());
lenient().when(config.getHeader()).thenReturn(defaultCsvCodecConfig.getHeader());
Expand Down
20 changes: 20 additions & 0 deletions data-prepper-plugins/newline-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Build script for the newline-codecs plugin module.
// Applies the core Java plugin; nothing else is configured at the plugin level.
plugins {
    id('java')
}

// Resolve external artifacts from Maven Central.
repositories {
    mavenCentral()
}

dependencies {
    // Data Prepper API project, needed at compile time by this module.
    implementation(project(':data-prepper-api'))

    // Jackson coordinates carry no version here; presumably supplied by the
    // root build's dependency management -- confirm before changing.
    implementation('com.fasterxml.jackson.core:jackson-core')
    implementation('com.fasterxml.jackson.core:jackson-databind')

    // JUnit 5: API on the test compile classpath, engine only at test runtime.
    testImplementation('org.junit.jupiter:junit-jupiter-api:5.8.1')
    testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.8.1')
}

// Run tests on the JUnit Platform (required for JUnit 5 / Jupiter tests).
test {
    useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -20,14 +21,14 @@
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "newline", pluginType = Codec.class, pluginConfigurationType = NewlineDelimitedConfig.class)
public class NewlineDelimitedCodec implements Codec {
@DataPrepperPlugin(name = "newline", pluginType = InputCodec.class, pluginConfigurationType = NewlineDelimitedInputConfig.class)
public class NewlineDelimitedInputCodec implements InputCodec {
private static final String MESSAGE_FIELD_NAME = "message";
private final int skipLines;
private final String headerDestination;

@DataPrepperPluginConstructor
public NewlineDelimitedCodec(final NewlineDelimitedConfig config) {
public NewlineDelimitedInputCodec(final NewlineDelimitedInputConfig config) {
Objects.requireNonNull(config);
skipLines = config.getSkipLines();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
Expand All @@ -13,7 +13,7 @@
/**
* Configuration class for the newline delimited codec.
*/
public class NewlineDelimitedConfig {
public class NewlineDelimitedInputConfig {
private int skipLines = 0;

@JsonProperty("header_destination")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.newline;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -32,17 +32,18 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verifyNoInteractions;


@ExtendWith(MockitoExtension.class)
class NewlineDelimitedCodecTest {

@Mock
private NewlineDelimitedConfig config;
private NewlineDelimitedInputConfig config;

private NewlineDelimitedCodec createObjectUnderTest() {
return new NewlineDelimitedCodec(config);
private NewlineDelimitedInputCodec createObjectUnderTest() {
return new NewlineDelimitedInputCodec(config);
}

@Test
Expand All @@ -54,7 +55,7 @@ void constructor_throws_if_config_is_null() {

@Test
void constructor_throws_if_header_destination_is_empty() throws NoSuchFieldException, IllegalAccessException {
final NewlineDelimitedConfig objectUnderTest = new NewlineDelimitedConfig();
final NewlineDelimitedInputConfig objectUnderTest = new NewlineDelimitedInputConfig();
reflectivelySetField(objectUnderTest, "headerDestination", "");
assertThat(objectUnderTest.isValidHeaderDestination(), equalTo(false));
}
Expand Down Expand Up @@ -242,12 +243,12 @@ private String generateMultilineString(final List<String> numberOfLines) {
return stringWriter.toString();
}

private void reflectivelySetField(final NewlineDelimitedConfig newlineDelimitedConfig, final String fieldName, final Object value)
private void reflectivelySetField(final NewlineDelimitedInputConfig newlineDelimitedInputConfig, final String fieldName, final Object value)
throws NoSuchFieldException, IllegalAccessException {
final Field field = NewlineDelimitedConfig.class.getDeclaredField(fieldName);
final Field field = NewlineDelimitedInputConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(newlineDelimitedConfig, value);
field.set(newlineDelimitedInputConfig, value);
} finally {
field.setAccessible(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.codec;
package org.opensearch.dataprepper.plugins.codec.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -22,10 +23,11 @@
import java.util.function.Consumer;

/**
* An implementation of {@link Codec} which parses JSON objects for arrays.
* An implementation of {@link InputCodec} which parses JSON Objects for arrays.
*/
@DataPrepperPlugin(name = "json", pluginType = Codec.class)
public class JsonCodec implements Codec {
@DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
public class JsonInputCodec implements InputCodec {

private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

Expand All @@ -42,6 +44,7 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
parseRecordsArray(jsonParser, eventConsumer);
}
}

}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
Expand All @@ -60,4 +63,5 @@ private Record<Event> createRecord(final Map<String, Object> json) {

return new Record<>(event);
}

}
Loading