diff --git a/api/dask-clustering.py b/api/dask-clustering.py
index ba0eea6..9019024 100644
--- a/api/dask-clustering.py
+++ b/api/dask-clustering.py
@@ -1,7 +1,8 @@
 import dask.dataframe as dd
 import dask.array as da
 import numpy as np
-from utils.helper_functions import FileProgressTracker
+from api.utils.helper_functions import FileProgressTracker
+
 
 def compute_percentiles(df, col, quantiles):
     """Compute given percentiles for a column in a Dask DataFrame."""
@@ -47,33 +48,46 @@ def _assign_conditionally(part, col, percentiles, filter_col, filter_val, cluste
     return df
 
 
-# ------------------------------------------
-# Example Usage
-# ------------------------------------------
-
 input_folder = "/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet"
-
 print("Loading data")
+
+# Read the data
 df = dd.read_parquet(input_folder, engine='pyarrow')
 
-print("Assign cluster by column for PCA_2")
+print("Assign cluster by column for PCA_1")
 # 1) First level clustering by PCA_1:
 df_test, pca1_percentiles = assign_clusters_by_column(df, 'PCA_1', cluster_col='cluster_id')
 
-with FileProgressTracker(description="Clustering", total_files=50, interval=60) as tracker:
+print("Clustering for PCA_2")
+with FileProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
     for i in range(50):
-        # 2) Second level clustering by PCA_2 within a specific bucket of PCA_1 (for example, bucket 1):
-        # Compute percentiles for PCA_2, but only for the subset where cluster_id == 1
+
+        # 2) Second level clustering by PCA_2 within each bucket of PCA_1:
         pca2_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_2', quantiles=np.arange(0,102,2))
 
-        # Assign cluster_id_2 only to rows where cluster_id == 1
+        # Assign cluster_id_2 only to rows where cluster_id == i
         df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_2', cluster_col='cluster_id_2', percentiles=pca2_percentiles)
 
-        # At this point, `df` has both cluster_id (from PCA_1) and cluster_id_2 (from PCA_2 for bucket 1) columns.
+        # Update progress bar
+        tracker.update_progress()
+
+print("Clustering for PCA_3")
+with FileProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
+    for i in range(50):
+
+        # 3) Third level clustering by PCA_3 within each bucket of PCA_1:
+        # Compute percentiles for PCA_3, but only for the subset where cluster_id == i
+        pca3_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_3', quantiles=np.arange(0,102,2))
+
+        # Assign cluster_id_3 only to rows where cluster_id == i
+        df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_3', cluster_col='cluster_id_3', percentiles=pca3_percentiles)
+
         # Update progress bar
         tracker.update_progress()
 
+# Combine the three per-level bucket ids into a single base-50 id
+df_test['combined_cluster_id'] = (df_test['cluster_id'] * 50**2) + (df_test['cluster_id_2'] * 50) + df_test['cluster_id_3']
+df_test['combined_cluster_id'] = df_test['combined_cluster_id'].astype(int)
+
+# Continue with the clustered frame so the repartition and save steps below write it out
+df = df_test
 
 print("Cluster completed, saving results")
 # After completing all clustering steps, repartition to 400 partitions
@@ -82,59 +96,3 @@ def _assign_conditionally(part, col, percentiles, filter_col, filter_val, cluste
 # Save as Parquet files
 output_path = "/mnt/10tb_hdd/buckets_pca_dask"
 df.to_parquet(output_path, engine='pyarrow', write_index=False)
-
-
-# import dask.dataframe as dd
-# import dask.array as da
-# import numpy as np
-# import os
-
-# input_folder = "/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet"
-
-# df = dd.read_parquet(input_folder, engine="pyarrow")
-
-# def get_percentiles(dataframe):
-#     pca_array = dataframe.to_dask_array(lengths=True)
-#     percentiles_to_compute = [i for i in np.arange(102, step=2)]
-#     percentiles = da.percentile( pca_array, q=percentiles_to_compute, method='linear', internal_method='tdigest').compute()
-
-#     return percentiles.tolist()
-
-# def assign_bucket(part, name_pca_column, percentiles):
-#     cluster_ids = np.digitize(part[name_pca_column], percentiles) - 1
-#     cluster_ids = np.where(cluster_ids==50, 49, cluster_ids)
-#     part['cluster_id'] = cluster_ids
-#     return part
-
-# pca1_percentiles = get_percentiles(df['PCA_1'])
-
-# df = df.map_partitions(assign_bucket())
-
-
-# for i in range(50):
-#     bucket = df[df['cluster_id'] == i]
-
-#     # Convert 'PCA_1' column to a Dask array
-#     pca2_array = df['PCA_2'].to_dask_array(lengths=True)
-
-#     # Define the percentiles to compute
-#     percentiles_to_compute = [i for i in np.arange(102, step=2)]
-
-#     # Compute the percentiles using Dask's internal method
-#     percentiles = da.percentile(
-#         pca2_array,
-#         q=percentiles_to_compute,
-#         method='linear',
-#         internal_method='tdigest'  # or 'tdigest' for memory efficiency
-#     ).compute()
-
-#     def assign_bucket(part):
-#         cluster_ids = np.digitize(part['PCA_1'], percentiles) - 1
-#         cluster_ids = np.where(cluster_ids==50, 49, cluster_ids )
-#         part['cluster_id'] = cluster_ids
-#         return part
-
-#     df = df.map_partitions(assign_bucket)
-
-#     for i in range(50):
-
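Note on the encoding: `combined_cluster_id` is a base-50 positional encoding of the three per-level bucket ids (each in 0..49), so values range from 0 to 124999. Below is a minimal sketch of how such an id can be decoded back into its components; `decode_combined_id` is an illustrative helper written for this note, not a function in api/dask-clustering.py, and it assumes the ids were built exactly as in the diff above.

import numpy as np

def decode_combined_id(combined):
    """Invert combined = cluster_id * 50**2 + cluster_id_2 * 50 + cluster_id_3.

    Assumes each component is in 0..49, as produced by the clustering loops above.
    """
    cluster_id, rest = np.divmod(combined, 50 ** 2)
    cluster_id_2, cluster_id_3 = np.divmod(rest, 50)
    return cluster_id, cluster_id_2, cluster_id_3

# Example: bucket (7, 3, 42) encodes to 7*2500 + 3*50 + 42 == 17692
print(decode_combined_id(np.array([0, 17692, 124999])))
# -> (array([0, 7, 49]), array([0, 3, 49]), array([0, 42, 49]))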