Implemented CSVCodec for S3 Source, config & unit tests #1644

Merged
1 change: 1 addition & 0 deletions data-prepper-plugins/s3-source/build.gradle
@@ -20,6 +20,7 @@ dependencies {
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.257'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
}

@@ -0,0 +1,152 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source.codec;

import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.log.JacksonLog;
import com.amazon.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvParser;
import com.fasterxml.jackson.dataformat.csv.CsvReadException;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CSVCodecConfig.class)
public class CSVCodec implements Codec {
Member: Let's use Csv instead of CSV. I think this is more consistent with the current code base and Java conventions. Please change the other classes as well.

Contributor Author: I agree that this change is more consistent with conventions and the code base. I suppose we should also change the CSVProcessor class to CsvProcessor. Is it okay to include these changes in this pull request? I can also open a new pull request to change to CsvProcessor if it's out of scope of this pull request.

Member: A different PR would be better. That will help get this one in faster.

    private final CSVCodecConfig config;
    private static final Logger LOG = LoggerFactory.getLogger(CSVCodec.class);
Contributor: nit: By convention, static variables are listed first.

    @DataPrepperPluginConstructor
    public CSVCodec(final CSVCodecConfig config) {
        Objects.requireNonNull(config);
        this.config = config;
    }

    @Override
    public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
        try (final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            parseBufferedReader(reader, eventConsumer);
        }
    }

    private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
        final CsvMapper mapper = createCsvMapper();
        final CsvSchema schema;
        if (config.isDetectHeader()) {
            // autodetect header from the first line of the csv file
            schema = createAutodetectCsvSchema();
        } else {
            // construct a header from the pipeline config or autogenerate it
            final int defaultBufferSize = 8192; // number of chars before mark is ignored (this is the buffer size, so large
            // headers can be read since more buffers will be allocated)
            reader.mark(defaultBufferSize); // getting the number of columns of the first line will consume the line, so mark the initial location

            final int firstLineSize = getSizeOfFirstLine(reader.readLine());
            reader.reset(); // move the reader back to the start of the stream so the first line can be parsed again
            schema = createCsvSchemaFromConfig(firstLineSize);
Contributor: If the first line of the reader contains the header, do we still need it when creating the MappingIterator in the section below? Is the mark-and-reset logic necessary?

Contributor Author: The branch in the else statement is triggered when there's no header in the CSV file. So we want to mark and reset so that the first row of the file can be parsed into parsingIterator a second time (after it's used to determine the actual number of columns in the CSV file and, if necessary, generate additional column names for the header). I moved this logic to a helper method with a descriptive name, which should improve readability.
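The mark-and-reset behavior discussed above can be sketched in isolation. This is a minimal, hypothetical example (it uses a StringReader in place of the S3 object stream and a naive split to stand in for the column counting; it is not the PR's code):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class MarkResetSketch {
    // Peek at the first line to count columns, then rewind so it can be parsed again as data.
    public static String[] peekThenReparse(final BufferedReader reader) throws IOException {
        reader.mark(8192);                           // remember the current position; valid while <= 8192 chars are read
        final String firstLine = reader.readLine();  // consuming the first line advances the reader
        final int columnCount = firstLine.split(",", -1).length; // simplification: no quote handling
        reader.reset();                              // rewind to the mark so the same line is available again
        final String reparsedLine = reader.readLine();
        return new String[] { String.valueOf(columnCount), reparsedLine };
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader reader = new BufferedReader(new StringReader("1,2,3\n4,5,6"))) {
            String[] result = peekThenReparse(reader);
            System.out.println(result[0] + " " + result[1]); // prints "3 1,2,3"
        }
    }
}
```

If more than the marked buffer size is read before reset(), the mark is invalidated, which is why the code above passes an explicit read-ahead limit to mark().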

        }

        MappingIterator<Map<String, String>> parsingIterator = mapper.readerFor(Map.class).with(schema).readValues(reader);
        try {
            while (parsingIterator.hasNextValue()) {
                readCSVLine(parsingIterator, eventConsumer);
            }
        } catch (Exception jsonExceptionOnHasNextLine) {
            LOG.error("An Exception occurred while determining if file has next line ", jsonExceptionOnHasNextLine);
        }
    }

    private void readCSVLine(final MappingIterator<Map<String, String>> parsingIterator, final Consumer<Record<Event>> eventConsumer) {
        try {
            final Map<String, String> parsedLine = parsingIterator.nextValue();

            final Event event = JacksonLog.builder()
                    .withData(parsedLine)
                    .build();
            eventConsumer.accept(new Record<>(event));
        } catch (final CsvReadException csvException) {
            LOG.error("Invalid CSV row, skipping this line. Consider using the CSV Processor if there might be inconsistencies " +
                    "in the number of columns because it is more flexible. Error ", csvException);
        } catch (JsonParseException jsonException) {
            LOG.error("A JsonParseException occurred while reading a row of the CSV file, skipping line. Error ", jsonException);
        } catch (final Exception e) {
Contributor: You might consider combining these into a single catch block. From a user's perspective, I don't think the differentiation adds much information.

Contributor Author: I added some more detail to the logging messages: in my experience testing, CsvReadException corresponds to a row with too many columns and JsonParseException to an unclosed quote character. Do you think differentiating the two is helpful? I guess the lines get skipped either way, so I get where you're coming from with the user's perspective.
            LOG.error("An Exception occurred while reading a row of the CSV file. Error ", e);
        }
    }

    private int getSizeOfFirstLine(final String firstLine) {
        final CsvMapper firstLineMapper = new CsvMapper();
        firstLineMapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows firstLineMapper to read with an empty schema
Contributor: You could use createCsvMapper here instead. Should .enable(CsvParser.Feature.WRAP_AS_ARRAY) be moved into createCsvMapper like in the CSV processor? https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessor.java#L91

Contributor Author: We discussed the getSizeOfFirstLine method offline, and Travis suggested that we can get the number of columns by counting occurrences of the delimiter with a loop. I made this change, so the method is simpler and no longer uses the CsvMapper class.
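The delimiter-counting approach mentioned above could look roughly like the following sketch. This is not the code in this PR; the quote handling is an assumption about how quoted delimiters would be skipped, and escaped quotes are not handled:

```java
public class ColumnCounter {
    // Count columns in a single CSV line by counting delimiters that fall outside quoted sections.
    public static int countColumns(final String line, final char delimiter, final char quote) {
        int columns = 1; // a line with no delimiters still has one column
        boolean insideQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            final char c = line.charAt(i);
            if (c == quote) {
                insideQuotes = !insideQuotes; // toggle quoted state (escaped quotes not handled in this sketch)
            } else if (c == delimiter && !insideQuotes) {
                columns++;
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(countColumns("a,b,\"c,d\"", ',', '"')); // prints 3: the quoted comma is not a separator
    }
}
```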

        char delimiterAsChar = this.config.getDelimiter().charAt(0);
        char quoteCharAsChar = this.config.getQuoteCharacter().charAt(0);
Contributor: Minor: You could define these as private member variables of this class, since this same .charAt(0) call on the config getters is reused in several spots.


        final CsvSchema getFirstLineLengthSchema = CsvSchema.emptySchema().withColumnSeparator(delimiterAsChar)
                .withQuoteChar(quoteCharAsChar);

        try (final MappingIterator<List<String>> firstLineIterator = firstLineMapper.readerFor(List.class).with(getFirstLineLengthSchema)
Contributor: Minor: Rewriting this to put the iterator instantiation inside the try block would make this more readable:

    try {
        final MappingIterator<List<String>> firstLineIterator = firstLineMapper.readerFor(List.class)
                .with(getFirstLineLengthSchema)
                .readValues(firstLine);
        ...
    }
                .readValues(firstLine)) {
            final List<String> parsedFirstLine = firstLineIterator.nextValue();

            return parsedFirstLine.size();
        } catch (final IOException e) {
            LOG.error("An exception occurred while reading first line", e);
            return 0;
        }
    }

    private CsvSchema createCsvSchemaFromConfig(final int firstLineSize) {
        final List<String> userSpecifiedHeader = config.getHeader();
Contributor Author: The configuration option getHeader can be null, and if it is, the call to size() on line 116 will throw an exception. The proper behavior when header is null is to make userSpecifiedHeader an empty ArrayList. I'll fix this bug in the next revision.
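The fix described here might look something like this hypothetical sketch of the header-padding logic, with a null header defaulting to an empty list and missing names autogenerated:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HeaderPadding {
    // Build the actual header: user-provided names first, then autogenerated "columnN" names for the rest.
    public static List<String> buildHeader(final List<String> userSpecifiedHeader, final int firstLineSize) {
        // Guard against a null header from the pipeline config by treating it as empty.
        final List<String> provided = userSpecifiedHeader == null ? Collections.emptyList() : userSpecifiedHeader;
        final List<String> actualHeader = new ArrayList<>();
        for (int col = 0; col < firstLineSize; col++) {
            if (col < provided.size()) {
                actualHeader.add(provided.get(col));
            } else {
                actualHeader.add("column" + (col + 1)); // autogenerated column names are 1-indexed
            }
        }
        return actualHeader;
    }

    public static void main(String[] args) {
        System.out.println(buildHeader(null, 2));          // prints [column1, column2]
        System.out.println(buildHeader(List.of("id"), 3)); // prints [id, column2, column3]
    }
}
```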

        final List<String> actualHeader = new ArrayList<>();
        final char delimiter = config.getDelimiter().charAt(0);
        final char quoteCharacter = config.getQuoteCharacter().charAt(0);
        int providedHeaderColIdx = 0;
        for (; providedHeaderColIdx < userSpecifiedHeader.size() && providedHeaderColIdx < firstLineSize; providedHeaderColIdx++) {
            actualHeader.add(userSpecifiedHeader.get(providedHeaderColIdx));
        }
        for (int remainingColIdx = providedHeaderColIdx; remainingColIdx < firstLineSize; remainingColIdx++) {
            actualHeader.add(generateColumnHeader(remainingColIdx));
        }
        CsvSchema.Builder headerBuilder = CsvSchema.builder();
        for (String columnName : actualHeader) {
            headerBuilder = headerBuilder.addColumn(columnName);
        }
        CsvSchema schema = headerBuilder.build().withColumnSeparator(delimiter).withQuoteChar(quoteCharacter);
        return schema;
    }

    private String generateColumnHeader(final int colNumber) {
Contributor: There's a good amount of shared logic between this class and the CSV Processor. Could we move some of that shared logic into its own class? Is there a place that makes sense to put it? @asifsmohammed, it looks like you created an issue for a similar problem: #1643

Contributor Author: +1, and I also noticed this while developing tests: a lot of the code from the Newline and JSON codec unit tests was applicable to the CSV codec unit tests. Sadly, I'm not sure where this common code should live. I remember that David Venable suggested packaging the CSV codec and processor together, but I'm not sure how this would look with the CSV processor being its own plugin and the CSV codec attached to S3.

Member: @finnroblin, I believe once we make a generic codec solution (#1532) we can have one project which has both. As it is now, this code has to reside in the S3 plugin. I'm fine to refactor this later.

        final int displayColNumber = colNumber + 1; // auto-generated column name indices start from 1 (not 0)
        return "column" + displayColNumber;
    }

    private CsvMapper createCsvMapper() {
        final CsvMapper mapper = new CsvMapper();
        return mapper;
    }

    private CsvSchema createAutodetectCsvSchema() {
        final char delimiterAsChar = config.getDelimiter().charAt(0); // safe due to config input validations
        final char quoteCharAsChar = config.getQuoteCharacter().charAt(0); // safe due to config input validations
        final CsvSchema schema = CsvSchema.emptySchema().withColumnSeparator(delimiterAsChar).withQuoteChar(quoteCharAsChar).withHeader();
        return schema;
    }
}
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.source.codec;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;

import java.util.List;
import java.util.Objects;

public class CSVCodecConfig {
    static final String DEFAULT_DELIMITER = ",";
    static final String DEFAULT_QUOTE_CHARACTER = "\""; // double quote
    static final Boolean DEFAULT_DETECT_HEADER = true;

    @JsonProperty("delimiter")
    private String delimiter = DEFAULT_DELIMITER;

    @JsonProperty("quote_character")
    private String quoteCharacter = DEFAULT_QUOTE_CHARACTER;

    @JsonProperty("header")
    private List<String> header;

    @JsonProperty("detect_header")
    private Boolean detectHeader = DEFAULT_DETECT_HEADER;

    public String getDelimiter() {
        return delimiter;
    }

    public String getQuoteCharacter() {
        return quoteCharacter;
    }

    public List<String> getHeader() {
        return header;
    }

    public Boolean isDetectHeader() {
        return detectHeader;
    }

    @AssertTrue(message = "delimiter must be exactly one character.")
    boolean isValidDelimiter() {
        return delimiter.length() == 1;
    }

    @AssertTrue(message = "quote_character must be exactly one character.")
    boolean isValidQuoteCharacter() {
        return quoteCharacter.length() == 1;
    }

    @AssertTrue(message = "quote_character and delimiter cannot be the same character")
    boolean areDelimiterAndQuoteCharacterDifferent() {
        return !(delimiter.equals(quoteCharacter));
    }

    @AssertTrue(message = "header must not be an empty list. To autogenerate columns, set detect_header: false and delete header " +
            "from config.")
    boolean isValidHeader() {
        return Objects.isNull(header) || header.size() > 0;
    }
}
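For illustration, a codec block exercising these options might look like the following. This is a sketch, not taken from this PR; the placement inside an s3 source definition is an assumption:

```yaml
# inside an s3 source definition (surrounding keys omitted)
codec:
  csv:
    delimiter: ","
    quote_character: "\""
    detect_header: false
    header: [ "id", "name", "value" ]
```

With detect_header set to false and a header provided, the first row is treated as data; if the row has more columns than the header names given, the remaining columns would be autogenerated as column4, column5, and so on.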