Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow s3_client_kwargs to be passed into repartition #158

Merged
merged 2 commits into from
Jul 26, 2023

Conversation

rkenmi
Copy link
Member

@rkenmi rkenmi commented Jul 26, 2023

Allows S3 keyword arguments to be passed into repartition session for use cases such as AWS credential overrides.

Copy link
Collaborator

@raghumdani raghumdani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of minor comments. LGTM

Copy link
Collaborator

@raghumdani raghumdani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, please also change the default to None compaction_session.

Copy link
Collaborator

@pfaraone pfaraone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you publish a pre-release release here for 0.1.18.beta10 after merging? -See https://github.com/ray-project/deltacat/releases

Copy link
Collaborator

@valiantljk valiantljk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rkenmi rkenmi merged commit 6471727 into main Jul 26, 2023
Copy link
Collaborator

@raghumdani raghumdani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address minor comment.

@@ -284,6 +284,9 @@ def _execute_compaction_round(
max_parallelism = int(cluster_cpus)
logger.info(f"Max parallelism: {max_parallelism}")

if s3_client_kwargs is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defaulting must be done in compact_partition method itself.

pdames added a commit that referenced this pull request Sep 5, 2023
… and new makefile targets. (#202)

* Logging memory consumed to validate worker estimation correctness (#142)

* Logging memory consumed to validate worker estimation correctness

[Description] This change will allow us to validate the correctness of the workers we estimated to run a compaction job reliably.

* update tests

* fix tests for python 3.7 as addClassCleanup is absent

* addressed comments

* Update deltacat/compute/compactor/compaction_session.py

Co-authored-by: Patrick Ames <[email protected]>
Signed-off-by: Raghavendra M Dani <[email protected]>

* Update deltacat/utils/ray_utils/concurrency.py

Co-authored-by: Patrick Ames <[email protected]>
Signed-off-by: Raghavendra M Dani <[email protected]>

* remove dependency on rebase_source_partition, simplify the repartition delta discovery

* remove repartition_during_rebase argument, ping the source to compacted table

* remove repartition rcf write, generalize the rcf write by supporting pre-specified url

* addressed comments

* logging cpu usage metrics

* fix percentages

* capture latency of retrieving cluster resources

---------

Signed-off-by: Raghavendra M Dani <[email protected]>
Co-authored-by: Patrick Ames <[email protected]>
Co-authored-by: Jialin Liu <[email protected]>

* Capturing all the performance metrics in an audit (#146)

* Capturing all the performance metrics in an audit

[Description] It is important to capture audit automatically and analyze the datapoints from significant number of runs to recognize a pattern and come up with a formula to predict resource consumption. This would improve reliability and efficiency.

[Motivation] #141

* fix unit tests

* refactoring and captured new metrics

* fix constructor args

* fix assert statement

* fix NoneType error

* fix bug in skipping manifest

* add comment and address comments

* fix stage_delta

* fixing deltacat method signature

* capturing the head process peak memory usage and result size of each step

* [skip download cold manifest] Add support for skipping download cold manifest entries during materialize step

* Address review comments

* Adding a data model to represent compact_partition function parameters (#151)

* defined CompactPartitionParams class

* fixed setuptools.setup name

* updated CompactPartitionParams docstring to refer to function location

* added compact_partition_from_request method that wraps compact_partition

* fixed unit tests

* fixed test_serialize_handles_sets unit test by asserting that two list contain the same elements instead of using == operator

* fixed linting in test_compact_partition_params.py

* Object store implementation to allow elastically increasing the object store memory (#149)

* [skip untouched files]Disable copy by reference during backfill and rebase only compaction; Fix referenced manifest PyarrowWriteResult (#153)

* add dd parallelism params (#154)

Co-authored-by: Jialin Liu <[email protected]>

* Allow s3 client kwargs as argument of compact_partition (#155)

* Allow s3 client kwargs as argument of compact_partition

* Changing default as empty dict

* add s3_client_kwargs to rcf

* Bunping up version to 0.1.18b8

* Honor profile name in s3 client kwargs (#157)

* Use kwargs as to determine the profile to use when creating the s3 client

* bumped up version to 0.1.18b9

* Allow s3_client_kwargs to be passed into repartition (#158)

* Allow s3_client_kwargs to be passed into repartition

* Update compaction session s3_client_kwargs to default to None

* Move s3_client_kwargs default setter to parent scope (#159)

* Move s3_client_kwargs default setter to parent scope

* Bump version to 0.1.18b11

* keep null row and remove dw_int64 column (#161)

* keep null row and remove dw_int64 column

* add assertation for extra column remove

* remove column by name explicitly

* use drop column

* use list of str in drop columns

---------

Co-authored-by: Jialin Liu <[email protected]>

* version bump to 0.1.18b12 (#164)

* Cleaning up rehashing logic as it is a dead code as of now. (#166)

* Cleaning up rehashing logic as it is a dead code as of now.

[Motivation] #165

* fix version number

* Fix stream position and support latest pyarrow (#168)

[Motivation] #167

* Bumped version from 0.1.18b12 to 0.1.18b13 (#169)

* bumped version from 0.1.18b12 to 0.1.18b13

* fixed current_node_name logic to handle ipv6 addresses

* Add pytest benchmarking for Parquet reads (#160)

* Add benchmarking scripts and README

* Cleanup code and report visualizations

* Enable 1RG tpch files

* Fix naming from 1RG to 2RG

* Update dev-requirements.txt

Co-authored-by: Patrick Ames <[email protected]>
Signed-off-by: Jay Chia <[email protected]>

* Update deltacat/benchmarking/conftest.py

Co-authored-by: Patrick Ames <[email protected]>
Signed-off-by: Jay Chia <[email protected]>

* Update deltacat/benchmarking/README.md

Co-authored-by: Patrick Ames <[email protected]>
Signed-off-by: Jay Chia <[email protected]>

* Lints and README

* Use benchmark-requirements.txt instead of dev-requirements.txt

* Lints

---------

Signed-off-by: Jay Chia <[email protected]>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Patrick Ames <[email protected]>

* Polling EC2 Instance Metadata endpoint until HTTP 200 OK (#172)

* block_until_instance_metadata_service_returns_success implemented

* now just checking INSTANCE_METADATA_SERVICE_IPV4_URI

* added unit testing

* removed unneccesary logging

* removed commented out print statement

* fixed session.mount

* removed unused args

* now using tenacity for retrying

* changed default stop strategy

* removed unused import in TestBlockUntilInstanceMetadataServiceReturnsSuccess

* retry_url->retrying_get, log_attempt_number now private

* Adding local deltacat storage module (#175)

* Adding local deltacat storage module

* Using camel case dict keys for consistency

* Support storing data in parquet and utsv bytes

* Fix README.md to allow db_file_path arg

* version bump from 0.1.18b13 to 0.1.18b14 (#179)

* Now triggering publish-to-pypi on editing and creating a release (#180)

* `compact_partition` incremental unit test (#188)


* fixed commit_partition and list_deltas local deltacat storage bug

* providing destination table with deltas

* ds_mock_kwargs fixtures just consist of dict of db_file_path

* working test_compact_partition_success unit test

* flake8 issues

* removed print statements

* fixed  assert (
            manifest_records == len(compacted_table)

* 3 working unit test cases

* implemented test_compact_partition_incremental

* commented out broken unit test

* removed unused test files

* added explanatory comment to block_until_instance_metadata_service_returns_success

* reverting ray_utils/runtime change

* fixed test_retrying_on_statuses_in_status_force_list

* fixed test_retrying_on_statuses_in_status_force_list

* added additional block_until_instance_metadata_service_returns_success unit test

* additional fixtures

* validation_callback_func_ -> validation_callback_func_kwargs

* added use_prev_compacted key

* added additional use_prev_compacted

* fixed test_compaction_session unit tests

* copied over working unit tests from dev/test_compact_partition_first_cut

* fixed repartition unit tests

* moved test_utils to itw own module

* removed unused kwargs arg

* paramtrizing records_per_compacted_file, hash_bucket_count

* removed unused TODO

* augmented CompactPartitionParams to include additional compact_partition kwargs

* augmented CompactPartitionParams to include additional compact_partition kwargs

* refactored testcases and setup to there own modules

* added additional type hints

* defaulting to empty dict safetly wherever deltacat_storage_kwargs is a param

* revert change that no longer passed **list_deltas_kwargs to io.discover_deltas

* additional type hints

* no longer passing kwargs to dedupe + decimal-pk case

* no longer passing kwargs to materialize

* unpacking deltacat_storage_kwargs in deltacat_storage.stage_delta timed_invocation

* no longer unpacking **list_deltas_kwargs when calling _execute_compaction_round

* readded kwargs

* added incremental timestamp-pk unit test

* added docstring to offer_iso8601_timestamp_list

* added tc for duplicate w sorting keyand multiple primary keys

* removed deadcode

* added additional comments to first incremental test case

* fixture refactoring - compaction_artifacts_s3_bucket -> setup_compaction_artifacts_s3_bucket

* parameterizing table version

* # ds_mock_kwargs -> # teardown_local_deltacat_storage_db

* tearing down db within test execution is now parameterized

* added INCREMENTAL_DEPENDENT_TEST_CASES dependent test case

* reversing order of sk in 10-incremental-decimal-pk-multi-dup

* Switch botocore retry mode to adaptive from standard (#191)

Co-authored-by: Kevin Yan <[email protected]>

* Merge phash_main into main branch (#195)

* Adding support for reading table into ParquetFile (#163)

* Adding support for reading just the parquet meta by leveraging ParquetFile

[Description] Right now we read the entire table using read_parquet method of pyarrow. However, we lose the parquet metadata in the process and this will eagerly load all the data. In this commit we will use ParquetFile (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)

[Motivation] #162

* Using pyarrow.fs.S3FileSystem instead of s3fs

* Adding s3 adaptive retries and validate encoding

* Interface for merge v2 (#182)

* Adding interface and definitions for merge step

* fix tests and merge logs

* Add hashed memcached client support (#173)

* Adding support for reading table into ParquetFile (#163)

* Adding support for reading just the parquet meta by leveraging ParquetFile

[Description] Right now we read the entire table using read_parquet method of pyarrow. However, we lose the parquet metadata in the process and this will eagerly load all the data. In this commit we will use ParquetFile (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)

[Motivation] #162

* Using pyarrow.fs.S3FileSystem instead of s3fs

* Adding s3 adaptive retries and validate encoding

* Add hashed memcached client support

* Adding support for reading table into ParquetFile (#163)

* Adding support for reading just the parquet meta by leveraging ParquetFile

[Description] Right now we read the entire table using read_parquet method of pyarrow. However, we lose the parquet metadata in the process and this will eagerly load all the data. In this commit we will use ParquetFile (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)

[Motivation] #162

* Using pyarrow.fs.S3FileSystem instead of s3fs

* Adding s3 adaptive retries and validate encoding

* use uid instead of ip address as key to hash algorithm

---------

Co-authored-by: Raghavendra M Dani <[email protected]>

* Implementing hash bucketing v2 (#178)

* Implementing hash bucket v2

* Fix the assertion regarding hash buckets

* Python 3.7 does not have doClassCleanups in super

* Fix the memory issue with the hb index creation

* Compaction session implementation for algo v2 (#187)

* Compaction session implementation for algo v2

* Address comments

* Added capability to measure instance minutes in a autoscaling cluster setting

* Avoid configuring logger if ray is uninitialized

* Add readme to run tests

* Refactoring and fixing the num_cpus key in options

* Resolve merge conflict and rebase from main

* Adding additional optimization from POC (#194)

* Adding additional optimization from POC

* fix typo

* Moved the compact_partition tests to top level module

* Adding unit tests for parquet downloaders

* fix typo

* fix repartition session

* Adding stack trace and passing config kwargs separate due to s3fs bug

* fix the parquet reader

* pass deltacat_storage_kwargs in repartition_session

* addressed comments and extend tests to handle v2

---------

Co-authored-by: Zyiqin-Miranda <[email protected]>

* Daft Native Reader for Parquet Content Types  (#183)

* Add pyarrow_to_daft_schema helper

* added initial daft read for parquet

* integrate io code with daft cast

* suggestions

* lint-fix EOF for requirements

* lint-fixs EOF with pre-commit

* bump version to 0.1.18b15

* Fix tuple issue with coerce_int96_timestamp_unit

* Fix kwargs and Schema

* Lint

* using include column names instead of column names

* Update to use new Daft Schema.from_pyarrow_schema method

* Remove daft requirement from benchmark-requirements.txt

* Factor out daft parquet reading code into seperate util file

* add timing to daft.read_parquet, address feedback, add unit tests

* run pre-commit

* switch on columns and add local file

* add schema override unit test

* add schema override unit test

* fix fstrings

* thread column names through

* downgrade to daft==0.1.12

* add support for partial row group downloads

* add `reader_type` to pyarrow utils

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>

* [WIP] Read Iceberg to DeltaCAT Dataset (#131)

* add read_iceberg and corresponding test structure

* rollback requirements change

* prepare for pyiceberg0.4.0 release. Add more tests

* add missing requirements

* add quick install command

* add read_iceberg and corresponding test structure

* rollback requirements change

* prepare for pyiceberg0.4.0 release. Add more tests

* add missing requirements

* add quick install command

* Fixes for Ray 2.X and add recommendations from review of PR #131.

* fix format issue

Signed-off-by: Rushan Jiang <[email protected]>

* fix requirements list and setup

Signed-off-by: Rushan Jiang <[email protected]>

* fix the partitioning issue reported in the ray-project PR

Signed-off-by: Rushan Jiang <[email protected]>

* porting apache/iceberg#7768 to reduce the waiting time and increase stability when running integration test

Signed-off-by: Rushan Jiang <[email protected]>

---------

Signed-off-by: Rushan Jiang <[email protected]>
Co-authored-by: Patrick Ames <[email protected]>

* Add workaround for pydantic 2.0 incompatibility, add build & deploy to s3, and add AWS glue job runner.

* Fix worker logging on AWS Glue, stop duplicate pip installs of DeltaCAT, linting.

* Add regionalization, remove assumptions about high-level errors/responses from Glue.

---------

Signed-off-by: Raghavendra M Dani <[email protected]>
Signed-off-by: Jay Chia <[email protected]>
Signed-off-by: Rushan Jiang <[email protected]>
Signed-off-by: Patrick Ames <[email protected]>
Co-authored-by: jialin <[email protected]>
Co-authored-by: Raghavendra M Dani <[email protected]>
Co-authored-by: Jialin Liu <[email protected]>
Co-authored-by: Zyiqin-Miranda <[email protected]>
Co-authored-by: zyiqin <[email protected]>
Co-authored-by: pf <[email protected]>
Co-authored-by: rkenmi <[email protected]>
Co-authored-by: Jay Chia <[email protected]>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Kevin Yan <[email protected]>
Co-authored-by: Kevin Yan <[email protected]>
Co-authored-by: Sammy Sidhu <[email protected]>
Co-authored-by: Jonas(Rushan) Jiang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants