add Cluster class
afloresep committed Dec 17, 2024
1 parent 9d99053 commit 288da42
Showing 1 changed file with 37 additions and 67 deletions.
104 changes: 37 additions & 67 deletions api/dask_clustering.py
@@ -6,27 +6,21 @@

logger = logging.getLogger(__name__)

class Cluster():
class PCAClusterAssigner():
def __init__(self, input_path: str, steps: list):
# Assert all steps are positive integers
for i in steps:
assert isinstance(i, int), f"Step must be int, instead got: {type(i)}"
assert i > 0, f"Step must be a positive integer, got: {i}"

self.input_path = input_path
# Steps or number of buckets per dimension
self.first_dim_steps = steps[0]
self.second_dim_steps = steps[1]
self.third_dim_steps = steps[2]
# TODO: Add support for other file types e.g csv
self.dataframe = dd.read_parquet(self.input_path, engine="pyarrow")

# Percentiles to split the data into buckets
self.first_dim_percentiles = np.arange(0,102, (100/self.first_dim_steps))
self.second_dim_percentiles = np.arange(0,102, (100/self.second_dim_steps))
self.third_dim_percentiles = np.arange(0,102, (100/self.third_dim_steps))
def load_data(self):
"""Load the parquet files into a Dask DataFrame."""
# TODO: Add support for other file types, e.g. csv
self.dataframe = dd.read_parquet(self.input_path, engine="pyarrow")
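
For orientation (not part of the commit), a minimal sketch of the bucket-edge arithmetic in __init__: with 50 steps in a dimension, np.arange(0, 102, 100/50) produces the 51 percentile edges 0, 2, ..., 100, i.e. 50 buckets. The steps value below is an assumed example, matching the range(50) loops in the script further down.

import numpy as np

steps = [50, 50, 50]  # assumed example: 50 buckets per dimension
edges = np.arange(0, 102, 100 / steps[0])

print(len(edges))                     # 51 percentile edges -> 50 buckets
print(edges[0], edges[1], edges[-1])  # 0.0 2.0 100.0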

def compute_percentiles(self, df, col, quantiles):
"""Compute given percentiles for a column in a Dask DataFrame."""
arr = df[col].to_dask_array(lengths=True)
@@ -40,13 +34,28 @@ def assign_cluster_ids(self, part, col, percentiles, cluster_col='cluster_id'):
part[cluster_col] = cluster_ids
return part
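
The lines that actually compute cluster_ids are collapsed in this view. For reference, a plausible partition-level implementation, assuming np.digitize-style binning against the global percentile edges (a sketch, not necessarily the committed body):

import numpy as np
import pandas as pd

def assign_cluster_ids_sketch(part: pd.DataFrame, col, percentiles, cluster_col="cluster_id"):
    """Bin every row of a pandas partition into the bucket delimited by the global percentile edges."""
    # np.digitize returns 1-based bin indices for increasing bins; shift to 0-based and
    # clip so values equal to the last edge land in the final bucket.
    ids = np.digitize(part[col].values, percentiles) - 1
    part[cluster_col] = np.clip(ids, 0, len(percentiles) - 2)
    return part

With 51 edges this yields bucket IDs 0 through 49, consistent with the range(50) loops in the deleted script below.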

def assign_clusters_by_column(self, df, col, quantiles, cluster_col='cluster_id') -> list:
"""Compute percentiles for a column and assign cluster IDs to the entire DataFrame."""
# Compute global percentiles
percentiles = self.compute_percentiles(df, col, quantiles)
def assign_clusters_by_column(self, col, quantiles, cluster_col='cluster_id') -> tuple:
"""
Compute percentiles for a column and assign cluster IDs to the entire DataFrame.

Parameters
----------
col : str
Column name on which to base cluster assignments.
quantiles : array_like
Sequence of quantiles to compute, e.g. np.arange(0, 102, 2).
cluster_col : str, optional
Name of the new column that will store cluster IDs. Defaults to 'cluster_id'.

Returns
-------
df : dask.dataframe.DataFrame
DataFrame with the cluster ID column assigned.
percentiles : np.ndarray
Computed percentiles for the column.
"""
# Compute global percentiles
percentiles = self.compute_percentiles(self.dataframe, col, quantiles)

# Assign clusters to all rows based on these percentiles
df = df.map_partitions(self.assign_cluster_ids, col=col, percentiles=percentiles, cluster_col=cluster_col)
df = self.dataframe.map_partitions(self.assign_cluster_ids, col=col, percentiles=percentiles, cluster_col=cluster_col)
return df, percentiles

def assign_clusters_conditionally(self, df, filter_col, filter_val, col, cluster_col, percentiles):
@@ -65,52 +74,13 @@ def _assign_conditionally(part, col, percentiles, filter_col, filter_val, cluste
df = df.map_partitions(_assign_conditionally, col, percentiles, filter_col, filter_val, cluster_col)
return df
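
The body of _assign_conditionally is likewise collapsed. A sketch of what such a helper typically looks like, assuming the same binning restricted to the rows that match the filter (an assumption, not the committed code):

import numpy as np
import pandas as pd

def _assign_conditionally_sketch(part: pd.DataFrame, col, percentiles, filter_col, filter_val, cluster_col):
    """Assign bucket IDs derived from `percentiles` only to rows where filter_col == filter_val."""
    mask = part[filter_col] == filter_val
    if mask.any():
        ids = np.digitize(part.loc[mask, col].values, percentiles) - 1
        part.loc[mask, cluster_col] = np.clip(ids, 0, len(percentiles) - 2)
    return part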


input_folder = "/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet"
print("Loading data")

# Read the data
df = dd.read_parquet(input_folder, engine='pyarrow')

print("Assign cluster by column for PCA_1")
# 1) First level clustering by PCA_1:
df_test, pca1_percentiles = assign_clusters_by_column(df, 'PCA_1', cluster_col='cluster_id')

print("Clustering for PCA 2")
with ProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
for i in range(50):

# 2) Second level clustering by PCA_2 within a bucket of PCA_1 for all buckets:
pca2_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_2', quantiles=np.arange(0,102,2))

# Assign cluster_id_2 only to rows where cluster_id == i
df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_2', cluster_col='cluster_id_2', percentiles=pca2_percentiles)

# Update progress bar
tracker.update_progress()

print("Clustering for PCA_3")
with ProgressTracker(description="Clustering", total_files=50, interval=4) as tracker:
for i in range(50):

# 3) Third level clustering by PCA_3 within a specific bucket of PCA_1
# Compute percentiles for PCA_3, but only for the subset where cluster_id == i
pca3_percentiles = compute_percentiles(df_test[df_test['cluster_id'] == i], 'PCA_3', quantiles=np.arange(0,102,2))

# Assign cluster_id_3 only to rows where cluster_id == i
df_test = assign_clusters_conditionally(df_test, filter_col='cluster_id', filter_val=i, col='PCA_3', cluster_col='cluster_id_3', percentiles=pca3_percentiles)

# Update progress bar
tracker.update_progress()

df['combined_cluster_id'] = (df_test['cluster_id'] * 50**2) + (df_test['cluster_id_2'] * 50) + df_test['cluster_id_3']

df['combined_cluster_id'] = df['combined_cluster_id'].astype(int)

print("Cluster completed, saving results")
# After completing all clustering steps, repartition to 400 partitions
df = df.repartition(npartitions=400)

# Save as Parquet files
output_path = "/mnt/10tb_hdd/buckets_pca_dask"
df.to_parquet(output_path, engine='pyarrow', write_index=False)
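
Since this commit folds the module-level script above into class methods, the same three-level pipeline would presumably be driven through the new API roughly as below. This is a sketch under the assumption that the class keeps the PCAClusterAssigner name (both names appear in the diff) and that the methods behave as their signatures and docstrings suggest; the step count of 50 is inferred from the range(50) loops, and reassigning clusterer.dataframe before saving is an assumed usage pattern.

import numpy as np

clusterer = PCAClusterAssigner(
    input_path="/mnt/samsung_2tb/mixed_data/output/pca_transformed_results/output/*.parquet",
    steps=[50, 50, 50],  # inferred from the range(50) loops above
)
clusterer.load_data()

# 1) First-level clustering on PCA_1 over the whole DataFrame
df, pca1_percentiles = clusterer.assign_clusters_by_column(
    "PCA_1", quantiles=np.arange(0, 102, 2), cluster_col="cluster_id"
)

# 2) and 3) Second- and third-level clustering within each first-level bucket,
#    mirroring the deleted loops above
for col, new_col in [("PCA_2", "cluster_id_2"), ("PCA_3", "cluster_id_3")]:
    for i in range(50):
        p = clusterer.compute_percentiles(
            df[df["cluster_id"] == i], col, quantiles=np.arange(0, 102, 2)
        )
        df = clusterer.assign_clusters_conditionally(
            df, filter_col="cluster_id", filter_val=i,
            col=col, cluster_col=new_col, percentiles=p,
        )

df["combined_cluster_id"] = (
    df["cluster_id"] * 50**2 + df["cluster_id_2"] * 50 + df["cluster_id_3"]
).astype(int)

clusterer.dataframe = df
clusterer.save_data("/mnt/10tb_hdd/buckets_pca_dask", partition_on="cluster_id")  # illustrative partitioning choice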
def save_data(self, output_folder, partition_on=None):
"""
Save the updated DataFrame to parquet.

Parameters
----------
output_folder : str
Destination directory for the parquet files.
partition_on : list or str, optional
Column(s) on which to partition the data when saving.
"""
self.dataframe.to_parquet(output_folder, partition_on=partition_on)
