🎉 Source S3 - memory & performance optimisations + advanced CSV options #6615
Conversation
/test connector=connectors/source-s3
I guess the main cause of the performance issues is the loading of file objects for filtering/sorting (by last_modified):
```python
def last_modified(self) -> datetime:
    """
    Using decorator set up boto3 session & s3 resource.
    Note: slight nuance for grabbing this when we have no credentials.

    :return: last_modified property of the blob/file
    """
    bucket = self._provider.get("bucket")
    try:
        obj = self._boto_s3_resource.Object(bucket, self.url)
        return obj.last_modified
```
So this code ends up fetching all file objects, including ones that are not relevant for incremental mode.
I propose moving this filtering/sorting logic into the bucket listing function:
```python
def _list_bucket(self, accept_key=lambda k: True) -> Iterator[str]:
    ...
    try:
        content = response["Contents"]
        # =======
        # content["LastModified"] is already available here,
        # so filtering/sorting can happen during listing
        # =======
        raise Exception(content)  # placeholder showing the data available at this point
    except KeyError:
        pass
    ...
```
This function should return only the relevant, sorted filepath values.
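As a rough sketch of the suggestion (hypothetical function name and a standalone boto3 client rather than the connector's provider-based session), the `LastModified` field that S3 already returns in the listing response can drive the filtering and sorting, avoiding a per-object request for every file:

```python
from datetime import datetime
from typing import Iterator, List, Optional, Tuple

import boto3


def list_bucket_keys_sorted(
    bucket: str, prefix: str = "", modified_since: Optional[datetime] = None
) -> Iterator[str]:
    """Yield object keys sorted by LastModified, skipping keys at or before
    `modified_since`, using only the listing response (no per-object calls)."""
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    entries: List[Tuple[datetime, str]] = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for item in page.get("Contents", []):
            last_modified = item["LastModified"]  # already included in the listing
            if modified_since is None or last_modified > modified_since:
                entries.append((last_modified, item["Key"]))
    for _, key in sorted(entries):  # one in-memory sort over (datetime, key) pairs
        yield key
```

This way the connector never has to touch an object (e.g. via `s3.Object(...).last_modified`, which issues a per-object request) just to decide whether it is relevant for an incremental sync. The reply below explains why this wasn't adopted wholesale in this PR.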
I like the thinking, but there are a few reasons not to make that change:
I've already greatly reduced memory usage (4-5x) by storing just the filepaths rather than the objects, which seemed to be the problem there.
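As a rough illustration of that memory point (a sketch, not the PR's actual diff), the idea is to keep only the filepath strings in memory and build the heavier per-file objects lazily, one at a time:

```python
from typing import Iterator, List

import boto3

s3 = boto3.resource("s3")


def eager_objects(bucket: str, keys: List[str]) -> list:
    # Before (in spirit): one object wrapper per file, all held in memory at once.
    return [s3.Object(bucket, key) for key in keys]


def lazy_objects(bucket: str, keys: List[str]) -> Iterator:
    # After (in spirit): keep only the filepath strings; materialise each
    # per-file object on demand, so at most one heavy object is alive at a time.
    for key in keys:
        yield s3.Object(bucket, key)
```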
/test connector=connectors/source-s3
…m pyarrow ReadOptions
…3-csv-advanced-options' into george/fix-s3-oom
# Conflicts:
#   airbyte-integrations/connectors/source-s3/setup.py
#   airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
#   docs/integrations/sources/s3.md
/test connector=connectors/source-s3
I'm afraid this Python code is too fancy for me to provide any useful review comments.
# Conflicts:
#   airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py
/publish connector=connectors/source-s3
/publish connector=connectors/source-s3
/publish connector=connectors/source-s3
Looks like the EC2 we are using for deployment is missing
/publish connector=connectors/source-s3
/publish connector=connectors/source-s3
…ns (airbytehq#6615)
* memory & performance optimisations
* address comments
* version bump
* added advanced_options for reading csv without header, and more custom pyarrow ReadOptions
* updated to use the latest airbyte-cdk
* updated docs
* bump source-s3 to 0.1.6
* remove unneeded lines
* Use the all dep ami for python builds.
* ec2-instance-id should be ec2-image-id
* ec2-instance-id should be ec2-image-id

Co-authored-by: Jingkun Zhuang <[email protected]>
Co-authored-by: Davin Chia <[email protected]>
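To make the "advanced_options … pyarrow ReadOptions" item above concrete, here is a hedged sketch (not the connector's exact code; the function name is hypothetical) of forwarding user-supplied JSON options into PyArrow's CSV reader, e.g. for a file without a header row:

```python
import json

import pyarrow as pa
from pyarrow import csv


def read_csv_table(path: str, advanced_options_json: str = "{}") -> pa.Table:
    """Merge user-supplied JSON options into pyarrow.csv.ReadOptions.

    For a headerless CSV, advanced_options_json could be
    '{"autogenerate_column_names": true}' (columns become f0, f1, ...)
    or '{"column_names": ["id", "name", "created_at"]}'.
    """
    advanced = json.loads(advanced_options_json or "{}")
    read_options = csv.ReadOptions(**advanced)
    return csv.read_csv(path, read_options=read_options)
```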
What
closes #6606
I'm fairly confident I've been able to make some small but impactful improvements here.
How
Changed `_get_master_schema()` so we can pass in the state on incremental runs and therefore only use new files to create our master schema (previously it used all of them). Since the schema will be saved in state from a previous run, the only time we run schema inference will be during `read`, where we can pass in the state. Additionally…
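A minimal sketch of that incremental-schema idea (hypothetical function, state keys, and file representation; the real stream.py is more involved): start from the schema saved in state and only inspect files modified after the state cursor, so a full inference pass only happens on the first run.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Mapping, Optional


def merge_master_schema(
    files: Iterable[Mapping[str, Any]],
    state: Optional[Mapping[str, Any]] = None,
) -> Dict[str, str]:
    """Merge per-file schemas into a master schema, skipping files already
    covered by the schema stored in state on incremental runs."""
    cutoff: Optional[datetime] = None
    master_schema: Dict[str, str] = {}
    if state:
        master_schema.update(state.get("schema", {}))
        if state.get("last_modified"):
            cutoff = datetime.fromisoformat(state["last_modified"])

    for f in files:  # each f: {"last_modified": datetime, "schema": {column: type}}
        if cutoff is not None and f["last_modified"] <= cutoff:
            continue  # already covered by the schema saved in state
        for column, dtype in f["schema"].items():
            master_schema.setdefault(column, dtype)
    return master_schema
```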