
Commit

clean clustering
afloresep committed Dec 13, 2024
1 parent ffd6795 commit c25c586
Showing 1 changed file with 26 additions and 68 deletions.
94 changes: 26 additions & 68 deletions api/dask-clustering.py
@@ -1,7 +1,8 @@
import dask.dataframe as dd
import dask.array as da
import numpy as np
from utils.helper_functions import FileProgressTracker
from api.utils.helper_functions import FileProgressTracker


def compute_percentiles(df, col, quantiles):
"""Compute given percentiles for a column in a Dask DataFrame."""
@@ -47,33 +48,46 @@ def _assign_conditionally(part, col, percentiles, filter_col, filter_val, cluste
return df
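The bodies of compute_percentiles, assign_clusters_by_column and assign_clusters_conditionally are collapsed in this diff. Judging from their call sites below and from the older commented-out implementation removed at the bottom of the file (da.percentile over a Dask array, then np.digitize per partition), they presumably look roughly like the following sketch; the exact committed bodies may differ, and the sketch reuses the imports at the top of the file.

def compute_percentiles(df, col, quantiles):
    """Compute given percentiles for a column in a Dask DataFrame."""
    arr = df[col].to_dask_array(lengths=True)
    # t-digest keeps the percentile computation memory-efficient on large inputs
    return da.percentile(arr, q=quantiles, method='linear',
                         internal_method='tdigest').compute().tolist()

def assign_clusters_by_column(df, col, cluster_col='cluster_id'):
    """Bucket every value of `col` into one of 50 percentile bins (ids 0-49)."""
    percentiles = compute_percentiles(df, col, np.arange(0, 102, 2))

    def _assign(part):
        ids = np.digitize(part[col], percentiles) - 1
        part[cluster_col] = np.clip(ids, 0, len(percentiles) - 2)  # cap top edge at 49
        return part

    return df.map_partitions(_assign), percentiles

assign_clusters_conditionally presumably applies the same digitize step, but only to the rows of each partition where filter_col == filter_val, writing the bucket ids into cluster_col.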


# ------------------------------------------
# Example Usage
# ------------------------------------------

input_folder = "/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet"

print("Loading data")

# Read the data
df = dd.read_parquet(input_folder, engine='pyarrow')

print("Assign cluster by column for PCA_2")
print("Assign cluster by column for PCA_1")
# 1) First level clustering by PCA_1:
df_test, pca1_percentiles = assign_clusters_by_column(df, 'PCA_1', cluster_col='cluster_id')

with FileProgressTracker(description="Clustering", total_files=50, interval=60) as tracker:
print("Clustering for PCA_2")
with FileProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
    for i in range(50):
        # 2) Second level clustering by PCA_2 within a specific bucket of PCA_1 (for example, bucket 1):
        # Compute percentiles for PCA_2, but only for the subset where cluster_id == 1

        # 2) Second level clustering by PCA_2 within a bucket of PCA_1 for all buckets:
        pca2_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_2', quantiles=np.arange(0,102,2))

        # Assign cluster_id_2 only to rows where cluster_id == 1
        # Assign cluster_id_2 only to rows where cluster_id == i
        df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_2', cluster_col='cluster_id_2', percentiles=pca2_percentiles)

        # At this point, `df` has both cluster_id (from PCA_1) and cluster_id_2 (from PCA_2 for bucket 1) columns.
        # Update progress bar
        tracker.update_progress()

print("Clustering for PCA_3")
with FileProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
    for i in range(50):

        # 3) Third level clustering by PCA_3, again within each PCA_1 bucket (filtered on cluster_id)
        # Compute percentiles for PCA_3, but only for the subset where cluster_id == i
        pca3_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_3', quantiles=np.arange(0,102,2))

        # Assign cluster_id_3 only to rows where cluster_id == i
        df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_3', cluster_col='cluster_id_3', percentiles=pca3_percentiles)

        # Update progress bar
        tracker.update_progress()

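# Pack the three bucket levels (each in 0-49) into a single base-50 id:
# combined = cluster_id * 50**2 + cluster_id_2 * 50 + cluster_id_3, giving values in 0..124999.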
df['combined_cluster_id'] = (df_test['cluster_id'] * 50**2) + (df_test['cluster_id_2'] * 50) + df_test['cluster_id_3']

df['combined_cluster_id'] = df['combined_cluster_id'].astype(int)

print("Cluster completed, saving results")
# After completing all clustering steps, repartition to 400 partitions
@@ -82,59 +96,3 @@ def _assign_conditionally(part, col, percentiles, filter_col, filter_val, cluste
# Save as Parquet files
output_path = "/mnt/10tb_hdd/buckets_pca_dask"
df.to_parquet(output_path, engine='pyarrow', write_index=False)
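Downstream consumers can recover the three bucket levels from combined_cluster_id by undoing the base-50 packing. A minimal illustrative sketch (decode_combined_id is not part of the committed file):

def decode_combined_id(combined_id):
    # Invert combined = c1 * 50**2 + c2 * 50 + c3, with each component in 0..49.
    c1, rest = divmod(combined_id, 50 ** 2)
    c2, c3 = divmod(rest, 50)
    return c1, c2, c3

# Example: buckets (3, 17, 42) -> 3*2500 + 17*50 + 42 = 8392
assert decode_combined_id(8392) == (3, 17, 42)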


# import dask.dataframe as dd
# import dask.array as da
# import numpy as np
# import os

# input_folder = "/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet"

# df = dd.read_parquet(input_folder, engine="pyarrow")

# def get_percentiles(dataframe):
# pca_array = dataframe.to_dask_array(lengths=True)
# percentiles_to_compute = [i for i in np.arange(102, step=2)]
# percentiles = da.percentile( pca_array, q=percentiles_to_compute, method='linear', internal_method='tdigest').compute()

# return percentiles.tolist()

# def assign_bucket(part, name_pca_column, percentiles):
# cluster_ids = np.digitize(part[name_pca_column], percentiles) - 1
# cluster_ids = np.where(cluster_ids==50, 49, cluster_ids)
# part['cluster_id'] = cluster_ids
# return part

# pca1_percentiles = get_percentiles(df['PCA_1'])

# df = df.map_partitions(assign_bucket())


# for i in range(50):
# bucket = df[df['cluster_id'] == i]

# # Convert 'PCA_1' column to a Dask array
# pca2_array = df['PCA_2'].to_dask_array(lengths=True)

# # Define the percentiles to compute
# percentiles_to_compute = [i for i in np.arange(102, step=2)]

# # Compute the percentiles using Dask's internal method
# percentiles = da.percentile(
# pca2_array,
# q=percentiles_to_compute,
# method='linear',
# internal_method='tdigest' # or 'tdigest' for memory efficiency
# ).compute()

# def assign_bucket(part):
# cluster_ids = np.digitize(part['PCA_1'], percentiles) - 1
# cluster_ids = np.where(cluster_ids==50, 49, cluster_ids )
# part['cluster_id'] = cluster_ids
# return part

# df = df.map_partitions(assign_bucket)

# for i in range(50):
