Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented CSVCodec for S3 Source, config & unit tests #1644

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
* Processor to parse CSV data in Events.
*
*/
@DataPrepperPlugin(name="csv", pluginType = Processor.class, pluginConfigurationType = CSVProcessorConfig.class)
public class CSVProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(CSVProcessor.class);
private final CSVProcessorConfig config;
@DataPrepperPlugin(name="csv", pluginType = Processor.class, pluginConfigurationType = CsvProcessorConfig.class)
public class CsvProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(CsvProcessor.class);
private final CsvProcessorConfig config;

@DataPrepperPluginConstructor
public CSVProcessor(final PluginMetrics pluginMetrics, final CSVProcessorConfig config) {
public CsvProcessor(final PluginMetrics pluginMetrics, final CsvProcessorConfig config) {
super(pluginMetrics);
this.config = config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import jakarta.validation.constraints.AssertTrue;
import java.util.List;

public class CSVProcessorConfig {
public class CsvProcessorConfig {
static final String DEFAULT_SOURCE = "message";
static final String DEFAULT_DELIMITER = ",";
static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@

import java.lang.reflect.Field;

import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_SOURCE;
import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_DELIMITER;
import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_QUOTE_CHARACTER;
import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_DELETE_HEADERS;
import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_SOURCE;
import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_DELIMITER;
import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_QUOTE_CHARACTER;
import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_DELETE_HEADERS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class CSVProcessorConfigTest {
public class CsvProcessorConfigTest {

private CSVProcessorConfig createObjectUnderTest() {
return new CSVProcessorConfig();
private CsvProcessorConfig createObjectUnderTest() {
return new CsvProcessorConfig();
}

@Test
public void test_when_defaultCSVProcessorConfig_then_returns_default_values() {
final CSVProcessorConfig objectUnderTest = createObjectUnderTest();
public void test_when_defaultCsvProcessorConfig_then_returns_default_values() {
final CsvProcessorConfig objectUnderTest = createObjectUnderTest();

assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE));
assertThat(objectUnderTest.getDelimiter(), equalTo(DEFAULT_DELIMITER));
Expand All @@ -37,7 +37,7 @@ public void test_when_defaultCSVProcessorConfig_then_returns_default_values() {

@Nested
class Validation {
final CSVProcessorConfig csvProcessorConfig = createObjectUnderTest();
final CsvProcessorConfig csvProcessorConfig = createObjectUnderTest();

@Test
void isValidDelimiter_should_return_false_if_delimiter_is_multiple_characters()
Expand Down Expand Up @@ -71,9 +71,9 @@ void isValidQuoteCharacter_should_return_true_if_quote_char_is_single_character(
}
}

private void reflectivelySetField(final CSVProcessorConfig csvProcessorConfig, final String fieldName, final Object value)
private void reflectivelySetField(final CsvProcessorConfig csvProcessorConfig, final String fieldName, final Object value)
throws NoSuchFieldException, IllegalAccessException {
final Field field = CSVProcessorConfig.class.getDeclaredField(fieldName);
final Field field = CsvProcessorConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(csvProcessorConfig, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class CSVProcessorTest {
class CsvProcessorTest {
@Mock
private CSVProcessorConfig processorConfig;
private CsvProcessorConfig processorConfig;

@Mock
private PluginMetrics pluginMetrics;

private CSVProcessor csvProcessor;
private CsvProcessor csvProcessor;

@BeforeEach
void setup() {
CSVProcessorConfig defaultConfig = new CSVProcessorConfig();
CsvProcessorConfig defaultConfig = new CsvProcessorConfig();
lenient().when(processorConfig.getSource()).thenReturn(defaultConfig.getSource());
lenient().when(processorConfig.getDelimiter()).thenReturn(defaultConfig.getDelimiter());
lenient().when(processorConfig.isDeleteHeader()).thenReturn(defaultConfig.isDeleteHeader());
Expand All @@ -49,8 +49,8 @@ void setup() {
csvProcessor = createObjectUnderTest();
}

private CSVProcessor createObjectUnderTest() {
return new CSVProcessor(pluginMetrics, processorConfig);
private CsvProcessor createObjectUnderTest() {
return new CsvProcessor(pluginMetrics, processorConfig);
}

@Test
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.257'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source.codec;

import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.log.JacksonLog;
import com.amazon.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class)
public class CsvCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class);
private final CsvCodecConfig config;

/**
 * Creates a codec backed by the supplied configuration.
 *
 * @param config codec settings; must not be null
 * @throws NullPointerException if {@code config} is null
 */
@DataPrepperPluginConstructor
public CsvCodec(final CsvCodecConfig config) {
    // requireNonNull returns its argument, so the check and the assignment collapse into one statement
    this.config = Objects.requireNonNull(config);
}

/**
 * Wraps the stream in a buffered reader and delegates to {@code parseBufferedReader}.
 * The reader (and therefore the underlying stream) is closed when parsing finishes or fails.
 *
 * @param inputStream   raw CSV bytes, decoded as UTF-8
 * @param eventConsumer receives one record per parsed row
 * @throws IOException if reading from the stream fails
 */
@Override
public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
    // Decode with an explicit charset: the single-argument InputStreamReader constructor falls back to the
    // platform default, which makes the parsed text depend on the host JVM's configuration.
    try (final BufferedReader reader = new BufferedReader(
            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        parseBufferedReader(reader, eventConsumer);
    }
}

/**
 * Builds the CSV schema (auto-detected header, or one derived from the config and the width of the
 * first line) and then feeds every row from the reader to {@code readCsvLine}.
 *
 * Failures raised while iterating are logged and end the loop; they are not rethrown.
 *
 * @param reader        source of CSV text
 * @param eventConsumer receives one record per parsed row
 * @throws IOException if inspecting the first line or creating the row iterator fails
 */
private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
    final CsvMapper csvMapper = createCsvMapper();
    // Only the non-detecting branch needs to peek at the first line to learn the column count.
    final CsvSchema csvSchema = config.isDetectHeader()
            ? createAutodetectHeaderCsvSchema()
            : createCsvSchemaFromConfig(getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderAfter(reader));

    final MappingIterator<Map<String, String>> rows = csvMapper.readerFor(Map.class).with(csvSchema).readValues(reader);
    try {
        while (rows.hasNextValue()) {
            readCsvLine(rows, eventConsumer);
        }
    } catch (Exception hasNextFailure) {
        LOG.error("An Exception occurred while determining if file has next line ", hasNextFailure);
    }
}

/**
 * Returns the column count of the first line without consuming it: the reader is marked, the first
 * line is read and passed to extractNumberOfColumnsFromFirstLine, and the reader is reset to the mark.
 *
 * NOTE(review): the mark is set with a read-ahead limit of 8192 characters; a first line longer than
 * that may cause reset() to fail according to the BufferedReader contract - confirm this is acceptable.
 * NOTE(review): readLine() returns null when the input is empty, and that value is handed to
 * extractNumberOfColumnsFromFirstLine as-is.
 *
 * @param reader reader positioned at the start of the CSV content
 * @return the number of columns computed from the first line
 * @throws IOException if marking, reading, or resetting the reader fails
 */
private int getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderAfter(BufferedReader reader) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I think you could just call this getNumberOfColumns since the caller of this function doesn't need to care how it works

final int defaultBufferSize = 8192; // this number doesn't affect even a thousand column header — it's sufficiently large.
reader.mark(defaultBufferSize); // calling reader.readLine() will consume the first line, so mark initial location to reset after
final int firstLineNumberColumns = extractNumberOfColumnsFromFirstLine(reader.readLine());
reader.reset(); // move reader pointer back to beginning of file in order to reread first line
return firstLineNumberColumns;
}

/**
 * Reads the next row from the iterator, wraps it in a JacksonLog event, and passes it to the consumer.
 *
 * Every exception raised while reading or emitting the row is caught and logged, so a failing row
 * does not propagate an exception to the caller.
 *
 * NOTE(review): the CsvReadException log text says the row "has too many lines"; presumably
 * "too many columns" was intended - confirm before changing the message.
 *
 * @param parsingIterator iterator over parsed rows; advanced by one value
 * @param eventConsumer   receives the record built from the row
 */
private void readCsvLine(final MappingIterator<Map<String, String>> parsingIterator, final Consumer<Record<Event>> eventConsumer) {
try {
final Map<String, String> parsedLine = parsingIterator.nextValue();

final Event event = JacksonLog.builder()
.withData(parsedLine)
.build();
eventConsumer.accept(new Record<>(event));
} catch (final CsvReadException csvException) {
LOG.error("Invalid CSV row, skipping this line. This typically means the row has too many lines. Consider using the CSV " +
"Processor if there might be inconsistencies in the number of columns because it is more flexible. Error ",
csvException);
} catch (JsonParseException jsonException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing final

LOG.error("A JsonParseException occurred on a row of the CSV file, skipping line. This typically means a quote character was " +
"not properly closed. Error ", jsonException);
} catch (final Exception e) {
LOG.error("An Exception occurred while reading a row of the CSV file. Error ", e);
}
}

/**
 * Counts the columns on the first line as (number of delimiter characters + 1).
 *
 * A {@code null} line, which is what {@code BufferedReader.readLine()} yields for empty input,
 * is reported as zero columns instead of failing with a NullPointerException.
 *
 * NOTE(review): every occurrence of the delimiter is counted, including any that appear inside
 * quoted fields, so the result can exceed the real column count for such lines.
 *
 * @param firstLine the first line of the CSV content, or null when the input is empty
 * @return 0 for a null line, otherwise the delimiter count plus one
 */
private int extractNumberOfColumnsFromFirstLine(final String firstLine) {
    if (firstLine == null) {
        // Empty input: there is no line to measure, so there are no columns.
        return 0;
    }
    // The delimiter does not change while scanning, so look it up once rather than per character.
    final char delimiter = config.getDelimiter().charAt(0);
    int numberOfSeparators = 0;
    for (int charPointer = 0; charPointer < firstLine.length(); charPointer++) {
        if (firstLine.charAt(charPointer) == delimiter) {
            numberOfSeparators++;
        }
    }
    return numberOfSeparators + 1;
}

/**
 * Builds a schema with exactly {@code firstLineSize} columns. Column names come from the configured
 * header while it has entries for that position; remaining positions get generated names.
 * Configured header entries beyond {@code firstLineSize} are not used.
 *
 * @param firstLineSize number of columns the schema should contain
 * @return schema using the configured delimiter and quote character
 */
private CsvSchema createCsvSchemaFromConfig(final int firstLineSize) {
    final List<String> configuredHeader = config.getHeader();
    final int configuredCount = Objects.isNull(configuredHeader) ? 0 : configuredHeader.size();
    final char separator = config.getDelimiter().charAt(0);
    final char quote = config.getQuoteCharacter().charAt(0);

    final CsvSchema.Builder columns = CsvSchema.builder();
    for (int columnIdx = 0; columnIdx < firstLineSize; columnIdx++) {
        // Prefer the user-supplied name; fall back to an auto-generated one once the header runs out.
        final String columnName = columnIdx < configuredCount
                ? configuredHeader.get(columnIdx)
                : generateColumnHeader(columnIdx);
        columns.addColumn(columnName);
    }
    return columns.build().withColumnSeparator(separator).withQuoteChar(quote);
}

/**
 * Produces the generated name for a column that has no configured header entry.
 *
 * @param columnNumber zero-based column index
 * @return {@code "column"} followed by the one-based column position
 */
private String generateColumnHeader(final int columnNumber) {
    // Generated names are one-based, hence the + 1 on the zero-based index.
    return "column" + (columnNumber + 1);
}

/**
 * Creates the mapper used to read CSV rows.
 *
 * @return a new CsvMapper with its default settings
 */
private CsvMapper createCsvMapper() {
    return new CsvMapper();
}

/**
 * Builds a schema that takes its column names from the header row of the data, using the
 * configured delimiter and quote character.
 *
 * @return an empty schema configured with the separator, quote character, and header detection
 */
private CsvSchema createAutodetectHeaderCsvSchema() {
    final CsvSchema withSeparator = CsvSchema.emptySchema().withColumnSeparator(config.getDelimiter().charAt(0));
    final CsvSchema withQuote = withSeparator.withQuoteChar(config.getQuoteCharacter().charAt(0));
    return withQuote.withHeader();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source.codec;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;

import java.util.List;
import java.util.Objects;

/**
 * Configuration for the CSV codec: delimiter, quote character, optional explicit header,
 * and whether the header should be detected from the data.
 *
 * The {@code @AssertTrue} methods are validation hooks. They are null-safe so that an explicitly
 * null {@code delimiter} or {@code quote_character} is reported through the validation message
 * rather than as a NullPointerException raised from inside the validator.
 */
public class CsvCodecConfig {
    static final String DEFAULT_DELIMITER = ",";
    static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote
    static final Boolean DEFAULT_DETECT_HEADER = true;

    @JsonProperty("delimiter")
    private String delimiter = DEFAULT_DELIMITER;

    @JsonProperty("quote_character")
    private String quoteCharacter = DEFAULT_QUOTE_CHARACTER;

    @JsonProperty("header")
    private List<String> header;

    @JsonProperty("detect_header")
    private Boolean detectHeader = DEFAULT_DETECT_HEADER;

    /** @return the configured column delimiter */
    public String getDelimiter() {
        return delimiter;
    }

    /** @return the configured quote character */
    public String getQuoteCharacter() {
        return quoteCharacter;
    }

    /** @return the configured header column names, or null when none were configured */
    public List<String> getHeader() {
        return header;
    }

    /** @return whether the header should be detected from the data */
    public Boolean isDetectHeader() {
        return detectHeader;
    }

    /** @return true only when the delimiter is present and exactly one character long */
    @AssertTrue(message = "delimiter must be exactly one character.")
    boolean isValidDelimiter() {
        return delimiter != null && delimiter.length() == 1;
    }

    /** @return true only when the quote character is present and exactly one character long */
    @AssertTrue(message = "quote_character must be exactly one character.")
    boolean isValidQuoteCharacter() {
        return quoteCharacter != null && quoteCharacter.length() == 1;
    }

    /**
     * A missing delimiter is left to {@link #isValidDelimiter()} to report, so this check passes
     * in that case instead of dereferencing null.
     *
     * @return false only when the delimiter is present and equal to the quote character
     */
    @AssertTrue(message = "quote_character and delimiter cannot be the same character.")
    boolean areDelimiterAndQuoteCharacterDifferent() {
        return delimiter == null || !delimiter.equals(quoteCharacter);
    }

    /** @return true when no header is configured or the configured header has at least one entry */
    @AssertTrue(message = "header must not be an empty list. To autogenerate columns, set detect_header: false and delete header " +
            "from config.")
    boolean isValidHeader() {
        return Objects.isNull(header) || !header.isEmpty();
    }
}
Loading