diff --git a/CHANGELOG.md b/CHANGELOG.md index d9bc1c0028e..fdc160667f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ - PR #1034 Expose resolution (gamma) parameter in Louvain - PR #1037 Centralize test main function and replace usage of deprecated `cnmem_memory_resource` - PR #1041 Use S3 bucket directly for benchmark plugin +- PR #1062 Compute max_vertex_id in mnmg local data computation - PR #1068 Remove unused thirdparty code ## Bug Fixes diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index 8a8590957b9..26201c835d2 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -55,6 +55,7 @@ def __init__(self, gpu_futures=None, workers=None, self.multiple = multiple self.worker_info = None self.total_rows = None + self.max_vertex_id = None self.ranks = None self.parts_to_sizes = None self.local_data = None @@ -148,6 +149,7 @@ def calculate_local_data(self, comms, by): _local_data_dict = self.client.compute(local_data, sync=True) local_data_dict = {'edges': [], 'offsets': [], 'verts': []} + max_vid = 0 for rank in range(len(_local_data_dict)): data = _local_data_dict[rank] local_data_dict['edges'].append(data[0]) @@ -158,6 +160,8 @@ def calculate_local_data(self, comms, by): local_offset = prev_data[1] + 1 local_data_dict['offsets'].append(local_offset) local_data_dict['verts'].append(data[1] - local_offset + 1) + if data[2] > max_vid: + max_vid = data[2] import numpy as np local_data_dict['edges'] = np.array(local_data_dict['edges'], @@ -167,6 +171,7 @@ def calculate_local_data(self, comms, by): local_data_dict['verts'] = np.array(local_data_dict['verts'], dtype=np.int32) self.local_data = local_data_dict + self.max_vertex_id = max_vid """ Internal methods, API subject to change """ @@ -196,8 +201,9 @@ def get_obj(x): return x[0] if multiple else x def _get_local_data(df, by): df = df[0] num_local_edges = len(df) - local_max = df[by].iloc[-1] - return num_local_edges, local_max + local_by_max = df[by].iloc[-1] + local_max = df[['src', 'dst']].max().max() + return num_local_edges, local_by_max, local_max def get_local_data(input_graph, by, load_balance=True):