diff --git a/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessor.java b/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessor.java similarity index 94% rename from data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessor.java rename to data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessor.java index 0ab0e70589..c3a151707f 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessor.java +++ b/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessor.java @@ -29,13 +29,13 @@ * Processor to parse CSV data in Events. * */ -@DataPrepperPlugin(name="csv", pluginType = Processor.class, pluginConfigurationType = CSVProcessorConfig.class) -public class CSVProcessor extends AbstractProcessor, Record> { - private static final Logger LOG = LoggerFactory.getLogger(CSVProcessor.class); - private final CSVProcessorConfig config; +@DataPrepperPlugin(name="csv", pluginType = Processor.class, pluginConfigurationType = CsvProcessorConfig.class) +public class CsvProcessor extends AbstractProcessor, Record> { + private static final Logger LOG = LoggerFactory.getLogger(CsvProcessor.class); + private final CsvProcessorConfig config; @DataPrepperPluginConstructor - public CSVProcessor(final PluginMetrics pluginMetrics, final CSVProcessorConfig config) { + public CsvProcessor(final PluginMetrics pluginMetrics, final CsvProcessorConfig config) { super(pluginMetrics); this.config = config; } diff --git a/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfig.java b/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfig.java similarity index 98% rename from 
data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfig.java rename to data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfig.java index aa737f455a..39af15dd1d 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfig.java @@ -9,7 +9,7 @@ import jakarta.validation.constraints.AssertTrue; import java.util.List; -public class CSVProcessorConfig { +public class CsvProcessorConfig { static final String DEFAULT_SOURCE = "message"; static final String DEFAULT_DELIMITER = ","; static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote diff --git a/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfigTest.java b/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfigTest.java similarity index 79% rename from data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfigTest.java rename to data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfigTest.java index b2b2140561..04dd804c72 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorConfigTest.java +++ b/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorConfigTest.java @@ -10,22 +10,22 @@ import java.lang.reflect.Field; -import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_SOURCE; -import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_DELIMITER; -import static 
com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_QUOTE_CHARACTER; -import static com.amazon.dataprepper.plugins.processor.csv.CSVProcessorConfig.DEFAULT_DELETE_HEADERS; +import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_SOURCE; +import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_DELIMITER; +import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_QUOTE_CHARACTER; +import static com.amazon.dataprepper.plugins.processor.csv.CsvProcessorConfig.DEFAULT_DELETE_HEADERS; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -public class CSVProcessorConfigTest { +public class CsvProcessorConfigTest { - private CSVProcessorConfig createObjectUnderTest() { - return new CSVProcessorConfig(); + private CsvProcessorConfig createObjectUnderTest() { + return new CsvProcessorConfig(); } @Test - public void test_when_defaultCSVProcessorConfig_then_returns_default_values() { - final CSVProcessorConfig objectUnderTest = createObjectUnderTest(); + public void test_when_defaultCsvProcessorConfig_then_returns_default_values() { + final CsvProcessorConfig objectUnderTest = createObjectUnderTest(); assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE)); assertThat(objectUnderTest.getDelimiter(), equalTo(DEFAULT_DELIMITER)); @@ -37,7 +37,7 @@ public void test_when_defaultCSVProcessorConfig_then_returns_default_values() { @Nested class Validation { - final CSVProcessorConfig csvProcessorConfig = createObjectUnderTest(); + final CsvProcessorConfig csvProcessorConfig = createObjectUnderTest(); @Test void isValidDelimiter_should_return_false_if_delimiter_is_multiple_characters() @@ -71,9 +71,9 @@ void isValidQuoteCharacter_should_return_true_if_quote_char_is_single_character( } } - private void reflectivelySetField(final CSVProcessorConfig csvProcessorConfig, final String fieldName, final Object value) + private 
void reflectivelySetField(final CsvProcessorConfig csvProcessorConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { - final Field field = CSVProcessorConfig.class.getDeclaredField(fieldName); + final Field field = CsvProcessorConfig.class.getDeclaredField(fieldName); try { field.setAccessible(true); field.set(csvProcessorConfig, value); diff --git a/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorTest.java b/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorTest.java similarity index 98% rename from data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorTest.java rename to data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorTest.java index e7c2481d52..e6b3d03138 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessorTest.java +++ b/data-prepper-plugins/csv-processor/src/test/java/com/amazon/dataprepper/plugins/processor/csv/CsvProcessorTest.java @@ -27,18 +27,18 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class CSVProcessorTest { +class CsvProcessorTest { @Mock - private CSVProcessorConfig processorConfig; + private CsvProcessorConfig processorConfig; @Mock private PluginMetrics pluginMetrics; - private CSVProcessor csvProcessor; + private CsvProcessor csvProcessor; @BeforeEach void setup() { - CSVProcessorConfig defaultConfig = new CSVProcessorConfig(); + CsvProcessorConfig defaultConfig = new CsvProcessorConfig(); lenient().when(processorConfig.getSource()).thenReturn(defaultConfig.getSource()); lenient().when(processorConfig.getDelimiter()).thenReturn(defaultConfig.getDelimiter()); lenient().when(processorConfig.isDeleteHeader()).thenReturn(defaultConfig.isDeleteHeader()); @@ -49,8 +49,8 @@ void 
setup() { csvProcessor = createObjectUnderTest(); } - private CSVProcessor createObjectUnderTest() { - return new CSVProcessor(pluginMetrics, processorConfig); + private CsvProcessor createObjectUnderTest() { + return new CsvProcessor(pluginMetrics, processorConfig); } @Test diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index 038f208f15..bcc916bb12 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation 'com.amazonaws:aws-java-sdk-s3:1.12.257' implementation 'org.apache.commons:commons-compress:1.21' implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' testImplementation 'org.apache.commons:commons-lang3:3.12.0' } diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodec.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodec.java new file mode 100644 index 0000000000..d60be2e686 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodec.java @@ -0,0 +1,143 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.codec; + +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.log.JacksonLog; +import com.amazon.dataprepper.model.record.Record; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvReadException; +import 
com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CsvCodecConfig.class) +public class CsvCodec implements Codec { + private static final Logger LOG = LoggerFactory.getLogger(CsvCodec.class); + private final CsvCodecConfig config; + + @DataPrepperPluginConstructor + public CsvCodec(final CsvCodecConfig config) { + Objects.requireNonNull(config); + this.config = config; + } + + @Override + public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { + parseBufferedReader(reader, eventConsumer); + } + } + + private void parseBufferedReader(final BufferedReader reader, final Consumer> eventConsumer) throws IOException { + final CsvMapper mapper = createCsvMapper(); + final CsvSchema schema; + if (config.isDetectHeader()) { + schema = createAutodetectHeaderCsvSchema(); + } + else { + final int numberColumnsFirstLine = getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderAfter(reader); + schema = createCsvSchemaFromConfig(numberColumnsFirstLine); + } + + MappingIterator> parsingIterator = mapper.readerFor(Map.class).with(schema).readValues(reader); + try { + while (parsingIterator.hasNextValue()) { + readCsvLine(parsingIterator, eventConsumer); + } + } catch (Exception jsonExceptionOnHasNextLine) { + LOG.error("An Exception occurred while determining if file has next line ", jsonExceptionOnHasNextLine); + } + } + + private int getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderAfter(BufferedReader reader) 
throws IOException { + final int defaultBufferSize = 8192; // this number doesn't affect even a thousand column header — it's sufficiently large. + reader.mark(defaultBufferSize); // calling reader.readLine() will consume the first line, so mark initial location to reset after + final int firstLineNumberColumns = extractNumberOfColumnsFromFirstLine(reader.readLine()); + reader.reset(); // move reader pointer back to beginning of file in order to reread first line + return firstLineNumberColumns; + } + + private void readCsvLine(final MappingIterator> parsingIterator, final Consumer> eventConsumer) { + try { + final Map parsedLine = parsingIterator.nextValue(); + + final Event event = JacksonLog.builder() + .withData(parsedLine) + .build(); + eventConsumer.accept(new Record<>(event)); + } catch (final CsvReadException csvException) { + LOG.error("Invalid CSV row, skipping this line. This typically means the row has too many columns. Consider using the CSV " + + "Processor if there might be inconsistencies in the number of columns because it is more flexible. Error ", + csvException); + } catch (JsonParseException jsonException) { + LOG.error("A JsonParseException occurred on a row of the CSV file, skipping line. This typically means a quote character was " + + "not properly closed. Error ", jsonException); + } catch (final Exception e) { + LOG.error("An Exception occurred while reading a row of the CSV file. Error ", e); + } + } + + private int extractNumberOfColumnsFromFirstLine(final String firstLine) { + int numberOfSeparators = 0; + for (int charPointer = 0; charPointer < firstLine.length(); charPointer++) { + if (firstLine.charAt(charPointer) == config.getDelimiter().charAt(0)) { + numberOfSeparators++; + } + } + return numberOfSeparators + 1; + } + + private CsvSchema createCsvSchemaFromConfig(final int firstLineSize) { + final List userSpecifiedHeader = Objects.isNull(config.getHeader()) ? 
new ArrayList<>() : config.getHeader(); + final List actualHeader = new ArrayList<>(); + final char delimiter = config.getDelimiter().charAt(0); + final char quoteCharacter = config.getQuoteCharacter().charAt(0); + int providedHeaderColIdx = 0; + for (; providedHeaderColIdx < userSpecifiedHeader.size() && providedHeaderColIdx < firstLineSize; providedHeaderColIdx++) { + actualHeader.add(userSpecifiedHeader.get(providedHeaderColIdx)); + } + for (int remainingColIdx = providedHeaderColIdx; remainingColIdx < firstLineSize; remainingColIdx++) { + actualHeader.add(generateColumnHeader(remainingColIdx)); + } + CsvSchema.Builder headerBuilder = CsvSchema.builder(); + for (String columnName : actualHeader) { + headerBuilder = headerBuilder.addColumn(columnName); + } + CsvSchema schema = headerBuilder.build().withColumnSeparator(delimiter).withQuoteChar(quoteCharacter); + return schema; + } + + private String generateColumnHeader(final int columnNumber) { + final int displayColumnNumber = columnNumber + 1; // auto generated column name indices start from 1 (not 0) + return "column" + displayColumnNumber; + } + + private CsvMapper createCsvMapper() { + final CsvMapper mapper = new CsvMapper(); + return mapper; + } + + private CsvSchema createAutodetectHeaderCsvSchema() { + final CsvSchema schema = CsvSchema.emptySchema().withColumnSeparator(config.getDelimiter().charAt(0)) + .withQuoteChar(config.getQuoteCharacter().charAt(0)).withHeader(); + return schema; + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecConfig.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecConfig.java new file mode 100644 index 0000000000..f46a5b3cb2 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecConfig.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package com.amazon.dataprepper.plugins.source.codec; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; + +import java.util.List; +import java.util.Objects; + +public class CsvCodecConfig { + static final String DEFAULT_DELIMITER = ","; + static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote + static final Boolean DEFAULT_DETECT_HEADER = true; + + @JsonProperty("delimiter") + private String delimiter = DEFAULT_DELIMITER; + + @JsonProperty("quote_character") + private String quoteCharacter = DEFAULT_QUOTE_CHARACTER; + + @JsonProperty("header") + private List header; + + @JsonProperty("detect_header") + private Boolean detectHeader = DEFAULT_DETECT_HEADER; + + public String getDelimiter() { + return delimiter; + } + + public String getQuoteCharacter() { + return quoteCharacter; + } + + public List getHeader() { + return header; + } + + public Boolean isDetectHeader() { + return detectHeader; + } + + @AssertTrue(message = "delimiter must be exactly one character.") + boolean isValidDelimiter() { + return delimiter.length() == 1; + } + + @AssertTrue(message = "quote_character must be exactly one character.") + boolean isValidQuoteCharacter() { + return quoteCharacter.length() == 1; + } + + @AssertTrue(message = "quote_character and delimiter cannot be the same character.") + boolean areDelimiterAndQuoteCharacterDifferent() { + return !(delimiter.equals(quoteCharacter)); + } + + @AssertTrue(message = "header must not be an empty list. 
To autogenerate columns, set detect_header: false and delete header " + + "from config.") + boolean isValidHeader() { + return Objects.isNull(header) || header.size() > 0; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecTest.java new file mode 100644 index 0000000000..56124a543a --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/CsvCodecTest.java @@ -0,0 +1,697 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source.codec; + +import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.EventType; +import com.amazon.dataprepper.model.record.Record; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static 
org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CsvCodecTest { + @Mock + private CsvCodecConfig config; + @Mock + private Consumer> eventConsumer; + private CsvCodec csvCodec; + private CsvCodec createObjectUnderTest() { + return new CsvCodec(config); + } + + @BeforeEach + void setup() { + CsvCodecConfig defaultCsvCodecConfig = new CsvCodecConfig(); + lenient().when(config.getDelimiter()).thenReturn(defaultCsvCodecConfig.getDelimiter()); + lenient().when(config.getQuoteCharacter()).thenReturn(defaultCsvCodecConfig.getQuoteCharacter()); + lenient().when(config.getHeader()).thenReturn(defaultCsvCodecConfig.getHeader()); + lenient().when(config.isDetectHeader()).thenReturn(defaultCsvCodecConfig.isDetectHeader()); + + csvCodec = createObjectUnderTest(); + } + + @Test + void test_when_configIsNull_then_throwsException() { + config = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void test_when_nullInputStream_then_throwsException() { + assertThrows(NullPointerException.class, () -> + csvCodec.parse(null, eventConsumer)); + + verifyNoInteractions(eventConsumer); + } + + @Test + void test_when_noAutoDetectHeaderOrUserSpecifiedHeader_then_autogeneratesHeaderAndParsesCorrectly() throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfRows = 10; + final int numberOfColumns = 50; + + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + 
final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + final List autoGeneratedHeader = createAutoGeneratedHeader(numberOfColumns); + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), autoGeneratedHeader); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void test_when_autoDetectHeaderHappyCase_then_callsConsumerWithParsedEvents(final int numberOfRows) throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.TRUE); + + final int numberOfColumns = 50; + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final String header = generateHeader(numberOfColumns); + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + 
assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void test_when_manualHeaderHappyCase_then_callsConsumerWithParsedEvents(final int numberOfRows) throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfColumns = 50; + final List header = generateHeaderAsList(numberOfColumns); + + when(config.getHeader()).thenReturn(header); + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void 
test_when_manualHeaderTooFewColumns_then_autoGenerateRemainingColumnsOnParsedEvents(final int numberOfRows) throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfColumns = 50; + final List header = generateHeaderAsList(numberOfColumns-10); + + when(config.getHeader()).thenReturn(header); + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + final List actualHeader = addExtraAutogeneratedColumnsToExistingHeader(header, 10); + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), actualHeader); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @Test + void test_when_autoDetectHeaderWithMoreColumnsThenBufferCapacity_then_parsesEntireHeader() throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.TRUE); + + final int numberOfColumns = 10000; + + final int numberOfRows = 2; + + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final String header = generateHeader(numberOfColumns); + + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, 
header); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void test_when_manualHeaderTooManyColumns_then_omitsExtraColumnsOnParsedEvents(final int numberOfRows) throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfColumns = 50; + final List header = generateHeaderAsList(numberOfColumns+10); + + when(config.getHeader()).thenReturn(header); + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + final List actualHeader = header.subList(0,50); + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, 
notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), actualHeader); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 100}) + void test_when_autoDetectHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedRows(final int numberOfRows) throws IOException { + // row that's longer than header should be skipped + when(config.isDetectHeader()).thenReturn(Boolean.TRUE); + + final int numberOfColumns = 5; + final int numberOfProperlyFormattedRows = numberOfRows-1; + final List csvRowsExcludingHeaderImmutable = generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns); + final List csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable); + + csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns+1, config.getDelimiter(), config.getQuoteCharacter())); + + + final String header = generateHeader(numberOfColumns); + + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header); + + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(eventConsumer, times(numberOfProperlyFormattedRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfProperlyFormattedRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + 
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + + @ParameterizedTest + @ValueSource(ints = {2, 10, 100}) + void test_when_manualHeaderWrongNumberColumnsAndJaggedRows_then_skipsJaggedRows(final int numberOfRows) throws IOException { + // row that's longer than header should be skipped + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfColumns = 5; + + final List header = generateHeaderAsList(numberOfColumns); + + when(config.getHeader()).thenReturn(header); + final int numberOfProperlyFormattedRows = numberOfRows-1; + final List csvRowsExcludingHeaderImmutable = generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns); + final List csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable); + + csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns+1, config.getDelimiter(), config.getQuoteCharacter())); + + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + + verify(eventConsumer, times(numberOfProperlyFormattedRows)).accept(recordArgumentCaptor.capture()); + + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfProperlyFormattedRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = 
createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @Test + void test_when_tooFewColumns_then_parsedCorrectly() throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + + final int numberOfColumns = 5; + + final List header = generateHeaderAsList(numberOfColumns); + + when(config.getHeader()).thenReturn(header); + + final int numberOfRows = 1; + + final List csvRowsExcludingHeader = new ArrayList<>(); + csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns-1, config.getDelimiter(), config.getQuoteCharacter())); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + final List> actualRecords = recordArgumentCaptor.getAllValues(); + assertThat(actualRecords.size(), equalTo(numberOfRows)); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 100}) + void test_when_unrecoverableRow_then_logsExceptionAndSkipsOffendingRow(final int numberOfRows) throws IOException { + // If there's an unclosed quote then expected behavior is to skip that row + when(config.isDetectHeader()).thenReturn(Boolean.TRUE); + when(config.getQuoteCharacter()).thenReturn(";"); // ";" is also the array character + final int numberOfColumns = 5; + final int numberOfProperlyFormattedRows = numberOfRows; + final List csvRowsExcludingHeaderImmutable = generateCsvLinesAsList(numberOfProperlyFormattedRows, numberOfColumns, + config.getDelimiter(), config.getQuoteCharacter()); + final List csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable); + csvRowsExcludingHeader.add(generateIllegalString(numberOfColumns, config.getDelimiter(), config.getQuoteCharacter())); + 
csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns, config.getDelimiter(), config.getQuoteCharacter())); + final String header = generateHeader(numberOfColumns, config.getDelimiter().charAt(0), config.getQuoteCharacter().charAt(0)); + + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header); + + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfProperlyFormattedRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfProperlyFormattedRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), + header, config.getDelimiter().charAt(0), config.getQuoteCharacter().charAt(0)); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + private String generateIllegalString(final int numberOfColumns, final String delimiter, final String quoteCharacter) { + + StringBuilder thisRow = new StringBuilder(quoteCharacter + UUID.randomUUID().toString() + quoteCharacter); + + for (int col = 1; col < numberOfColumns; col++) { + String strToAppend = delimiter + quoteCharacter + UUID.randomUUID().toString(); + thisRow.append(strToAppend); + } + return thisRow.toString(); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 100}) + void test_when_emptyLineWithCorrectNumberDelimiters_then_parsesAsEmpty() throws IOException { + final int numberOfColumns = 5; + final int numberOfRows = 10; + + 
final List csvRowsExcludingHeaderImmutable = generateCsvLinesAsList(numberOfRows-2, numberOfColumns); + final List csvRowsExcludingHeader = new ArrayList<>(csvRowsExcludingHeaderImmutable); + csvRowsExcludingHeader.add(",,,,"); + csvRowsExcludingHeader.add(generateCsvLine(numberOfColumns, config.getDelimiter(), config.getQuoteCharacter())); + + final String header = generateHeader(numberOfColumns); + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), header); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @Test + void test_when_manualHeaderDifferentDelimiterAndQuoteCharacter_then_parsesCorrectly() throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.FALSE); + final String delimiter = "\t"; + final String quoteCharacter = "'"; + when(config.getDelimiter()).thenReturn(delimiter); + when(config.getQuoteCharacter()).thenReturn(quoteCharacter); + + final int numberOfColumns = 50; + final List header = generateHeaderAsList(numberOfColumns); + when(config.getHeader()).thenReturn(header); + + final int numberOfRows = 10; + final List csvRowsExcludingHeader = 
generateCsvLinesAsList(numberOfRows, numberOfColumns, delimiter, quoteCharacter); + final InputStream inputStream = createInputStream(csvRowsExcludingHeader); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMapFromHeaderList(csvRowsExcludingHeader.get(i), header, + delimiter.charAt(0), quoteCharacter.charAt(0)); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + @Test + void test_when_autodetectHeaderDifferentDelimiterAndQuoteCharacter_then_parsesCorrectly() throws IOException { + when(config.isDetectHeader()).thenReturn(Boolean.TRUE); + final String delimiter = "\t"; + final String quoteCharacter = "'"; + when(config.getDelimiter()).thenReturn(delimiter); + when(config.getQuoteCharacter()).thenReturn(quoteCharacter); + + final int numberOfColumns = 50; + final int numberOfRows = 10; + + final List csvRowsExcludingHeader = generateCsvLinesAsList(numberOfRows, numberOfColumns, delimiter, quoteCharacter); + final String header = generateHeader(numberOfColumns, delimiter.charAt(0), quoteCharacter.charAt(0)); + final InputStream inputStream = createInputStreamAndAppendHeader(csvRowsExcludingHeader, header); + csvCodec.parse(inputStream, eventConsumer); + + final ArgumentCaptor> recordArgumentCaptor = 
ArgumentCaptor.forClass(Record.class); + verify(eventConsumer, times(numberOfRows)).accept(recordArgumentCaptor.capture()); + + final List> actualRecords = recordArgumentCaptor.getAllValues(); + + assertThat(actualRecords.size(), equalTo(numberOfRows)); + + for (int i = 0; i < actualRecords.size(); i++) { + + final Record actualRecord = actualRecords.get(i); + assertThat(actualRecord, notNullValue()); + assertThat(actualRecord.getData(), notNullValue()); + assertThat(actualRecord.getData().getMetadata(), notNullValue()); + assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString())); + + final Map expectedMap = createExpectedMap(csvRowsExcludingHeader.get(i), header, delimiter.charAt(0), + quoteCharacter.charAt(0)); + assertThat(actualRecord.getData().toMap(), equalTo(expectedMap)); + } + } + + private Map createExpectedMapFromHeaderList(final String row, final List header) throws IOException { + final CsvMapper mapper = new CsvMapper(); + mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows mapper to read with empty schema + final CsvSchema schema = CsvSchema.emptySchema(); + + final MappingIterator> rowIterator = mapper.readerFor(List.class).with(schema).readValues(row); + final List parsedRow = rowIterator.nextValue(); + assertThat(header.size(), equalTo(parsedRow.size())); + + Map expectedMap = new HashMap<>(); + for (int i = 0; i < header.size(); i++) { + expectedMap.put(header.get(i), parsedRow.get(i)); + } + return expectedMap; + } + + private Map createExpectedMapFromHeaderList(final String row, final List header, final char delimiter, + final char quoteCharacter) throws IOException { + final CsvMapper mapper = new CsvMapper(); + mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows mapper to read with empty schema + final CsvSchema schema = CsvSchema.emptySchema().withColumnSeparator(delimiter).withQuoteChar(quoteCharacter); + + final MappingIterator> rowIterator = 
mapper.readerFor(List.class).with(schema).readValues(row); + final List parsedRow = rowIterator.nextValue(); + assertThat(header.size(), equalTo(parsedRow.size())); + + Map expectedMap = new HashMap<>(); + for (int i = 0; i < header.size(); i++) { + expectedMap.put(header.get(i), parsedRow.get(i)); + } + return expectedMap; + } + + private List generateHeaderAsList(final int numberOfColumns) { + List header = new ArrayList<>(numberOfColumns); + for (int colNumber = 0; colNumber < numberOfColumns; colNumber++) { + final String thisColName = "list_col" + colNumber; + header.add(thisColName); + } + return header; + } + + private Map createExpectedMap(final String row, final String header) throws IOException { + return createExpectedMap(row, header, ',','"'); + } + + private Map createExpectedMap(final String row, final String header, final char delimiter, + final char quoteCharacter) throws IOException { + final CsvMapper mapper = new CsvMapper(); + mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows mapper to read with empty schema + final CsvSchema schema = CsvSchema.emptySchema().withQuoteChar(quoteCharacter).withColumnSeparator(delimiter); + + final MappingIterator> headerIterator = mapper.readerFor(List.class).with(schema).readValues(header); + final MappingIterator> rowIterator = mapper.readerFor(List.class).with(schema).readValues(row); + + final List parsedHeader = headerIterator.nextValue(); + final List parsedRow = rowIterator.nextValue(); + assertThat(parsedHeader.size(), equalTo(parsedRow.size())); + + Map expectedMap = new HashMap<>(); + for (int i = 0; i < parsedHeader.size(); i++) { + expectedMap.put(parsedHeader.get(i), parsedRow.get(i)); + } + return expectedMap; + } + + private String generateHeader(final int numberOfColumns) { + return generateHeader(numberOfColumns, ',','"'); + } + + private String generateHeader(final int numberOfColumns, final char delimiter, final char quoteCharacter) { + if (numberOfColumns <= 0) { + return ""; + } + 
String header = quoteCharacter + "col0" + quoteCharacter; + for (int colNumber = 1; colNumber < numberOfColumns; colNumber++) { + final String thisColName = quoteCharacter + "col" + colNumber + quoteCharacter; + final String thisColWithDelimiter = delimiter + thisColName; + header += thisColWithDelimiter; + } + return header; + } + + private InputStream createInputStreamAndAppendHeader(final List csvRowsExcludingHeader, final String header) + throws IOException { + LinkedList csvRowsWithHeader = new LinkedList<>(csvRowsExcludingHeader); // Linked list for O(1) insertion at front + csvRowsWithHeader.addFirst(header); + + return createInputStream(csvRowsWithHeader); + } + + private InputStream createInputStream(final List rowsIn) throws IOException { + final String inputString = generateMultilineString(rowsIn); + + return new ByteArrayInputStream(inputString.getBytes(StandardCharsets.UTF_8)); + } + + private List generateCsvLinesAsList(final int numberOfRows, final int numberOfColumns) { + return generateCsvLinesAsList(numberOfRows, numberOfColumns, ",", "\""); + } + private List addExtraAutogeneratedColumnsToExistingHeader(final List headerFromConfig, final int numberRemainingColumns) { + List header = new ArrayList<>(headerFromConfig); + final int existingHeaderSize = headerFromConfig.size(); + for (int i = 0; i < numberRemainingColumns; i++) { + final int colNumber = existingHeaderSize + i + 1;// auto generated column name indices start from 1 (not 0) + header.add("column"+colNumber); + } + return header; + } + + private List generateCsvLinesAsList(final int numberOfRows, final int numberOfColumns, final String delimiter, + final String quoteCharacter) { + final List csvRows = new ArrayList<>(numberOfRows); + for (int row = 0; row < numberOfRows; row++) { + csvRows.add(generateCsvLine(numberOfColumns, delimiter, quoteCharacter)); + } + return Collections.unmodifiableList(csvRows); + } + + private String generateCsvLine(final int numberOfColumns, final String 
delimiter, final String quoteCharacter) { + StringBuilder thisRow = new StringBuilder(quoteCharacter + UUID.randomUUID().toString() + quoteCharacter); + + for (int col = 1; col < numberOfColumns; col++) { + String strToAppend = delimiter + quoteCharacter + UUID.randomUUID().toString() + quoteCharacter; + thisRow.append(strToAppend); + } + return thisRow.toString(); + } + + private String generateMultilineString(final List csvFileRows) { + final StringWriter stringWriter = new StringWriter(); + for (String row : csvFileRows) { + stringWriter.write(row); + stringWriter.write(System.lineSeparator()); + } + + return stringWriter.toString(); + } + + private List createAutoGeneratedHeader(final int numberOfColumns) { + final List header = new ArrayList<>(numberOfColumns); + for (int column = 0; column < numberOfColumns; column++) { + final int displayColumnNumber = column + 1; // auto generated column name indices start from 1 (not 0) + header.add("column" + displayColumnNumber); + } + return header; + } +}