
Implemented CSVCodec for S3 Source, config & unit tests #1644

Merged

Conversation

finnroblin
Contributor

Signed-off-by: Finn Roblin [email protected]

Description

Implementation of CSVCodec to parse the InputStream from S3 objects. It includes configuration options similar to those of the CSV Processor, with some slight changes: the CSV file's header/column names can be specified by the user with the header option, or the header can be autodetected by setting detect_header to true.
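For illustration, a pipeline entry using the codec might look roughly like the sketch below. The option names header, detect_header, delimiter, and quote_character come from this PR's description and diff; the surrounding s3 source keys are assumptions and may not match the actual source configuration.

```yaml
# Illustrative sketch only: header/detect_header/delimiter/quote_character
# appear in this PR; the surrounding s3 source keys are assumed.
csv-pipeline:
  source:
    s3:
      codec:
        csv:
          delimiter: ","
          quote_character: "\""
          # either specify the column names explicitly...
          header: ["ip", "verb", "status"]
          # ...or autodetect them from the first row:
          # detect_header: true
```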

Issues Resolved

Resolves #1617

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@finnroblin finnroblin requested a review from a team as a code owner August 5, 2022 16:39
@codecov-commenter

codecov-commenter commented Aug 5, 2022

Codecov Report

Merging #1644 (cfb92fa) into main (c5ffce3) will not change coverage.
The diff coverage is n/a.

@@            Coverage Diff            @@
##               main    #1644   +/-   ##
=========================================
  Coverage     94.26%   94.26%           
  Complexity     1206     1206           
=========================================
  Files           162      162           
  Lines          3419     3419           
  Branches        276      276           
=========================================
  Hits           3223     3223           
  Misses          134      134           
  Partials         62       62           
Impacted Files Coverage Δ
...dataprepper/model/configuration/PipelineModel.java 100.00% <0.00%> (ø)


@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CSVCodecConfig.class)
public class CSVCodec implements Codec {
private final CSVCodecConfig config;
private static final Logger LOG = LoggerFactory.getLogger(CSVCodec.class);
Contributor

nit: By convention static variables are listed first

Comment on lines 89 to 91
} catch (JsonParseException jsonException) {
LOG.error("A JsonParseException occurred while reading a row of the CSV file, skipping line. Error ", jsonException);
} catch (final Exception e) {
Contributor

You might consider combining these into a single catch block. From a user perspective I don't think the differentiation adds much information
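The suggested consolidation could use Java's multi-catch syntax. A standalone sketch of the idea follows; the exception types here are stand-ins for illustration, not Jackson's actual CsvReadException/JsonParseException:

```java
// Standalone illustration of collapsing two exception handlers into one
// multi-catch block; the exception types are hypothetical stand-ins.
public class MultiCatchExample {
    static class RowTooWideException extends Exception {}
    static class UnclosedQuoteException extends Exception {}

    // Returns the row if it "parses", or null if it is skipped.
    static String parseRow(final String row) {
        try {
            if (row.contains("|")) throw new RowTooWideException();
            if (row.contains("\"")) throw new UnclosedQuoteException();
            return row;
        } catch (RowTooWideException | UnclosedQuoteException e) {
            // one log statement covers both failure modes;
            // the line is skipped either way
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseRow("a,b,c")); // a,b,c
        System.out.println(parseRow("a|b"));   // null
    }
}
```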

Contributor Author

I added some more details to the logging messages. From my experience testing, a CsvReadException means the row has too many columns, and a JsonParseException means an unclosed quote character. Do you think differentiating the two is helpful? I guess the lines get skipped either way, so I get where you're coming from with the user's perspective.

Comment on lines 99 to 100
char delimiterAsChar = this.config.getDelimiter().charAt(0);
char quoteCharAsChar = this.config.getQuoteCharacter().charAt(0);
Contributor

Minor: You could define these as private member variables of this class since this same .get().charAt(0) is reused in several spots.

Comment on lines 97 to 98
final CsvMapper firstLineMapper = new CsvMapper();
firstLineMapper.enable(CsvParser.Feature.WRAP_AS_ARRAY); // allows firstLineMapper to read with empty schema
Contributor

You could use createCsvMapper here instead. Should .enable(CsvParser.Feature.WRAP_AS_ARRAY) be moved into createCsvMapper like in the CSV processor? https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/csv-processor/src/main/java/com/amazon/dataprepper/plugins/processor/csv/CSVProcessor.java#L91

Contributor Author

We discussed the getSizeOfFirstLine method offline and Travis suggested that we can get the number of columns through counting the occurrences of delimiter with a loop. I made this change, so the method is simpler and no longer uses the CsvMapper class.
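The delimiter-counting approach described above can be sketched as a standalone method like this (a simplified version for illustration; the method name and edge-case handling in the PR may differ, and this sketch does not account for delimiters inside quoted fields):

```java
// Hypothetical standalone sketch: count the columns in the first CSV line
// by counting delimiter occurrences. Quoted delimiters are not handled.
public class ColumnCounter {
    static int extractNumberOfColumnsFromFirstLine(final String firstLine, final char delimiter) {
        if (firstLine == null || firstLine.isEmpty()) {
            return 0;
        }
        int numberOfSeparators = 0;
        for (int charPointer = 0; charPointer < firstLine.length(); charPointer++) {
            if (firstLine.charAt(charPointer) == delimiter) {
                numberOfSeparators++;
            }
        }
        // n separators delimit n + 1 columns
        return numberOfSeparators + 1;
    }

    public static void main(String[] args) {
        System.out.println(extractNumberOfColumnsFromFirstLine("a,b,c", ',')); // 3
    }
}
```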

final CsvSchema getFirstLineLengthSchema = CsvSchema.emptySchema()
        .withColumnSeparator(delimiterAsChar)
        .withQuoteChar(quoteCharAsChar);

try (final MappingIterator<List<String>> firstLineIterator = firstLineMapper.readerFor(List.class).with(getFirstLineLengthSchema)
Contributor

Minor: Rewriting this to put the Iterator instantiation inside the try block would make this more readable

try { 
    final MappingIterator<List<String>> firstLineIterator = firstLineMapper.readerFor(List.class)
        .with(getFirstLineLengthSchema)
        .readValues(firstLine)
    ...
}

return schema;
}

private String generateColumnHeader(final int colNumber) {
Contributor

There's a good amount of shared logic between this class and the CSV Processor. Could we move some of that shared logic into its own class? Is there a place that makes sense to put it?

@asifsmohammed looks like you created an issue for a similar problem #1643

Contributor Author

+1 and I also noticed this while developing tests — a lot of the code from the Newline and JSON codec unit tests was applicable to the CSV codec unit tests. Sadly I'm not sure where this common code should live. I remember that David Venable suggested packaging the CSV codec and processor together, but I'm not sure how this would look with the CSV processor being its own plugin and the CSV codec attached to S3.

Member

@finnroblin , I believe once we make a generic codec solution (#1532) then we can have one project which has both. As it is now, this code has to reside in the S3 plugin. I'm fine to refactor this later.

Comment on lines 58 to 65
// construct a header from the pipeline config or autogenerate it
final int defaultBufferSize = 8192; // number of chars that can be read before the mark is invalidated;
                                    // large headers can still be read since more buffers will be allocated
reader.mark(defaultBufferSize); // getting the number of columns will consume the first line, so mark the initial location

final int firstLineSize = getSizeOfFirstLine(reader.readLine());
reader.reset(); // move the reader back to the beginning of the file in order to parse the first line
schema = createCsvSchemaFromConfig(firstLineSize);
Contributor

If the first line of the reader contains the header, do we need it when creating the MappingIterator in the section below? Is the mark and reset necessary?

Contributor Author

The branch in the else statement is triggered if there's no header in the csv file. So we want to mark and reset so that the first row of the file can be parsed into parsingIterator a second time (after it's used to figure out the actual number of columns in the csv file, and generate more column names for the header if necessary). I moved this logic to a helper method with a descriptive name which should hopefully improve readability.
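The mark/read/reset pattern described above can be demonstrated in isolation with the standard BufferedReader API. The method and variable names below are illustrative, not the PR's actual helpers:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Minimal sketch of mark/read/reset: peek at the first line to learn the
// column count, then rewind so that same line can still be parsed as data.
public class MarkResetExample {
    static int peekNumberOfColumns(final BufferedReader reader, final char delimiter) throws IOException {
        final int bufferSize = 8192; // the mark is invalidated after this many chars are read
        reader.mark(bufferSize);
        final String firstLine = reader.readLine(); // consumes the first line (assumes non-empty input)
        int columns = 1;
        for (int i = 0; i < firstLine.length(); i++) {
            if (firstLine.charAt(i) == delimiter) {
                columns++;
            }
        }
        reader.reset(); // rewind so the first line can be read again as a data row
        return columns;
    }

    public static void main(String[] args) throws IOException {
        final BufferedReader reader = new BufferedReader(new StringReader("1,2,3\n4,5,6\n"));
        System.out.println(peekNumberOfColumns(reader, ',')); // 3
        System.out.println(reader.readLine()); // 1,2,3  (first line is still available)
    }
}
```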


import java.util.function.Consumer;

@DataPrepperPlugin(name = "csv", pluginType = Codec.class, pluginConfigurationType = CSVCodecConfig.class)
public class CSVCodec implements Codec {
Member

Let's use Csv instead of CSV. I think this is more consistent with the current code base and Java conventions. Please change other classes as well.

Contributor Author

I agree that this change is more consistent with conventions and the code base. I suppose we should also change the CSVProcessor class to CsvProcessor — is it okay to include these changes in this pull request? I can also open a new pull request to change to CsvProcessor if it's out of scope of this pull request.

Member

A different PR would be better. That will help get this one in faster.


private int extractNumberOfColsFromFirstLine(final String firstLine) {
int numberOfSeperators = 0;
int charPointer = 0;
Member

Is there any reason this is not in the for statement?

Contributor Author

There's no reason, oversight on my part. Thanks for catching it!

}
}

private int extractNumberOfColsFromFirstLine(final String firstLine) {
Member

Please use full words - Cols -> Columns.


csvCodec = createObjectUnderTest();
}
// @Test(timeout=10000)
Member

Please remove these commented out lines.

}

private CsvSchema createCsvSchemaFromConfig(final int firstLineSize) {
final List<String> userSpecifiedHeader = config.getHeader();
Contributor Author

The configuration option getHeader can be null, and if it is then the call to size() on line 116 will throw an exception. The proper behavior if header is null is to make userSpecifiedHeader an empty ArrayList. I'll fix this bug in the next revision.
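The null guard described above can be sketched in isolation. The accessor shape here is illustrative, not the actual CSVCodecConfig API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the fix described above: if the configured header is null,
// fall back to an empty list so later calls like size() cannot throw an NPE.
public class HeaderGuardExample {
    static List<String> headerOrEmpty(final List<String> configuredHeader) {
        return configuredHeader == null ? new ArrayList<>() : configuredHeader;
    }

    public static void main(String[] args) {
        System.out.println(headerOrEmpty(null).size()); // 0, instead of a NullPointerException
        System.out.println(headerOrEmpty(List.of("ip", "verb", "status")).size()); // 3
    }
}
```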

Contributor

@travisbenedict travisbenedict left a comment

LGTM

}
}

private int getNumberOfColumnsByMarkingBeginningOfInputStreamAndResettingReaderAfter(BufferedReader reader) throws IOException {
Contributor

minor: I think you could just call this getNumberOfColumns since the caller of this function doesn't need to care how it works

LOG.error("Invalid CSV row, skipping this line. This typically means the row has too many columns. Consider using the CSV " +
        "Processor if there might be inconsistencies in the number of columns because it is more flexible. Error ",
        csvException);
} catch (JsonParseException jsonException) {
Contributor

nit: missing final

@dlvenable dlvenable merged commit 5ffd5c6 into opensearch-project:main Aug 9, 2022
engechas pushed a commit to engechas/data-prepper that referenced this pull request Sep 12, 2022
…roject#1644)

* Implemented CSVCodec for S3 Source, config & unit tests

Signed-off-by: Finn Roblin <[email protected]>

* Addressed Travis's feedback

Signed-off-by: Finn Roblin <[email protected]>

* Addressed David's feedback & fixed NPE if header is null

Signed-off-by: Finn Roblin <[email protected]>

* Renamed all instances of CSVProcessor to CsvProcessor (and offshoots like CsvProcessorConfig)

Signed-off-by: Finn Roblin <[email protected]>