🎉 New destination: S3 #3672
Conversation
Having looked into StreamTransferManager, I think it's unlikely that it will do the kind of partitioning we want (uploading a stream of data into multiple files). It seems to be solving the use case of uploading a single file of unknown size to S3: it splits the data up into multiple parts of known size in memory, uploads them, and then concatenates them via a native S3 operation, MultipartUpload.
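For context, here is a minimal sketch of how the library is typically wired up, assuming the `alex.mojaki.s3upload` package from the s3-stream-upload library and an AWS SDK v1 `AmazonS3` client; the bucket, key, and sizes are illustrative, not from this PR:

```java
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import java.nio.charset.StandardCharsets;

public class StreamTransferManagerExample {

  public static void main(String[] args) {
    AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

    // One manager corresponds to one S3 object. It buffers parts of a fixed size
    // in memory, uploads each part as it fills, and stitches them together with a
    // multipart upload when complete() is called.
    StreamTransferManager manager =
        new StreamTransferManager("my-bucket", "path/to/output.csv", s3Client)
            .numStreams(1)
            .numUploadThreads(2)
            .queueCapacity(2)
            .partSize(10); // part size, in MB

    MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
    try {
      outputStream.write("id,name\n1,alice\n".getBytes(StandardCharsets.UTF_8));
      outputStream.close();
      manager.complete(); // finalizes the single S3 object from the uploaded parts
    } catch (Exception e) {
      manager.abort(e);
      throw new RuntimeException(e);
    }
  }
}
```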
Given this, I think we have a few options:
1. Don't partition output within a stream.
2. Partition by counting how many bytes have been uploaded on the current upload manager. Once we exceed the desired size, complete that manager's work, then create a new manager to upload a new part file. Repeat this process until done (see the sketch below).
3. Load a single file, then split it after uploading. I don't like this approach because it has many moving parts, and not all file formats are easily splittable (CSV or JSON, for example) -- it's possible to split those by reading N lines, counting bytes, and saving that offset, but that feels similar to approach 2.
I suspect we should do approach 1, then approach 2 in a follow-up release. I think this sequencing does not introduce any backward incompatibilities and is congruent with delivering value incrementally. WDYT?
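To illustrate approach 2, here is a hypothetical sketch (not part of this PR) of rolling over to a new StreamTransferManager once a byte threshold is crossed. The class, fields, and key-naming scheme are assumptions for illustration only:

```java
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import java.io.IOException;

// Hypothetical rollover writer: counts bytes written to the current upload
// manager and starts a new part file once the threshold would be exceeded.
public class PartitionedS3Writer {

  private final AmazonS3 s3Client;
  private final String bucket;
  private final String keyPrefix;
  private final long maxBytesPerFile;

  private StreamTransferManager manager;
  private MultiPartOutputStream outputStream;
  private long bytesWritten = 0;
  private int fileIndex = 0;

  public PartitionedS3Writer(AmazonS3 s3Client, String bucket, String keyPrefix, long maxBytesPerFile) {
    this.s3Client = s3Client;
    this.bucket = bucket;
    this.keyPrefix = keyPrefix;
    this.maxBytesPerFile = maxBytesPerFile;
  }

  public void write(byte[] record) throws IOException {
    // Start a new part file when there is none yet or the current one would exceed the limit.
    if (outputStream == null || bytesWritten + record.length > maxBytesPerFile) {
      startNewPartFile();
    }
    outputStream.write(record);
    bytesWritten += record.length;
  }

  private void startNewPartFile() throws IOException {
    close();
    String key = String.format("%s_part_%05d.csv", keyPrefix, fileIndex++);
    manager = new StreamTransferManager(bucket, key, s3Client);
    outputStream = manager.getMultiPartOutputStreams().get(0);
    bytesWritten = 0;
  }

  public void close() throws IOException {
    // Completing the manager makes the current part file visible as one S3 object.
    if (outputStream != null) {
      outputStream.close();
      manager.complete();
      outputStream = null;
    }
  }
}
```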
import java.util.Map;
import java.util.UUID;

public class S3Consumer extends FailureTrackingAirbyteMessageConsumer {
Why is this a distinct class from the one in JDBC? Should it reuse that class?
The JDBC one has lots of database operations in it. I tried to reuse that one at the beginning of last week, but it was unnecessarily complicated. So I decided to create a separate one that only deals with S3 logic.
/test connector=connectors/destination-s3
/publish connector=connectors/destination-s3
A few comments, but we are almost there!
They belong to another PR.
/test connector=connectors/destination-s3
/publish connector=connectors/destination-s3
What

How
S3StreamCopier, but without DB operations.

Pre-merge Checklist

Recommended reading order
1. destination-s3/spec.json
2. S3Destination.java
3. S3Consumer.java
4. S3OutputFormatter.java
5. S3CsvOutputFormatter.java