From e0448b5d89175026ff1edf46c3fd3b87060dec96 Mon Sep 17 00:00:00 2001 From: rkenmi Date: Wed, 26 Jul 2023 10:31:18 -0700 Subject: [PATCH 1/2] Allow s3_client_kwargs to be passed into repartition --- deltacat/__init__.py | 2 +- deltacat/compute/compactor/repartition_session.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/deltacat/__init__.py b/deltacat/__init__.py index f069e3b5..970aa16e 100644 --- a/deltacat/__init__.py +++ b/deltacat/__init__.py @@ -43,7 +43,7 @@ deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__)) -__version__ = "0.1.18b9" +__version__ = "0.1.18b10" __all__ = [ diff --git a/deltacat/compute/compactor/repartition_session.py b/deltacat/compute/compactor/repartition_session.py index a554e165..83379b94 100644 --- a/deltacat/compute/compactor/repartition_session.py +++ b/deltacat/compute/compactor/repartition_session.py @@ -53,6 +53,7 @@ def repartition( pg_config: Optional[PlacementGroupConfig] = None, list_deltas_kwargs: Optional[Dict[str, Any]] = None, read_kwargs_provider: Optional[ReadKwargsProvider] = None, + s3_client_kwargs: Optional[Dict[str, Any]] = None, deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Optional[str]: @@ -166,9 +167,13 @@ def repartition( bit_width_of_sort_keys, None, ) + if s3_client_kwargs is None: + s3_client_kwargs = {} + return rcf.write_round_completion_file( None, None, repartition_completion_info, repartition_completion_file_s3_url, + **s3_client_kwargs, ) From 6f5b38fc665ded9eede065f171d070d1f21055dd Mon Sep 17 00:00:00 2001 From: rkenmi Date: Wed, 26 Jul 2023 11:16:47 -0700 Subject: [PATCH 2/2] Update compaction session s3_client_kwargs to default to None --- deltacat/compute/compactor/compaction_session.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index 5808811a..2faf838f 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -115,7 +115,7 @@ def compact_partition( read_kwargs_provider: Optional[ReadKwargsProvider] = None, s3_table_writer_kwargs: Optional[Dict[str, Any]] = None, object_store: Optional[IObjectStore] = RayPlasmaObjectStore(), - s3_client_kwargs: Optional[Dict[str, Any]] = {}, + s3_client_kwargs: Optional[Dict[str, Any]] = None, deltacat_storage=unimplemented_deltacat_storage, **kwargs, ) -> Optional[str]: @@ -284,6 +284,9 @@ def _execute_compaction_round( max_parallelism = int(cluster_cpus) logger.info(f"Max parallelism: {max_parallelism}") + if s3_client_kwargs is None: + s3_client_kwargs = {} + # read the results from any previously completed compaction round round_completion_info = None if not rebase_source_partition_locator: