From dee3eeb4b96538512b82fed6ccf7b567f61034ba Mon Sep 17 00:00:00 2001 From: afloresep Date: Thu, 19 Dec 2024 13:48:44 +0100 Subject: [PATCH] (fix) rm specific path for config paths --- cli/clustering.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/clustering.py b/cli/clustering.py index 90883fd..357911c 100644 --- a/cli/clustering.py +++ b/cli/clustering.py @@ -46,6 +46,7 @@ def main() -> None: logging.info(f"Output directory: {config.OUTPUT_PATH}") logging.info(f"TDigest Model loaded: {config.TDIGEST_MODEL}") logging.info(f"Number of steps: {config.STEPS_LIST}") + logging.info(f"Temporary folder: {config.TMP_FOLDER}") # TODO: Add Spark RAM config # TODO: Add tmp folder option, right now is on same path as OUTPUT_PATH @@ -87,7 +88,7 @@ def main() -> None: .config("spark.executor.memory", "50g") \ .config("spark.sql.shuffle.partitions", "2000") \ .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \ - .config("spark.local.dir", "/mnt/10tb_hdd/pyspark_tmp") \ + .config("spark.local.dir", config.TMP_FOLDER) \ .getOrCreate() # For every step in the first dim (i.e. for every bucket created in PCA_1 dimension), create new buckets @@ -125,7 +126,7 @@ def main() -> None: # Write each iteration's DataFrame to a separate folder # The final directory structure: /mnt/10tb_hdd/results/bucket_i/ print(f"Writing dataframe") - output_path = f"/mnt/10tb_hdd/buckets_pca_dask_test/pyspark_combined_results/bucket_{first_dim_step}" + output_path = os.path.join(config.OUTPUT_PATH, f"bucket_{first_dim_step}") bucket_dataframe_result.coalesce(10).write.mode("overwrite").parquet(output_path) # TODO: Remove temporary directory?