Skip to content

Commit

Permalink
Change the behavior of the CSV codec in the S3 source to fail when it is unable to parse CSV rows. Resolves #2512. (#2513)
Browse files Browse the repository at this point in the history

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Apr 18, 2023
1 parent da14b74 commit bc75494
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,22 @@ private void parseBufferedReader(final BufferedReader reader, final Consumer<Rec
}

MappingIterator<Map<String, String>> parsingIterator = mapper.readerFor(Map.class).with(schema).readValues(reader);
boolean hasNextValue;
try {
while (parsingIterator.hasNextValue()) {
readCsvLine(parsingIterator, eventConsumer);
hasNextValue = parsingIterator.hasNextValue();
} catch (Exception ex) {
LOG.error("An Exception occurred while determining if file has next line ", ex);
throw ex;
}

while (hasNextValue) {
readCsvLine(parsingIterator, eventConsumer);
try {
hasNextValue = parsingIterator.hasNextValue();
} catch (Exception ex) {
LOG.error("An Exception occurred while determining if file has next line ", ex);
throw ex;
}
} catch (Exception jsonExceptionOnHasNextLine) {
LOG.error("An Exception occurred while determining if file has next line ", jsonExceptionOnHasNextLine);
}
}

Expand All @@ -78,7 +88,7 @@ private int getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderA
return firstLineNumberColumns;
}

private void readCsvLine(final MappingIterator<Map<String, String>> parsingIterator, final Consumer<Record<Event>> eventConsumer) {
private void readCsvLine(final MappingIterator<Map<String, String>> parsingIterator, final Consumer<Record<Event>> eventConsumer) throws IOException {
try {
final Map<String, String> parsedLine = parsingIterator.nextValue();

Expand All @@ -90,11 +100,14 @@ private void readCsvLine(final MappingIterator<Map<String, String>> parsingItera
LOG.error("Invalid CSV row, skipping this line. This typically means the row has too many columns. Consider using the CSV " +
"Processor if there might be inconsistencies in the number of columns because it is more flexible. Error: {}. Line Number: {} " +
"Character Number: {}", csvException.getMessage(), parsingIterator.getCurrentLocation().getLineNr(), parsingIterator.getCurrentLocation().getColumnNr());
throw csvException;
} catch (JsonParseException jsonException) {
LOG.error("A JsonParseException occurred on a row of the CSV file, skipping line. This typically means a quote character was " +
"not properly closed. Error: {}", jsonException.getMessage());
throw jsonException;
} catch (final Exception e) {
LOG.error("An Exception occurred while reading a row of the CSV file. Error ", e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@

package org.opensearch.dataprepper.plugins.source.codec;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvParser;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -20,6 +19,9 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -280,7 +282,7 @@ void test_when_manualHeaderTooManyColumns_then_omitsExtraColumnsOnParsedEvents(f

@ParameterizedTest
@ValueSource(ints = {2, 10, 100})
void test_when_autoDetectHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedRows(final int numberOfRows) throws IOException {
void test_when_autoDetectHeaderWrongNumberColumnsAndJaggedRows_then_skipsRemainingRows(final int numberOfRows) throws IOException {
// a row that's longer than the header should make parsing fail with a CsvReadException, so that row and all remaining rows are skipped
when(config.isDetectHeader()).thenReturn(Boolean.TRUE);

Expand All @@ -290,13 +292,14 @@ void test_when_autoDetectHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedR
final List<String> csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable);

csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns+1, config.getDelimiter(), config.getQuoteCharacter()));
csvRowsExcludingHeader.addAll(generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns));


final String header = generateHeader(numberOfColumns);

final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header);

csvCodec.parse(inputStream, eventConsumer);
assertThrows(CsvReadException.class, () -> csvCodec.parse(inputStream, eventConsumer));

final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);

Expand All @@ -322,7 +325,7 @@ void test_when_autoDetectHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedR

@ParameterizedTest
@ValueSource(ints = {2, 10, 100})
void test_when_manualHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedRows(final int numberOfRows) throws IOException {
void test_when_manualHeaderWrongNumberColumnsAndJaggedRows_then_skipsRemainingRows(final int numberOfRows) throws IOException {
// a row that's longer than the header should make parsing fail with a CsvReadException, so that row and all remaining rows are skipped
when(config.isDetectHeader()).thenReturn(Boolean.FALSE);

Expand All @@ -336,10 +339,11 @@ void test_when_manualHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedRows(
final List<String> csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable);

csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns+1, config.getDelimiter(), config.getQuoteCharacter()));
csvRowsExcludingHeader.addAll(generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns));

final InputStream inputStream = createInputStream(csvRowsExcludingHeader);

csvCodec.parse(inputStream, eventConsumer);
assertThrows(CsvReadException.class, () -> csvCodec.parse(inputStream, eventConsumer));

final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);

Expand Down Expand Up @@ -388,7 +392,7 @@ void test_when_tooFewColumns_then_parsedCorrectly() throws IOException {

@ParameterizedTest
@ValueSource(ints = {2, 10, 100})
void test_when_unrecoverableRow_then_logsExceptionAndSkipsOffendingRow(final int numberOfRows) throws IOException {
void test_when_unrecoverableRow_then_logsExceptionAndSkipsRemainingRows(final int numberOfRows) throws IOException {
// If there's an unclosed quote then expected behavior is to fail with a JsonParseException, skipping that row and all remaining rows
when(config.isDetectHeader()).thenReturn(Boolean.TRUE);
when(config.getQuoteCharacter()).thenReturn(";"); // ";" is also the array character
Expand All @@ -399,11 +403,13 @@ void test_when_unrecoverableRow_then_logsExceptionAndSkipsOffendingRow(final int
final List<String> csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable);
csvRowsExcludingHeader.add(generateIllegalString(numberOfColumns, config.getDelimiter(), config.getQuoteCharacter()));
csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns, config.getDelimiter(), config.getQuoteCharacter()));
csvRowsExcludingHeader.addAll(generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns,
config.getDelimiter(), config.getQuoteCharacter()));
final String header = generateHeader(numberOfColumns, config.getDelimiter().charAt(0), config.getQuoteCharacter().charAt(0));

final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header);

csvCodec.parse(inputStream, eventConsumer);
assertThrows(JsonParseException.class, () -> csvCodec.parse(inputStream, eventConsumer));

final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(eventConsumer, times(numberOfProperlyFormattedRows)).accept(recordArgumentCaptor.capture());
Expand Down

0 comments on commit bc75494

Please sign in to comment.