From 81b2043dc3e8faad64f634567ce30b021e832dc9 Mon Sep 17 00:00:00 2001 From: Joseph Nke Date: Sun, 10 Apr 2022 20:19:32 -0700 Subject: [PATCH 1/3] enable MG support for small datasets --- .../cugraph/cugraph/dask/common/part_utils.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index c8e675c7a29..2b14bc7fe21 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -88,9 +88,24 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False): if batch_enabled: persisted = client.persist(dask_obj, workers=worker_list[0]) else: + # Have the first n workers persisting the n partitions + # Ideally, there should be as many partition as there are workers persisted = [client.persist( dask_obj.get_partition(p), workers=w) for p, w in enumerate( - worker_list)] + worker_list[:dask_obj.npartitions])] + + # Persist empty dataframe to the remaining workers if there are + # less partition than workers + if dask_obj.npartitions < len(worker_list): + # The empty df should have the same column names and dtypes as + # dask_obj + empty_df = cudf.DataFrame(columns = list(dask_obj.columns)) + empty_df = empty_df.astype(dict(zip( + dask_obj.columns, dask_obj.dtypes))) + for p, w in enumerate(worker_list[dask_obj.npartitions:]): + empty_ddf = dask_cudf.from_cudf(empty_df, npartitions=1) + persisted.append(client.persist(empty_ddf, workers=w)) + parts = futures_of(persisted) # iterable of dask collections (need to colocate them) elif isinstance(dask_obj, collections.abc.Sequence): From c52e9704f54e8b50f4fce6ff666afe903a88bb20 Mon Sep 17 00:00:00 2001 From: Joseph Nke Date: Sun, 10 Apr 2022 21:11:39 -0700 Subject: [PATCH 2/3] fix style check, update docstring --- python/cugraph/cugraph/dask/common/part_utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff 
--git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 2b14bc7fe21..bc15e0a7da9 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -89,17 +89,16 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False): persisted = client.persist(dask_obj, workers=worker_list[0]) else: # Have the first n workers persisting the n partitions - # Ideally, there should be as many partition as there are workers + # Ideally, there would be as many partitions as there are workers persisted = [client.persist( dask_obj.get_partition(p), workers=w) for p, w in enumerate( worker_list[:dask_obj.npartitions])] - - # Persist empty dataframe to the remaining workers if there are - # less partition than workers + # Persist empty dataframe with the remaining workers if there are + # fewer partitions than workers if dask_obj.npartitions < len(worker_list): # The empty df should have the same column names and dtypes as # dask_obj - empty_df = cudf.DataFrame(columns = list(dask_obj.columns)) + empty_df = cudf.DataFrame(columns=list(dask_obj.columns)) empty_df = empty_df.astype(dict(zip( dask_obj.columns, dask_obj.dtypes))) for p, w in enumerate(worker_list[dask_obj.npartitions:]): From 3674b7d805550ba5e9845c23057498ce4c0c0ae3 Mon Sep 17 00:00:00 2001 From: Joseph Nke Date: Sat, 16 Apr 2022 08:33:18 -0700 Subject: [PATCH 3/3] add columns major_vertices and minor_vertices to shuffled_df when shuffled_minor_series and shuffled_major_series are None --- python/cugraph/cugraph/structure/renumber_wrapper.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/python/cugraph/cugraph/structure/renumber_wrapper.pyx b/python/cugraph/cugraph/structure/renumber_wrapper.pyx index bf165001ef1..515e53f59f1 100644 --- a/python/cugraph/cugraph/structure/renumber_wrapper.pyx +++ b/python/cugraph/cugraph/structure/renumber_wrapper.pyx @@ -53,11 +53,11 @@ cdef renumber_helper(shuffled_vertices_t* ptr_maj_min_w, vertex_t, weights): # vertex_t or weight_t. Failing to do that will create am empty column of type object # which is not supported by '__cuda_array_interface__' if shuffled_major_series is None: - shuffled_major_series = cudf.Series(dtype=vertex_t) + shuffled_df['major_vertices'] = cudf.Series(dtype=vertex_t) else: shuffled_df['major_vertices']= shuffled_major_series if shuffled_minor_series is None: - shuffled_minor_series = cudf.Series(dtype=vertex_t) + shuffled_df['minor_vertices'] = cudf.Series(dtype=vertex_t) else: shuffled_df['minor_vertices']= shuffled_minor_series