Addressed Travis's feedback
Signed-off-by: Finn Roblin <[email protected]>
finnroblin committed Aug 5, 2022
1 parent eea0d1f commit cfb92fa
Showing 2 changed files with 32 additions and 36 deletions.
@@ -13,7 +13,6 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvParser;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.slf4j.Logger;
@@ -31,8 +30,8 @@

@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CSVCodecConfig.class)
public class CSVCodec implements Codec {
private final CSVCodecConfig config;
private static final Logger LOG = LoggerFactory.getLogger(CSVCodec.class);
private final CSVCodecConfig config;

@DataPrepperPluginConstructor
public CSVCodec(final CSVCodecConfig config) {
@@ -51,18 +50,11 @@ private void parseBufferedReader(final BufferedReader reader, final Consumer<Rec
final CsvMapper mapper = createCsvMapper();
final CsvSchema schema;
if (config.isDetectHeader()) {
// autodetect header from the first line of csv file
schema = createAutodetectCsvSchema();
schema = createAutodetectHeaderCsvSchema();
}
else {
// construct a header from the pipeline config or autogenerate it
final int defaultBufferSize = 8192; // number of chars before mark is ignored (this is the buffer size, so large header files
// can be read since more buffers will be allocated.)
reader.mark(defaultBufferSize); // getting number of columns of first line will consume the line, so mark initial location

final int firstLineSize = getSizeOfFirstLine(reader.readLine());
reader.reset(); // move reader pointer back to beginning of file in order to parse first line
schema = createCsvSchemaFromConfig(firstLineSize);
final int numberColsFirstLine = getNumberOfColsByMarkingBeginningOfInputStreamAndResettingReaderAfter(reader);
schema = createCsvSchemaFromConfig(numberColsFirstLine);
}

MappingIterator<Map<String, String>> parsingIterator = mapper.readerFor(Map.class).with(schema).readValues(reader);
@@ -75,6 +67,14 @@
}
}

private int getNumberOfColsByMarkingBeginningOfInputStreamAndResettingReaderAfter(final BufferedReader reader) throws IOException {
final int defaultBufferSize = 8192; // chars that can be read before the mark becomes invalid; 8192 is large enough even for a thousand-column header
reader.mark(defaultBufferSize); // calling reader.readLine() will consume the first line, so mark initial location to reset after
final int firstLineNumCols = extractNumberOfColsFromFirstLine(reader.readLine());
reader.reset(); // move reader pointer back to beginning of file in order to reread first line
return firstLineNumCols;
}
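For readers unfamiliar with the mark/readLine/reset idiom used in the helper above, a minimal standalone sketch (the class name and sample string are illustrative, not part of the codec):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Minimal sketch: peek at the first line of a BufferedReader without consuming it.
class MarkResetPeekDemo {
    static String peekFirstLine(final BufferedReader reader) throws IOException {
        reader.mark(8192);                           // remember this position; valid until more than 8192 chars are read
        final String firstLine = reader.readLine();  // consumes the first line
        reader.reset();                              // rewind to the mark so the line can be read again
        return firstLine;
    }

    public static void main(String[] args) throws IOException {
        final BufferedReader reader = new BufferedReader(new StringReader("a,b,c\n1,2,3\n"));
        System.out.println(peekFirstLine(reader));   // prints a,b,c
        System.out.println(reader.readLine());       // prints a,b,c again, because of reset()
    }
}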

private void readCSVLine(final MappingIterator<Map<String, String>> parsingIterator, final Consumer<Record<Event>> eventConsumer) {
try {
final Map<String, String> parsedLine = parsingIterator.nextValue();
@@ -84,33 +84,27 @@ private void readCSVLine(final MappingIterator<Map<String, String>> parsingItera
.build();
eventConsumer.accept(new Record<>(event));
} catch (final CsvReadException csvException) {
LOG.error("Invalid CSV row, skipping this line. Consider using the CSV Processor if there might be inconsistencies " +
"in the number of columns because it is more flexible. Error ", csvException);
LOG.error("Invalid CSV row, skipping this line. This typically means the row has too many lines. Consider using the CSV " +
"Processor if there might be inconsistencies in the number of columns because it is more flexible. Error ",
csvException);
} catch (JsonParseException jsonException) {
LOG.error("A JsonParseException occurred while reading a row of the CSV file, skipping line. Error ", jsonException);
LOG.error("A JsonParseException occurred on a row of the CSV file, skipping line. This typically means a quote character was " +
"not properly closed. Error ", jsonException);
} catch (final Exception e) {
LOG.error("An Exception occurred while reading a row of the CSV file. Error ", e);
}
}
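As context for the CsvReadException branch above, a minimal sketch of the kind of input the codec treats as an invalid row: a data row with more columns than the header. The class name and sample CSV are made up, and the sketch assumes (consistent with the catch block above) that such a row surfaces as a CsvReadException.

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.util.Map;

// Illustrative only: the second data row has one column more than the header allows.
class ExtraColumnDemo {
    public static void main(String[] args) throws Exception {
        final CsvMapper mapper = new CsvMapper();
        final CsvSchema schema = CsvSchema.emptySchema().withHeader();
        final String csv = "a,b\n1,2\n1,2,3\n";

        final MappingIterator<Map<String, String>> iterator =
                mapper.readerFor(Map.class).with(schema).readValues(csv);
        while (iterator.hasNextValue()) {
            try {
                System.out.println(iterator.nextValue()); // {a=1, b=2} for the valid row
            } catch (final CsvReadException csvException) {
                System.out.println("Skipping invalid row: " + csvException.getMessage());
            }
        }
    }
}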

private int getSizeOfFirstLine(final String firstLine) {
final CsvMapper firstLineMapper = new CsvMapper();
firstLineMapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows firstLineMapper to read with empty schema
char delimiterAsChar = this.config.getDelimiter().charAt(0);
char quoteCharAsChar = this.config.getQuoteCharacter().charAt(0);
private int extractNumberOfColsFromFirstLine(final String firstLine) {
int numberOfSeparators = 0;

final CsvSchema getFirstLineLengthSchema = CsvSchema.emptySchema().withColumnSeparator(delimiterAsChar).
withQuoteChar(quoteCharAsChar);

try (final MappingIterator<List<String>> firstLineIterator = firstLineMapper.readerFor(List.class).with(getFirstLineLengthSchema)
.readValues(firstLine)) {
final List<String> parsedFirstLine = firstLineIterator.nextValue();

return parsedFirstLine.size();
} catch (final IOException e) {
LOG.error("An exception occurred while reading first line", e);
return 0;
for (int charPointer = 0; charPointer < firstLine.length(); charPointer++) {
if (firstLine.charAt(charPointer) == config.getDelimiter().charAt(0)) {
numberOfSeparators++;
}
}
return numberOfSeparators + 1;
}
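A quick worked example of the counting above, assuming the default ',' delimiter (the sample header is made up): "timestamp,level,message" contains two separator characters, so the method reports three columns. Note that the count is over raw delimiter characters, so a delimiter inside a quoted header field would also be counted.

// Illustrative only: column count = delimiter occurrences + 1 (quoting is not considered).
class DelimiterCountDemo {
    public static void main(String[] args) {
        final String firstLine = "timestamp,level,message"; // hypothetical header row
        int numberOfSeparators = 0;
        for (int charPointer = 0; charPointer < firstLine.length(); charPointer++) {
            if (firstLine.charAt(charPointer) == ',') {
                numberOfSeparators++;
            }
        }
        System.out.println(numberOfSeparators + 1); // prints 3
    }
}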

private CsvSchema createCsvSchemaFromConfig(final int firstLineSize) {
@@ -143,10 +137,9 @@ private CsvMapper createCsvMapper() {
return mapper;
}

private CsvSchema createAutodetectCsvSchema() {
final char delimiterAsChar = config.getDelimiter().charAt(0); // safe due to config input validations
final char quoteCharAsChar = config.getQuoteCharacter().charAt(0); // safe due to config input validations
final CsvSchema schema = CsvSchema.emptySchema().withColumnSeparator(delimiterAsChar).withQuoteChar(quoteCharAsChar).withHeader();
private CsvSchema createAutodetectHeaderCsvSchema() {
final CsvSchema schema = CsvSchema.emptySchema().withColumnSeparator(config.getDelimiter().charAt(0))
.withQuoteChar(config.getQuoteCharacter().charAt(0)).withHeader();
return schema;
}
}
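To illustrate how the header-detecting schema returned by createAutodetectHeaderCsvSchema() behaves, a minimal sketch using Jackson's CSV mapper directly. The sample data and class name are made up; the codec itself feeds the schema a BufferedReader rather than a String.

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.util.Map;

// Illustrative only: with withHeader(), the first line supplies the keys for each parsed row.
class AutodetectHeaderDemo {
    public static void main(String[] args) throws Exception {
        final CsvMapper mapper = new CsvMapper();
        final CsvSchema schema = CsvSchema.emptySchema()
                .withColumnSeparator(',')
                .withQuoteChar('"')
                .withHeader();

        final String csv = "name,age\nalice,30\nbob,25\n";
        final MappingIterator<Map<String, String>> iterator =
                mapper.readerFor(Map.class).with(schema).readValues(csv);
        while (iterator.hasNextValue()) {
            System.out.println(iterator.nextValue()); // e.g. {name=alice, age=30}
        }
    }
}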
@@ -66,7 +66,10 @@ void setup() {

csvCodec = createObjectUnderTest();
}

// @Test(timeout=10000)
// void test_millionLineFile() {
//
// }
@Test
void test_when_configIsNull_then_throwsException() {
config = null;
