
feat: process multiple partitions in parallel #288

Closed
AzulGarza opened this issue Apr 8, 2024 · 0 comments · Fixed by #296

AzulGarza (Member) commented Apr 8, 2024

currently, we have the num_partitions parameter for heavy requests, which allows users to partition their data into chunks of time series. this process, however, is done sequentially, which can be slow in some cases. a good option might be to parallelize these requests. to make sequential partitioned requests we use the following decorator:

import numpy as np
import pandas as pd


def partition_by_uid(func):
    def wrapper(self, num_partitions, **kwargs):
        # without partitioning, just run the wrapped method once
        if num_partitions is None or num_partitions == 1:
            return func(self, **kwargs, num_partitions=1)
        df = kwargs.pop("df")
        X_df = kwargs.pop("X_df", None)
        id_col = kwargs["id_col"]
        uids = df["unique_id"].unique()
        results_df = []
        # split the series ids into num_partitions chunks and process them one by one
        for uids_split in np.array_split(uids, num_partitions):
            df_uids = df.query("unique_id in @uids_split")
            if X_df is not None:
                X_df_uids = X_df.query("unique_id in @uids_split")
            else:
                X_df_uids = None
            # remove_unused_categories is a helper defined elsewhere in the repo;
            # it drops category levels for ids that don't appear in the split
            df_uids = remove_unused_categories(df_uids, col=id_col)
            X_df_uids = remove_unused_categories(X_df_uids, col=id_col)
            kwargs_uids = {"df": df_uids, **kwargs}
            if X_df_uids is not None:
                kwargs_uids["X_df"] = X_df_uids
            results_uids = func(self, **kwargs_uids, num_partitions=1)
            results_df.append(results_uids)
        # stitch the per-partition results back together
        results_df = pd.concat(results_df).reset_index(drop=True)
        return results_df
    return wrapper
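
for context, here's a hypothetical sketch of how the decorator wraps a partition-aware client method (NixtlaClient and forecast are stand-in names, not necessarily the real ones):

class NixtlaClient:
    @partition_by_uid
    def forecast(self, df, id_col, h, X_df=None, num_partitions=1):
        # issues a single request against the API for the series in df
        ...

# everything except num_partitions must be passed by keyword, since the
# wrapper only accepts self, num_partitions, and **kwargs:
# client.forecast(num_partitions=4, df=df, id_col="unique_id", h=12)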

instead of calling the methods in a for loop, this process can be optimized by calling them in parallel, using concurrent.futures.ThreadPoolExecutor (since the process is I/O and CPU bound) or multiprocessing.Pool (the choice between the two should be based on empirical performance). a sketch of the thread-based option follows.
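
as a minimal sketch, assuming the same decorator shape as above (partition_by_uid_parallel is a hypothetical name; remove_unused_categories is the existing helper), with the worker count already following the min(num_partitions, os.cpu_count()) rule proposed below:

import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd


def partition_by_uid_parallel(func):
    def wrapper(self, num_partitions, **kwargs):
        if num_partitions is None or num_partitions == 1:
            return func(self, **kwargs, num_partitions=1)
        df = kwargs.pop("df")
        X_df = kwargs.pop("X_df", None)
        id_col = kwargs["id_col"]
        uids = df["unique_id"].unique()

        def process_split(uids_split):
            # same per-partition logic as the sequential version above
            df_uids = df.query("unique_id in @uids_split")
            df_uids = remove_unused_categories(df_uids, col=id_col)
            kwargs_uids = {"df": df_uids, **kwargs}
            if X_df is not None:
                X_df_uids = X_df.query("unique_id in @uids_split")
                kwargs_uids["X_df"] = remove_unused_categories(X_df_uids, col=id_col)
            return func(self, **kwargs_uids, num_partitions=1)

        # cap the workers at the number of cores (os.cpu_count() can return None)
        max_workers = min(num_partitions, os.cpu_count() or 1)
        splits = np.array_split(uids, num_partitions)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map preserves the order of the splits
            results = list(executor.map(process_split, splits))
        return pd.concat(results).reset_index(drop=True)

    return wrapper

threads should be enough for the request-heavy part, since the interpreter releases the GIL while waiting on I/O; if the CPU-bound part dominates, multiprocessing.Pool can be swapped in at the cost of pickling the partitions.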

the number of parallel workers should be chosen as min(num_partitions, os.cpu_count()); this behavior must be documented in the corresponding docstrings. ideally, the pr should include before-and-after processing times.
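
for instance, the docstring note could look something like this (forecast is just a stand-in for whichever methods accept num_partitions):

def forecast(self, df, X_df=None, num_partitions=None, **kwargs):
    """...

    Parameters
    ----------
    num_partitions : int, optional
        number of partitions to split the series into. when greater
        than 1, partitions are processed in parallel using
        min(num_partitions, os.cpu_count()) workers.
    """
    ...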
