Add new InputCodec interface to support seek-able input and corresponding implementation and tests for S3 objects #2727
Conversation
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException when invalid input is received or an incorrect codec name is provided
*/
void parse(InputFile inputFile, Consumer<Record<Event>> eventConsumer) throws IOException;
We shouldn't add a Parquet file in here since this couples the whole Data Prepper API with Parquet.
Ideally, we have a simpler model here and then adapt that to the Parquet InputFile in the parquet-codecs project.
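For illustration, a Parquet-free interface in data-prepper-api might look something like this minimal sketch (the package and the SeekableInputStream type name are assumptions that mirror the shape of Parquet's own InputFile):

```java
package org.opensearch.dataprepper.model.io;

import java.io.IOException;

// Hypothetical sketch: a seekable-input abstraction owned by
// data-prepper-api, with no dependency on parquet-common.
public interface InputFile {

    // Total length of the underlying input, in bytes.
    long getLength() throws IOException;

    // Opens a fresh stream over the input; SeekableInputStream here
    // would be Data Prepper's own seek-capable stream type.
    SeekableInputStream newStream() throws IOException;
}
```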
We also may need to consider how to let the InputCodec advertise whether it requires being seekable or not. We are planning to use the InputCodec in other sources as well, which may not always be seekable. For example, the Kafka source may use it (though that is probably seekable, as it may be just a byte array in the end).
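For illustration, one hypothetical way a codec could advertise this (the default method below is an assumption, not part of this PR):

```java
package org.opensearch.dataprepper.model.codec;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

public interface InputCodec {

    // Existing sequential entry point.
    void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;

    // Hypothetical capability flag: codecs that need random access
    // (e.g. Parquet) would override this to return true, letting
    // non-seekable sources fail fast instead of buffering blindly.
    default boolean requiresSeekableInput() {
        return false;
    }
}
```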
Updated to have separate InputFile and OutputFile interfaces that extend the Parquet InputFile and OutputFile interfaces.
I don't think the InputCodec should need to advertise whether it requires seeking or not. In general, InputStreams are not seekable; they should be read sequentially. This is especially true of Kafka consumers, where the data should be read sequentially. Files, on the other hand, are not required to be read sequentially.
I like the idea of presenting either as an option. Kafka is a good example of a stream being more efficient; local files, S3, or HDFS are good examples of a file being more efficient. The decision of which to use should be made in the sink or the source, depending on what is more efficient.
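For example (a hypothetical sketch, not code from this PR; the LocalInputFile constructor signature is an assumption), each source could pick whichever entry point is cheaper for it:

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;

import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.fs.LocalInputFile;

class CodecDispatchExample {

    // Kafka-style source: the record is already an in-memory byte
    // array, so a sequential stream is the cheapest option.
    void fromKafka(final InputCodec codec, final byte[] messageBytes,
                   final Consumer<Record<Event>> eventConsumer) throws IOException {
        codec.parse(new ByteArrayInputStream(messageBytes), eventConsumer);
    }

    // Local-file source: random access is cheap, so hand the codec a
    // seekable InputFile instead of a stream.
    void fromLocalFile(final InputCodec codec, final File file,
                       final Consumer<Record<Event>> eventConsumer) throws IOException {
        codec.parse(new LocalInputFile(file), eventConsumer);
    }
}
```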
Thanks for making some of these changes. I'm thinking of having Parquet entirely removed from the data-prepper-api and s3-source projects.
We can have an interface in data-prepper-api which represents something similar (hopefully with only what we really need). There can be an S3 implementation similar to what you have presently. And then finally, the parquet-codecs project can adapt that interface into the Parquet InputFile model.
I'll see if I can put together something a little more concrete and get back to you on it.
I agree that dependency management can be done better, as we are now pulling in Parquet MapReduce dependencies, but I don't think that's a blocker. We can always go back and clean up the dependencies by adding our own implementations later on.
I think a simple fix that can be applied here is:
- Copy the existing Parquet interfaces and mostly re-use them. I think there may be some opportunity to simplify, as they have both byte[] and ByteBuffer fields that seem similar. This will remove Parquet from data-prepper-api, which is the most important part.
- Keep the dependency within the S3 source for now. We can fix this part in a later PR.
I don't think this is that simple. We definitely want to use the ParquetReader library because it handles a lot of situations for us. Re-implementing that library is a lot of unnecessary work. And to be able to use that library, the files just need to implement InputFile. This is a pretty minimal ask. The only dependency is on Parquet Common.
We could build a set of converters to avoid this dependency, but that adds another layer of complexity because each implementation of our InputFile needs to map to a Parquet implementation of InputFile. It's much simpler to just ensure that our implementation of an InputFile is a Parquet InputFile.
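In sketch form, the approach taken here is simply (the package name is an assumption):

```java
package org.opensearch.dataprepper.model.io;

// Extending the Parquet interface means any Data Prepper InputFile can
// be handed straight to ParquetReader with no conversion layer, at the
// cost of a parquet-common dependency in data-prepper-api.
public interface InputFile extends org.apache.parquet.io.InputFile {
}
```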
I'm not asking to re-implement the ParquetReader. Only adapting between the two interfaces.
So, create a new interface in data-prepper-api. Then create an adapter which adapts the Parquet InputFile to the data-prepper-api InputFile.
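A rough sketch of that adapter (the class name and the Data Prepper stream type are hypothetical; it leans on Parquet's DelegatingSeekableInputStream helper from parquet-common):

```java
import java.io.IOException;

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.SeekableInputStream;

// Hypothetical adapter living in parquet-codecs: wraps a Parquet-free
// data-prepper-api InputFile so that ParquetReader can consume it.
class ParquetInputFileAdapter implements org.apache.parquet.io.InputFile {
    private final org.opensearch.dataprepper.model.io.InputFile delegate;

    ParquetInputFileAdapter(final org.opensearch.dataprepper.model.io.InputFile delegate) {
        this.delegate = delegate;
    }

    @Override
    public long getLength() throws IOException {
        return delegate.getLength();
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        // Assumes the Data Prepper stream type extends java.io.InputStream
        // and exposes seek() and getPos(), mirroring Parquet's API.
        final org.opensearch.dataprepper.model.io.SeekableInputStream stream = delegate.newStream();
        return new DelegatingSeekableInputStream(stream) {
            @Override
            public long getPos() throws IOException {
                return stream.getPos();
            }

            @Override
            public void seek(final long newPos) throws IOException {
                stream.seek(newPos);
            }
        };
    }
}
```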
That's a matter of opinion. It's not a blocker for adding this feature. You are welcome to make that change on top of this change.
The InputCodec currently only supports reading from an InputStream. This interface does not work for inputs that require random read access. In order to support random access reads in addition to sequential reads, the InputCodec interface should support reading from an InputFile. For convenience, the existing InputFile interface defined in the parquet-common package is being used. Signed-off-by: Adi Suresh <[email protected]>
Previously, S3 files could only be sequentially streamed. Random access to an S3 file required downloading the file locally. This change introduces an input stream on top of the S3 object that supports random read access. Additionally, the tests have been updated to test all codecs. Signed-off-by: Adi Suresh <[email protected]>
Instead of directly using the Parquet InputFile interface, codecs use the Data Prepper InputFile interface, and the Data Prepper InputFile interface implements the Parquet InputFile interface. Signed-off-by: Adi Suresh <[email protected]>
The decompression engine should be passed along with the InputFile in cases where the file does not contain a hint as to whether a decompression codec should be used or not. Signed-off-by: Adi Suresh <[email protected]>
This change adds the correct JSON encoding logic and corresponding tests to assert the correct behavior. Signed-off-by: Adi Suresh <[email protected]>
We will have to fix this dependency issue later.
Description
Adds a new codec interface to support seeking through an input, with a corresponding implementation and tests to read S3 files in this way.
Issues Resolved
#1532
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.