From 895f14e6d1fd22b87941b886198b6365000e5932 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 17 Jul 2020 13:13:18 -0400 Subject: [PATCH 01/11] WIP BFS Dask PR --- python/cugraph/dask/__init__.py | 1 + python/cugraph/dask/bfs/__init__.py | 1 + python/cugraph/dask/bfs/bfs.py | 46 ++++++++ python/cugraph/dask/opg_bfs/__init__.py | 0 python/cugraph/dask/opg_bfs/bfs.py | 102 ++++++++++++++++++ python/cugraph/opg/traversal/__init__.py | 17 +++ python/cugraph/opg/traversal/mg_bfs.pxd | 30 ++++++ .../cugraph/opg/traversal/mg_bfs_wrapper.pyx | 94 ++++++++++++++++ 8 files changed, 291 insertions(+) create mode 100644 python/cugraph/dask/bfs/__init__.py create mode 100644 python/cugraph/dask/bfs/bfs.py create mode 100644 python/cugraph/dask/opg_bfs/__init__.py create mode 100644 python/cugraph/dask/opg_bfs/bfs.py create mode 100644 python/cugraph/opg/traversal/__init__.py create mode 100644 python/cugraph/opg/traversal/mg_bfs.pxd create mode 100644 python/cugraph/opg/traversal/mg_bfs_wrapper.pyx diff --git a/python/cugraph/dask/__init__.py b/python/cugraph/dask/__init__.py index 0e4a5058638..bfb3dcbf8c9 100644 --- a/python/cugraph/dask/__init__.py +++ b/python/cugraph/dask/__init__.py @@ -12,4 +12,5 @@ # limitations under the License. from .opg_pagerank.pagerank import pagerank +from .opg_bfs.bfs import bfs from .common.read_utils import get_chunksize diff --git a/python/cugraph/dask/bfs/__init__.py b/python/cugraph/dask/bfs/__init__.py new file mode 100644 index 00000000000..fc7dfaa163a --- /dev/null +++ b/python/cugraph/dask/bfs/__init__.py @@ -0,0 +1 @@ +from .bfs import bfs, get_chunksize, read_split_csv, drop_duplicates diff --git a/python/cugraph/dask/bfs/bfs.py b/python/cugraph/dask/bfs/bfs.py new file mode 100644 index 00000000000..1a871375fb6 --- /dev/null +++ b/python/cugraph/dask/bfs/bfs.py @@ -0,0 +1,46 @@ +def bfs(edge_list, start, return_distances=False): + """ + Find the distances and predecessors for a breadth first traversal of a + graph. 
+ The input edge list should be provided in dask-cudf dataframe + with one partition per GPU. + + Parameters + ---------- + edge_list : dask_cudf.DataFrame + Contain the connectivity information as an edge list. + Source 'src' and destination 'dst' columns must be of type 'int32'. + Edge weights are not used for this algorithm. + Indices must be in the range [0, V-1], where V is the global number + of vertices. + start : Integer + The index of the graph vertex from which the traversal begins + + return_distances : bool, optional, default=False + Indicates if distances should be returned + + Returns + ------- + df : cudf.DataFrame + df['vertex'][i] gives the vertex id of the i'th vertex + + df['distance'][i] gives the path distance for the i'th vertex from the + starting vertex + + df['predecessor'][i] gives for the i'th vertex the vertex it was + reached from in the traversal + + Examples + -------- + >>> import dask_cugraph.bfs as dcg + >>> chunksize = dcg.get_chunksize(edge_list.csv) + >>> ddf_edge_list = dask_cudf.read_csv(edge_list.csv, + >>> chunksize = chunksize, + >>> delimiter='\t', + >>> names=['src', 'dst'], + >>> dtype=['int32', 'int32']) + >>> pr = dcg.bfs(ddf_edge_list, start=0, return_distances=False) + """ + + raise Exception("mg_bfs currently disabled... " + "new OPG version coming soon") diff --git a/python/cugraph/dask/opg_bfs/__init__.py b/python/cugraph/dask/opg_bfs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph/dask/opg_bfs/bfs.py b/python/cugraph/dask/opg_bfs/bfs.py new file mode 100644 index 00000000000..8c034f15486 --- /dev/null +++ b/python/cugraph/dask/opg_bfs/bfs.py @@ -0,0 +1,102 @@ +# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from dask.distributed import wait, default_client +from cugraph.dask.common.input_utils import get_local_data +from cugraph.opg.traversal import mg_bfs_wrapper as mg_bfs +import cugraph.comms.comms as Comms +import warnings + + +def call_bfs(sID, data, local_data, start, return_distances): + wid = Comms.get_worker_id(sID) + handle = Comms.get_handle(sID) + return mg_bfs.mg_bfs(data[0], + local_data, + wid, + handle, + start, + return_distances) + + +def bfs(input_graph, + start, + return_distances=False): + + """ + Find the distances and predecessors for a breadth first traversal of a + graph using multiple GPUs. + The input graph must contain edge list as dask-cudf dataframe with + one partition per GPU. + + Parameters + ---------- + input_graph : cugraph.DiGraph + Graph whose dask-cudf edge list holds the connectivity information. + Source 'src' and destination 'dst' columns must be of type 'int32'. + Edge weights are not used for this algorithm. + Indices must be in the range [0, V-1], where V is the global number + of vertices.
+ start : Integer + The index of the graph vertex from which the traversal begins + + return_distances : bool, optional, default=False + Indicates if distances should be returned + + Returns + ------- + df : cudf.DataFrame + df['vertex'][i] gives the vertex id of the i'th vertex + + df['distance'][i] gives the path distance for the i'th vertex from the + starting vertex + + df['predecessor'][i] gives for the i'th vertex the vertex it was + reached from in the traversal + + Examples + -------- + >>> import cugraph.dask as dcg + >>> chunksize = dcg.get_chunksize(input_data_path) + >>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + >>> dg = cugraph.DiGraph() + >>> dg.from_dask_cudf_edgelist(ddf) + >>> df = dcg.bfs(dg, 0) + """ + + client = default_client() + + if(input_graph.local_data is not None and + input_graph.local_data['by'] == 'src'): + data = input_graph.local_data['data'] + else: + data = get_local_data(input_graph, by='src') + + result = dict([(data.worker_info[wf[0]]["rank"], + client.submit( + call_bfs, + Comms.get_session_id(), + wf[1], + data.local_data, + start, + return_distances, + workers=[wf[0]])) + for idx, wf in enumerate(data.worker_to_parts.items())]) + wait(result) + + return result[0].result() diff --git a/python/cugraph/opg/traversal/__init__.py b/python/cugraph/opg/traversal/__init__.py new file mode 100644 index 00000000000..7eb1ab5e302 --- /dev/null +++ b/python/cugraph/opg/traversal/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.opg.traversal.mg_bfs_wrapper import mg_bfs diff --git a/python/cugraph/opg/traversal/mg_bfs.pxd b/python/cugraph/opg/traversal/mg_bfs.pxd new file mode 100644 index 00000000000..8b9e8c1c81f --- /dev/null +++ b/python/cugraph/opg/traversal/mg_bfs.pxd @@ -0,0 +1,30 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.structure.graph_new cimport * +from libcpp cimport bool + + +cdef extern from "algorithms.hpp" namespace "cugraph": + + cdef void bfs[VT,ET,WT]( + const handle_t &handle, + const GraphCSRView[VT,ET,WT] &graph, + VT *distances, + VT *predecessors, + double *sp_counters, + const VT start_vertex, + bool directed) except + diff --git a/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx b/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx new file mode 100644 index 00000000000..5b27a5174c8 --- /dev/null +++ b/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx @@ -0,0 +1,94 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.structure.utils_wrapper import * +from cugraph.opg.traversal cimport mg_bfs as c_bfs +import cudf +from cugraph.structure.graph_new cimport * +import cugraph.structure.graph_new_wrapper as graph_new_wrapper +from libc.stdint cimport uintptr_t + +def mg_bfs(input_df, local_data, rank, handle, start, return_distances=False): + """ + Call MG BFS + """ + + cdef size_t handle_size_t = handle.getHandle() + handle_ = handle_size_t + + # Local COO information + src = input_df['src'] + dst = input_df['dst'] + num_verts = local_data['verts'].sum() + num_edges = local_data['edges'].sum() + local_offset = local_data['offsets'][rank] + dst = dst - local_offset + num_local_verts = local_data['verts'][rank] + num_local_edges = len(src) + + # Convert to local CSR + [src, dst] = graph_new_wrapper.datatype_cast([src, dst], [np.int32]) + _offsets, indices, weights = coo2csr(src, dst, None) + offsets = _offsets[:num_local_verts + 1] + del _offsets + + # Pointers required for CSR Graph + cdef uintptr_t c_offsets_ptr = offsets.__cuda_array_interface__['data'][0] + cdef uintptr_t c_indices_ptr = indices.__cuda_array_interface__['data'][0] + + # Generate the cudf.DataFrame result + df = cudf.DataFrame() + df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) + df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) + if (return_distances): + df['distance'] = cudf.Series(np.zeros(num_verts,
dtype=np.int32)) + + # Associate to cudf Series + cdef uintptr_t c_identifier_ptr = df['vertex'].__cuda_array_interface__['data'][0]; + cdef uintptr_t c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series + cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0]; + if (return_distances): + c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] + + # Extract local data + cdef uintptr_t c_local_verts = local_data['verts'].__array_interface__['data'][0] + cdef uintptr_t c_local_edges = local_data['edges'].__array_interface__['data'][0] + cdef uintptr_t c_local_offsets = local_data['offsets'].__array_interface__['data'][0] + if (return_distances): + df['distance'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) + + # BFS + cdef GraphCSRView[int,int,float] graph + graph= GraphCSRView[int, int, float]( c_offsets_ptr, + c_indices_ptr, + NULL, + num_verts, + num_local_edges) + graph.set_local_data(c_local_verts, c_local_edges, c_local_offsets) + graph.set_handle(handle_) + graph.get_vertex_identifiers(c_identifier_ptr) + + cdef bool direction = 1 + # MG BFS path assumes directed is true + c_bfs.bfs[int, int, float](handle_[0], + graph, + c_distance_ptr, + c_predecessor_ptr, + NULL, + start, + direction) + + return df From 269d3f992b3d51757a5315eea8fcd83ec02d14c5 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Fri, 17 Jul 2020 13:14:39 -0400 Subject: [PATCH 02/11] CHANGELOG fix --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b20d184804b..fe1dade868e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - PR #964 OPG BFS (CUDA) - PR #990 MG Consolidation - PR #993 Add persistent Handle for Comms +- PR #1010 OPG BFS (dask) ## Improvements - PR #898 Add Edge Betweenness Centrality, and endpoints to BC From ed94d56a28c39dfc04b66eb1891fed84b56494c6 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Tue, 21 Jul 2020 10:22:38 -0400 Subject: [PATCH 03/11] 
Pytest for opg bfs --- cpp/src/traversal/opg/bfs.cuh | 2 +- python/cugraph/dask/bfs/__init__.py | 1 - python/cugraph/dask/bfs/bfs.py | 46 ------------- python/cugraph/dask/opg_bfs/bfs.py | 1 - .../cugraph/opg/traversal/mg_bfs_wrapper.pyx | 4 +- python/cugraph/tests/dask/test_opg_bfs.py | 65 +++++++++++++++++++ .../cugraph/tests/dask/test_opg_pagerank.py | 2 + 7 files changed, 69 insertions(+), 52 deletions(-) delete mode 100644 python/cugraph/dask/bfs/__init__.py delete mode 100644 python/cugraph/dask/bfs/bfs.py create mode 100644 python/cugraph/tests/dask/test_opg_bfs.py diff --git a/cpp/src/traversal/opg/bfs.cuh b/cpp/src/traversal/opg/bfs.cuh index e19fa83470e..c8295b8971a 100644 --- a/cpp/src/traversal/opg/bfs.cuh +++ b/cpp/src/traversal/opg/bfs.cuh @@ -66,7 +66,7 @@ void bfs(raft::handle_t const &handle, } // BFS communications wrapper - BFSCommunicatorIterativeBCastReduce bfs_comm(handle, word_count); + BFSCommunicatorBCastReduce bfs_comm(handle, word_count); // 0. 'Insert' starting vertex in the input frontier input_frontier[start_vertex / BitsPWrd] = static_cast(1) diff --git a/python/cugraph/dask/bfs/__init__.py b/python/cugraph/dask/bfs/__init__.py deleted file mode 100644 index fc7dfaa163a..00000000000 --- a/python/cugraph/dask/bfs/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .bfs import bfs, get_chunksize, read_split_csv, drop_duplicates diff --git a/python/cugraph/dask/bfs/bfs.py b/python/cugraph/dask/bfs/bfs.py deleted file mode 100644 index 1a871375fb6..00000000000 --- a/python/cugraph/dask/bfs/bfs.py +++ /dev/null @@ -1,46 +0,0 @@ -def bfs(edge_list, start, return_distances=False): - """ - Find the distances and predecessors for a breadth first traversal of a - graph. - The input edge list should be provided in dask-cudf dataframe - with one partition per GPU. - - Parameters - ---------- - edge_list : dask_cudf.DataFrame - Contain the connectivity information as an edge list. - Source 'src' and destination 'dst' columns must be of type 'int32'. 
- Edge weights are not used for this algorithm. - Indices must be in the range [0, V-1], where V is the global number - of vertices. - start : Integer - The index of the graph vertex from which the traversal begins - - return_distances : bool, optional, default=False - Indicates if distances should be returned - - Returns - ------- - df : cudf.DataFrame - df['vertex'][i] gives the vertex id of the i'th vertex - - df['distance'][i] gives the path distance for the i'th vertex from the - starting vertex - - df['predecessor'][i] gives for the i'th vertex the vertex it was - reached from in the traversal - - Examples - -------- - >>> import dask_cugraph.bfs as dcg - >>> chunksize = dcg.get_chunksize(edge_list.csv) - >>> ddf_edge_list = dask_cudf.read_csv(edge_list.csv, - >>> chunksize = chunksize, - >>> delimiter='\t', - >>> names=['src', 'dst'], - >>> dtype=['int32', 'int32']) - >>> pr = dcg.bfs(ddf_edge_list, start=0, return_distances=False) - """ - - raise Exception("mg_bfs currently disabled... 
" - "new OPG version coming soon") diff --git a/python/cugraph/dask/opg_bfs/bfs.py b/python/cugraph/dask/opg_bfs/bfs.py index 8c034f15486..7132be92fe8 100644 --- a/python/cugraph/dask/opg_bfs/bfs.py +++ b/python/cugraph/dask/opg_bfs/bfs.py @@ -17,7 +17,6 @@ from cugraph.dask.common.input_utils import get_local_data from cugraph.opg.traversal import mg_bfs_wrapper as mg_bfs import cugraph.comms.comms as Comms -import warnings def call_bfs(sID, data, local_data, start, return_distances): diff --git a/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx b/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx index 5b27a5174c8..44d7050f49a 100644 --- a/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx +++ b/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx @@ -35,7 +35,7 @@ def mg_bfs(input_df, local_data, rank, handle, start, return_distances=False): num_verts = local_data['verts'].sum() num_edges = local_data['edges'].sum() local_offset = local_data['offsets'][rank] - dst = dst - local_offset + src = src - local_offset num_local_verts = local_data['verts'][rank] num_local_edges = len(src) @@ -67,8 +67,6 @@ def mg_bfs(input_df, local_data, rank, handle, start, return_distances=False): cdef uintptr_t c_local_verts = local_data['verts'].__array_interface__['data'][0] cdef uintptr_t c_local_edges = local_data['edges'].__array_interface__['data'][0] cdef uintptr_t c_local_offsets = local_data['offsets'].__array_interface__['data'][0] - if (return_distances): - df['distance'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) # BFS cdef GraphCSRView[int,int,float] graph diff --git a/python/cugraph/tests/dask/test_opg_bfs.py b/python/cugraph/tests/dask/test_opg_bfs.py new file mode 100644 index 00000000000..4dc278b6396 --- /dev/null +++ b/python/cugraph/tests/dask/test_opg_bfs.py @@ -0,0 +1,65 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cugraph.dask as dcg +import cugraph.comms as Comms +from dask.distributed import Client +import gc +import cugraph +import dask_cudf +import cudf +from dask_cuda import LocalCUDACluster + + +def test_dask_pagerank(): + gc.collect() + cluster = LocalCUDACluster() + client = Client(cluster) + Comms.initialize() + + input_data_path = r"../datasets/karate.csv" + chunksize = dcg.get_chunksize(input_data_path) + + ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + + df = cudf.read_csv(input_data_path, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + + g = cugraph.DiGraph() + g.from_cudf_edgelist(df, 'src', 'dst') + + dg = cugraph.DiGraph() + dg.from_dask_cudf_edgelist(ddf) + + # Pre compute local data + # dg.compute_local_data(by='dst') + + expected_dist = cugraph.bfs(g, 0) + result_dist = dcg.bfs(dg, 0, True) + + err = 0 + + assert len(expected_dist) == len(result_dist) + for i in range(len(result_dist)): + if(result_dist['distance'].iloc[i] != expected_dist['distance'].iloc[i]): + err = err + 1 + assert err == 0 + + Comms.destroy() + client.close() + cluster.close() diff --git a/python/cugraph/tests/dask/test_opg_pagerank.py b/python/cugraph/tests/dask/test_opg_pagerank.py index 127dbc83ae2..b7ad188d7ba 100644 --- a/python/cugraph/tests/dask/test_opg_pagerank.py +++ b/python/cugraph/tests/dask/test_opg_pagerank.py @@ -49,8 +49,10 @@ def test_dask_pagerank(): # Pre compute local data # 
dg.compute_local_data(by='dst') + print("Start call") expected_pr = cugraph.pagerank(g) result_pr = dcg.pagerank(dg) + print("End call") err = 0 tol = 1.0e-05 From 124b970a17d131fb857c724f009f2fbc7c5d7dca Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Wed, 22 Jul 2020 10:17:39 -0500 Subject: [PATCH 04/11] fix local vert and offset calculation --- CHANGELOG.md | 1 + python/cugraph/dask/common/input_utils.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 513e034aa0e..b4694932bbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - PR #992 Fix unrenumber of predecessor - PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index - PR #1012 Fix Local build script README +- PR #1017 Fix more mg bugs # cuGraph 0.14.0 (03 Jun 2020) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index e3f7a4b1899..ab038507b78 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by): for rank in range(len(_local_data_dict)): data = _local_data_dict[rank] local_data_dict['edges'].append(data[0]) - local_data_dict['offsets'].append(data[1]) - local_data_dict['verts'].append(data[2]) + if rank == 0: + local_offset = 0 + else: + prev_data = _local_data_dict[rank-1] + local_offset = prev_data[1] + 1 + local_data_dict['offsets'].append(local_offset) + local_data_dict['verts'].append(data[1] - local_offset + 1) import numpy as np local_data_dict['edges'] = np.array(local_data_dict['edges'], @@ -191,9 +196,8 @@ def get_obj(x): return x[0] if multiple else x def _get_local_data(df, by): df = df[0] num_local_edges = len(df) - local_offset = df[by].min() - num_local_verts = df[by].max() - local_offset + 1 - return num_local_edges, local_offset, num_local_verts + local_max = df[by].iloc[-1] + return num_local_edges, local_max def 
get_local_data(input_graph, by, load_balance=True): From d23a5dd1f9ab4566e03887de28bbe6c5aaba552d Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Thu, 23 Jul 2020 00:13:43 -0500 Subject: [PATCH 05/11] update from_dask_cudf_edgelist api --- python/cugraph/dask/opg_pagerank/pagerank.py | 3 ++- python/cugraph/structure/graph.py | 14 +++++++++++++- python/cugraph/tests/dask/opg_utility_testing.py | 3 ++- python/cugraph/tests/dask/test_opg_comms.py | 4 ++-- python/cugraph/tests/dask/test_opg_degree.py | 2 +- python/cugraph/tests/dask/test_opg_pagerank.py | 2 +- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/python/cugraph/dask/opg_pagerank/pagerank.py b/python/cugraph/dask/opg_pagerank/pagerank.py index 90a785251f0..1a04e79aba7 100644 --- a/python/cugraph/dask/opg_pagerank/pagerank.py +++ b/python/cugraph/dask/opg_pagerank/pagerank.py @@ -92,7 +92,8 @@ def pagerank(input_graph, names=['src', 'dst', 'value'], dtype=['int32', 'int32', 'float32']) >>> dg = cugraph.DiGraph() - >>> dg.from_dask_cudf_edgelist(ddf) + >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') >>> pr = dcg.pagerank(dg) """ diff --git a/python/cugraph/structure/graph.py b/python/cugraph/structure/graph.py index 9ba78e9c266..5a66cb41763 100644 --- a/python/cugraph/structure/graph.py +++ b/python/cugraph/structure/graph.py @@ -233,7 +233,9 @@ def add_edge_list(self, source, destination, value=None): else: self.from_cudf_edgelist(input_df) - def from_dask_cudf_edgelist(self, input_ddf): + def from_dask_cudf_edgelist(self, input_ddf, source='source', + destination='destination', + edge_attr=None, renumber=True): """ Initializes the distributed graph from the dask_cudf.DataFrame edgelist. 
Renumbering and undirected Graphs are not currently @@ -242,6 +244,12 @@ def from_dask_cudf_edgelist(self, input_ddf): ---------- input_ddf : dask_cudf.DataFrame The edgelist as a dask_cudf.DataFrame + source : str + source argument is source column name + destination : str + destination argument is destination column name. + edge_attr : str + edge_attr argument is the weights column name. """ if self.edgelist is not None or self.adjlist is not None: raise Exception('Graph already has values') @@ -250,6 +258,10 @@ def from_dask_cudf_edgelist(self, input_ddf): if isinstance(input_ddf, dask_cudf.DataFrame): self.distributed = True self.local_data = None + rename_map = {source: 'src', destination: 'dst'} + if edge_attr is not None: + rename_map[edge_attr] = 'weights' + input_ddf = input_ddf.rename(columns=rename_map) self.edgelist = self.EdgeList(input_ddf) else: raise Exception('input should be a dask_cudf dataFrame') diff --git a/python/cugraph/tests/dask/opg_utility_testing.py b/python/cugraph/tests/dask/opg_utility_testing.py index aef81fb53f5..2819d3b3253 100644 --- a/python/cugraph/tests/dask/opg_utility_testing.py +++ b/python/cugraph/tests/dask/opg_utility_testing.py @@ -20,7 +20,8 @@ def test_compute_local_data(): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') # Compute_local_data dg.compute_local_data(by='dst') diff --git a/python/cugraph/tests/dask/test_opg_comms.py b/python/cugraph/tests/dask/test_opg_comms.py index dbb192f8bd8..93d30813343 100644 --- a/python/cugraph/tests/dask/test_opg_comms.py +++ b/python/cugraph/tests/dask/test_opg_comms.py @@ -42,7 +42,7 @@ def test_dask_pagerank(): dtype=['int32', 'int32', 'float32']) dg1 = cugraph.DiGraph() - dg1.from_dask_cudf_edgelist(ddf1) + dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst') result_pr1 = dcg.pagerank(dg1) ddf2 = dask_cudf.read_csv(input_data_path2, 
chunksize=chunksize2, @@ -51,7 +51,7 @@ def test_dask_pagerank(): dtype=['int32', 'int32', 'float32']) dg2 = cugraph.DiGraph() - dg2.from_dask_cudf_edgelist(ddf2) + dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst') result_pr2 = dcg.pagerank(dg2) # Calculate single GPU pagerank for verification of results diff --git a/python/cugraph/tests/dask/test_opg_degree.py b/python/cugraph/tests/dask/test_opg_degree.py index 7fb9078feb7..38a6900e290 100644 --- a/python/cugraph/tests/dask/test_opg_degree.py +++ b/python/cugraph/tests/dask/test_opg_degree.py @@ -31,7 +31,7 @@ def test_dask_opg_degree(): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') g = cugraph.DiGraph() g.from_cudf_edgelist(df, 'src', 'dst') diff --git a/python/cugraph/tests/dask/test_opg_pagerank.py b/python/cugraph/tests/dask/test_opg_pagerank.py index 99db0338b56..76862e4de4c 100644 --- a/python/cugraph/tests/dask/test_opg_pagerank.py +++ b/python/cugraph/tests/dask/test_opg_pagerank.py @@ -44,7 +44,7 @@ def test_dask_pagerank(): g.from_cudf_edgelist(df, 'src', 'dst') dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') # Pre compute local data # dg.compute_local_data(by='dst') From ec2bc43dc0dc4ef7de32419ddf16fc2a85c9d920 Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Tue, 28 Jul 2020 17:04:44 -0500 Subject: [PATCH 06/11] flake8 --- python/cugraph/dask/common/input_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index 7d516e9e72a..ab038507b78 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -199,6 +199,7 @@ def _get_local_data(df, by): local_max = df[by].iloc[-1] return num_local_edges, local_max + def get_local_data(input_graph, by, load_balance=True): _ddf = input_graph.edgelist.edgelist_df ddf = 
_ddf.sort_values(by=by, ignore_index=True) From f4fdc7f090343b45c8a87e11489c15c45ce9f98b Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Tue, 28 Jul 2020 22:54:04 -0500 Subject: [PATCH 07/11] add comms doc --- python/cugraph/comms/comms.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/python/cugraph/comms/comms.py b/python/cugraph/comms/comms.py index 28ce2a3fc1e..642d99440e0 100644 --- a/python/cugraph/comms/comms.py +++ b/python/cugraph/comms/comms.py @@ -9,16 +9,33 @@ # Intialize Comms. If explicit Comms not provided as arg, # default Comms are initialized as per client information. -def initialize(arg=None): +def initialize(comms=None, p2p=False): + """ + Initializes a communicator for multi-node multi-gpu communications. + It is expected to be called right after client initialization for running + mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX + comms handles across the workers of a Dask cluster. + It is recommended to also call `destroy()` when the comms are no longer + needed so the underlying resources can be cleaned up. + + Parameters + ---------- + comms : raft Comms + A pre-initialized raft communicator. If provided, this is used for mnmg + communications. + p2p : bool + Initialize UCX endpoints + """ + global __instance if __instance is None: global __default_handle __default_handle = None - if arg is None: - __instance = raftComms() + if comms is None: + __instance = raftComms(comms_p2p=p2p) __instance.init() else: - __instance = arg + __instance = comms else: raise Exception("Communicator is already initialized") @@ -47,6 +64,9 @@ def get_session_id(): # Destroy Comms def destroy(): + """ + Shuts down initialized comms and cleans up resources.
+ """ global __instance if is_initialized(): __instance.destroy() From b1962f7beb2a3dbf69683124f3f50a61bc729dad Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 29 Jul 2020 10:41:10 -0400 Subject: [PATCH 08/11] Replace OPG with MG --- cpp/include/graph.hpp | 2 +- python/cugraph/dask/__init__.py | 2 +- python/cugraph/dask/{opg_bfs => mg_bfs}/__init__.py | 0 python/cugraph/dask/{opg_bfs => mg_bfs}/bfs.py | 2 +- python/cugraph/{opg => mg}/traversal/__init__.py | 2 +- python/cugraph/{opg => mg}/traversal/mg_bfs.pxd | 0 .../cugraph/{opg => mg}/traversal/mg_bfs_wrapper.pyx | 2 +- .../tests/dask/{test_opg_bfs.py => test_mg_bfs.py} | 10 ++++------ python/cugraph/tests/dask/test_mg_renumber.py | 2 +- 9 files changed, 10 insertions(+), 12 deletions(-) rename python/cugraph/dask/{opg_bfs => mg_bfs}/__init__.py (100%) rename python/cugraph/dask/{opg_bfs => mg_bfs}/bfs.py (98%) rename python/cugraph/{opg => mg}/traversal/__init__.py (91%) rename python/cugraph/{opg => mg}/traversal/mg_bfs.pxd (100%) rename python/cugraph/{opg => mg}/traversal/mg_bfs_wrapper.pyx (98%) rename python/cugraph/tests/dask/{test_opg_bfs.py => test_mg_bfs.py} (88%) diff --git a/cpp/include/graph.hpp b/cpp/include/graph.hpp index 05aeb83bf4e..2e91bb90627 100644 --- a/cpp/include/graph.hpp +++ b/cpp/include/graph.hpp @@ -53,8 +53,8 @@ enum class DegreeDirection { template class GraphViewBase { public: - WT *edge_data; ///< edge weight raft::handle_t *handle; + WT *edge_data; ///< edge weight GraphProperties prop; VT number_of_vertices; diff --git a/python/cugraph/dask/__init__.py b/python/cugraph/dask/__init__.py index f9207363b04..b2644814870 100644 --- a/python/cugraph/dask/__init__.py +++ b/python/cugraph/dask/__init__.py @@ -12,5 +12,5 @@ # limitations under the License. 
from .mg_pagerank.pagerank import pagerank -from .opg_bfs.bfs import bfs +from .mg_bfs.bfs import bfs from .common.read_utils import get_chunksize diff --git a/python/cugraph/dask/opg_bfs/__init__.py b/python/cugraph/dask/mg_bfs/__init__.py similarity index 100% rename from python/cugraph/dask/opg_bfs/__init__.py rename to python/cugraph/dask/mg_bfs/__init__.py diff --git a/python/cugraph/dask/opg_bfs/bfs.py b/python/cugraph/dask/mg_bfs/bfs.py similarity index 98% rename from python/cugraph/dask/opg_bfs/bfs.py rename to python/cugraph/dask/mg_bfs/bfs.py index 7132be92fe8..88eb715ae23 100644 --- a/python/cugraph/dask/opg_bfs/bfs.py +++ b/python/cugraph/dask/mg_bfs/bfs.py @@ -15,7 +15,7 @@ from dask.distributed import wait, default_client from cugraph.dask.common.input_utils import get_local_data -from cugraph.opg.traversal import mg_bfs_wrapper as mg_bfs +from cugraph.mg.traversal import mg_bfs_wrapper as mg_bfs import cugraph.comms.comms as Comms diff --git a/python/cugraph/opg/traversal/__init__.py b/python/cugraph/mg/traversal/__init__.py similarity index 91% rename from python/cugraph/opg/traversal/__init__.py rename to python/cugraph/mg/traversal/__init__.py index 7eb1ab5e302..62aed127359 100644 --- a/python/cugraph/opg/traversal/__init__.py +++ b/python/cugraph/mg/traversal/__init__.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -from cugraph.opg.traversal.mg_bfs_wrapper import mg_bfs +from cugraph.mg.traversal.mg_bfs_wrapper import mg_bfs diff --git a/python/cugraph/opg/traversal/mg_bfs.pxd b/python/cugraph/mg/traversal/mg_bfs.pxd similarity index 100% rename from python/cugraph/opg/traversal/mg_bfs.pxd rename to python/cugraph/mg/traversal/mg_bfs.pxd diff --git a/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx b/python/cugraph/mg/traversal/mg_bfs_wrapper.pyx similarity index 98% rename from python/cugraph/opg/traversal/mg_bfs_wrapper.pyx rename to python/cugraph/mg/traversal/mg_bfs_wrapper.pyx index 44d7050f49a..da64b597406 100644 --- a/python/cugraph/opg/traversal/mg_bfs_wrapper.pyx +++ b/python/cugraph/mg/traversal/mg_bfs_wrapper.pyx @@ -15,7 +15,7 @@ # from cugraph.structure.utils_wrapper import * -from cugraph.opg.traversal cimport mg_bfs as c_bfs +from cugraph.mg.traversal cimport mg_bfs as c_bfs import cudf from cugraph.structure.graph_new cimport * import cugraph.structure.graph_new_wrapper as graph_new_wrapper diff --git a/python/cugraph/tests/dask/test_opg_bfs.py b/python/cugraph/tests/dask/test_mg_bfs.py similarity index 88% rename from python/cugraph/tests/dask/test_opg_bfs.py rename to python/cugraph/tests/dask/test_mg_bfs.py index 4dc278b6396..eebd074fd1c 100644 --- a/python/cugraph/tests/dask/test_opg_bfs.py +++ b/python/cugraph/tests/dask/test_mg_bfs.py @@ -41,13 +41,10 @@ def test_dask_pagerank(): dtype=['int32', 'int32', 'float32']) g = cugraph.DiGraph() - g.from_cudf_edgelist(df, 'src', 'dst') + g.from_cudf_edgelist(df, 'src', 'dst', renumber=False) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) - - # Pre compute local data - # dg.compute_local_data(by='dst') + dg.from_dask_cudf_edgelist(ddf, renumber=False) expected_dist = cugraph.bfs(g, 0) result_dist = dcg.bfs(dg, 0, True) @@ -56,7 +53,8 @@ def test_dask_pagerank(): assert len(expected_dist) == len(result_dist) for i in range(len(result_dist)): - if(result_dist['distance'].iloc[i] != 
expected_dist['distance'].iloc[i]): + if(result_dist['distance'].iloc[i] != + expected_dist['distance'].iloc[i]): err = err + 1 assert err == 0 diff --git a/python/cugraph/tests/dask/test_mg_renumber.py b/python/cugraph/tests/dask/test_mg_renumber.py index 6ef91d21372..67098cbc253 100644 --- a/python/cugraph/tests/dask/test_mg_renumber.py +++ b/python/cugraph/tests/dask/test_mg_renumber.py @@ -121,7 +121,7 @@ def test_mg_renumber2(graph_file, client_connection): # Test all combinations of default/managed and pooled/non-pooled allocation @pytest.mark.parametrize("graph_file", utils.DATASETS) -def test_opg_renumber3(graph_file, client_connection): +def test_mg_renumber3(graph_file, client_connection): gc.collect() M = utils.read_csv_for_nx(graph_file) From b63a063c9d4e7307d4c336e7ba55526043ca0ee2 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 29 Jul 2020 13:27:55 -0400 Subject: [PATCH 09/11] PR comment fixes --- CHANGELOG.md | 2 +- python/cugraph/dask/mg_bfs/bfs.py | 46 +++++++++++-------- python/cugraph/tests/dask/test_mg_bfs.py | 4 +- python/cugraph/tests/dask/test_mg_pagerank.py | 2 +- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51d8a04655e..fa588224be0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ - PR #990 MG Consolidation - PR #993 Add persistent Handle for Comms - PR #979 Add hypergraph implementation to convert DataFrames into Graphs -- PR #1010 OPG BFS (dask) +- PR #1010 MG BFS (dask) - PR #1018 MG personalized pagerank ## Improvements diff --git a/python/cugraph/dask/mg_bfs/bfs.py b/python/cugraph/dask/mg_bfs/bfs.py index 88eb715ae23..659df9429e0 100644 --- a/python/cugraph/dask/mg_bfs/bfs.py +++ b/python/cugraph/dask/mg_bfs/bfs.py @@ -17,6 +17,7 @@ from cugraph.dask.common.input_utils import get_local_data from cugraph.mg.traversal import mg_bfs_wrapper as mg_bfs import cugraph.comms.comms as Comms +import cudf def call_bfs(sID, data, local_data, start, return_distances): @@ 
-30,29 +31,28 @@ def call_bfs(sID, data, local_data, start, return_distances): return_distances) -def bfs(input_graph, +def bfs(graph, start, return_distances=False): """ - Find the PageRank values for each vertex in a graph using multiple GPUs. - cuGraph computes an approximation of the Pagerank using the power method. + Find the distances and predecessors for a breadth first traversal of a + graph. The input graph must contain edge list as dask-cudf dataframe with one partition per GPU. Parameters ---------- - edge_list : dask_cudf.DataFrame - Contain the connectivity information as an edge list. - Source 'src' and destination 'dst' columns must be of type 'int32'. - Edge weights are not used for this algorithm. - Indices must be in the range [0, V-1], where V is the global number - of vertices. + graph : cugraph.DiGraph + cuGraph graph descriptor, should contain the connectivity information + as dask cudf edge list dataframe(edge weights are not used for this + algorithm). Undirected Graph not currently supported. start : Integer - The index of the graph vertex from which the traversal begins + Specify starting vertex for breadth-first search; this function + iterates over edges in the component reachable from this node. 
- return_sp_counter : bool, optional, default=False - Indicates if shortest path counters should be returned + return_distances : bool, optional, default=False + Indicates if distances should be returned Returns ------- @@ -60,7 +60,7 @@ def bfs(input_graph, df['vertex'][i] gives the vertex id of the i'th vertex df['distance'][i] gives the path distance for the i'th vertex from the - starting vertex + starting vertex (Only if return_distances is True) df['predecessor'][i] gives for the i'th vertex the vertex it was reached from in the traversal @@ -80,11 +80,14 @@ def bfs(input_graph, client = default_client() - if(input_graph.local_data is not None and - input_graph.local_data['by'] == 'src'): - data = input_graph.local_data['data'] + if(graph.local_data is not None and + graph.local_data['by'] == 'src'): + data = graph.local_data['data'] else: - data = get_local_data(input_graph, by='src') + data = get_local_data(graph, by='src') + + if graph.renumbered: + start = graph.lookup_internal_vertex_id(cudf.Series([start]))[0] result = dict([(data.worker_info[wf[0]]["rank"], client.submit( @@ -98,4 +101,11 @@ def bfs(input_graph, for idx, wf in enumerate(data.worker_to_parts.items())]) wait(result) - return result[0].result() + df = result[0].result() + + if graph.renumbered: + df = graph.unrenumber(df, 'vertex').compute() + df = graph.unrenumber(df, 'predecessor').compute() + df["predecessor"].fillna(-1, inplace=True) + + return df diff --git a/python/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/tests/dask/test_mg_bfs.py index eebd074fd1c..b7d31d28342 100644 --- a/python/cugraph/tests/dask/test_mg_bfs.py +++ b/python/cugraph/tests/dask/test_mg_bfs.py @@ -21,13 +21,13 @@ from dask_cuda import LocalCUDACluster -def test_dask_pagerank(): +def test_dask_bfs(): gc.collect() cluster = LocalCUDACluster() client = Client(cluster) Comms.initialize() - input_data_path = r"../datasets/karate.csv" + input_data_path = r"../datasets/netscience.csv" chunksize = 
dcg.get_chunksize(input_data_path) ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, diff --git a/python/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/tests/dask/test_mg_pagerank.py index 42a721c55e9..c0d3ccf35e6 100644 --- a/python/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/tests/dask/test_mg_pagerank.py @@ -67,7 +67,7 @@ def client_connection(): def test_dask_pagerank(client_connection, personalization_perc): gc.collect() - input_data_path = r"../datasets/karate.csv" + input_data_path = r"../datasets/netscience.csv" chunksize = dcg.get_chunksize(input_data_path) ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, From b85ccaa6ada33d9bcfba3f4baf34f238946a89e4 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 29 Jul 2020 16:40:33 -0400 Subject: [PATCH 10/11] MG BFS Dask PR --- cpp/include/graph.hpp | 2 +- cpp/src/traversal/mg/bfs.cuh | 2 +- python/cugraph/dask/__init__.py | 1 + python/cugraph/dask/mg_bfs/__init__.py | 0 python/cugraph/dask/mg_bfs/bfs.py | 112 ++++++++++++++++++ python/cugraph/mg/traversal/__init__.py | 17 +++ python/cugraph/mg/traversal/mg_bfs.pxd | 30 +++++ .../cugraph/mg/traversal/mg_bfs_wrapper.pyx | 92 ++++++++++++++ python/cugraph/tests/dask/test_mg_bfs.py | 66 +++++++++++ python/cugraph/tests/dask/test_mg_pagerank.py | 1 - 10 files changed, 320 insertions(+), 3 deletions(-) create mode 100644 python/cugraph/dask/mg_bfs/__init__.py create mode 100644 python/cugraph/dask/mg_bfs/bfs.py create mode 100644 python/cugraph/mg/traversal/__init__.py create mode 100644 python/cugraph/mg/traversal/mg_bfs.pxd create mode 100644 python/cugraph/mg/traversal/mg_bfs_wrapper.pyx create mode 100644 python/cugraph/tests/dask/test_mg_bfs.py diff --git a/cpp/include/graph.hpp b/cpp/include/graph.hpp index 05aeb83bf4e..2e91bb90627 100644 --- a/cpp/include/graph.hpp +++ b/cpp/include/graph.hpp @@ -53,8 +53,8 @@ enum class DegreeDirection { template class GraphViewBase { public: - WT *edge_data; ///< edge 
weight raft::handle_t *handle; + WT *edge_data; ///< edge weight GraphProperties prop; VT number_of_vertices; diff --git a/cpp/src/traversal/mg/bfs.cuh b/cpp/src/traversal/mg/bfs.cuh index f3045797253..454c892099e 100644 --- a/cpp/src/traversal/mg/bfs.cuh +++ b/cpp/src/traversal/mg/bfs.cuh @@ -66,7 +66,7 @@ void bfs(raft::handle_t const &handle, } // BFS communications wrapper - BFSCommunicatorIterativeBCastReduce bfs_comm(handle, word_count); + BFSCommunicatorBCastReduce bfs_comm(handle, word_count); // 0. 'Insert' starting vertex in the input frontier input_frontier[start_vertex / BitsPWrd] = static_cast(1) diff --git a/python/cugraph/dask/__init__.py b/python/cugraph/dask/__init__.py index 9d7aebf42fb..b2644814870 100644 --- a/python/cugraph/dask/__init__.py +++ b/python/cugraph/dask/__init__.py @@ -12,4 +12,5 @@ # limitations under the License. from .mg_pagerank.pagerank import pagerank +from .mg_bfs.bfs import bfs from .common.read_utils import get_chunksize diff --git a/python/cugraph/dask/mg_bfs/__init__.py b/python/cugraph/dask/mg_bfs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph/dask/mg_bfs/bfs.py b/python/cugraph/dask/mg_bfs/bfs.py new file mode 100644 index 00000000000..5f6576db7b1 --- /dev/null +++ b/python/cugraph/dask/mg_bfs/bfs.py @@ -0,0 +1,112 @@ +# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from dask.distributed import wait, default_client +from cugraph.dask.common.input_utils import get_local_data +from cugraph.mg.traversal import mg_bfs_wrapper as mg_bfs +import cugraph.comms.comms as Comms +import cudf + + +def call_bfs(sID, data, local_data, start, return_distances): + wid = Comms.get_worker_id(sID) + handle = Comms.get_handle(sID) + return mg_bfs.mg_bfs(data[0], + local_data, + wid, + handle, + start, + return_distances) + + +def bfs(graph, + start, + return_distances=False): + + """ + Find the distances and predecessors for a breadth first traversal of a + graph. + The input graph must contain edge list as dask-cudf dataframe with + one partition per GPU. + + Parameters + ---------- + graph : cugraph.DiGraph + cuGraph graph descriptor, should contain the connectivity information + as dask cudf edge list dataframe(edge weights are not used for this + algorithm). Undirected Graph not currently supported. + start : Integer + Specify starting vertex for breadth-first search; this function + iterates over edges in the component reachable from this node. 
+ + return_distances : bool, optional, default=False + Indicates if distances should be returned + + Returns + ------- + df : cudf.DataFrame + df['vertex'][i] gives the vertex id of the i'th vertex + + df['distance'][i] gives the path distance for the i'th vertex from the + starting vertex (Only if return_distances is True) + + df['predecessor'][i] gives for the i'th vertex the vertex it was + reached from in the traversal + + Examples + -------- + >>> import cugraph.dask as dcg + >>> chunksize = dcg.get_chunksize(input_data_path) + >>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + >>> dg = cugraph.DiGraph() + >>> dg.from_dask_cudf_edgelist(ddf) + >>> df = dcg.bfs(dg, 0) + """ + + client = default_client() + + if(graph.local_data is not None and + graph.local_data['by'] == 'src'): + data = graph.local_data['data'] + else: + data = get_local_data(graph, by='src') + + if graph.renumbered: + start = graph.lookup_internal_vertex_id(cudf.Series([start])).compute() + start = start.iloc[0] + + result = dict([(data.worker_info[wf[0]]["rank"], + client.submit( + call_bfs, + Comms.get_session_id(), + wf[1], + data.local_data, + start, + return_distances, + workers=[wf[0]])) + for idx, wf in enumerate(data.worker_to_parts.items())]) + wait(result) + + df = result[0].result() + + if graph.renumbered: + df = graph.unrenumber(df, 'vertex').compute() + df = graph.unrenumber(df, 'predecessor').compute() + df["predecessor"].fillna(-1, inplace=True) + + return df diff --git a/python/cugraph/mg/traversal/__init__.py b/python/cugraph/mg/traversal/__init__.py new file mode 100644 index 00000000000..62aed127359 --- /dev/null +++ b/python/cugraph/mg/traversal/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.mg.traversal.mg_bfs_wrapper import mg_bfs diff --git a/python/cugraph/mg/traversal/mg_bfs.pxd b/python/cugraph/mg/traversal/mg_bfs.pxd new file mode 100644 index 00000000000..8b9e8c1c81f --- /dev/null +++ b/python/cugraph/mg/traversal/mg_bfs.pxd @@ -0,0 +1,30 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.structure.graph_new cimport * +from libcpp cimport bool + + +cdef extern from "algorithms.hpp" namespace "cugraph": + + cdef void bfs[VT,ET,WT]( + const handle_t &handle, + const GraphCSRView[VT,ET,WT] &graph, + VT *distances, + VT *predecessors, + double *sp_counters, + const VT start_vertex, + bool directed) except + diff --git a/python/cugraph/mg/traversal/mg_bfs_wrapper.pyx b/python/cugraph/mg/traversal/mg_bfs_wrapper.pyx new file mode 100644 index 00000000000..da64b597406 --- /dev/null +++ b/python/cugraph/mg/traversal/mg_bfs_wrapper.pyx @@ -0,0 +1,92 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from cugraph.structure.utils_wrapper import * +from cugraph.mg.traversal cimport mg_bfs as c_bfs +import cudf +from cugraph.structure.graph_new cimport * +import cugraph.structure.graph_new_wrapper as graph_new_wrapper +from libc.stdint cimport uintptr_t + +def mg_bfs(input_df, local_data, rank, handle, start, return_distances=False): + """ + Call BFS + """ + + cdef size_t handle_size_t = handle.getHandle() + handle_ = handle_size_t + + # Local COO information + src = input_df['src'] + dst = input_df['dst'] + num_verts = local_data['verts'].sum() + num_edges = local_data['edges'].sum() + local_offset = local_data['offsets'][rank] + src = src - local_offset + num_local_verts = local_data['verts'][rank] + num_local_edges = len(src) + + # Convert to local CSR + [src, dst] = graph_new_wrapper.datatype_cast([src, dst], [np.int32]) + _offsets, indices, weights = coo2csr(src, dst, None) + offsets = _offsets[:num_local_verts + 1] + del _offsets + + # Pointers required for CSR Graph + cdef uintptr_t c_offsets_ptr = offsets.__cuda_array_interface__['data'][0] + cdef uintptr_t c_indices_ptr = indices.__cuda_array_interface__['data'][0] + + # Generate the cudf.DataFrame result + df = cudf.DataFrame() + df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) + df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) + if (return_distances): + df['distance'] = cudf.Series(np.zeros(num_verts, 
dtype=np.int32)) + + # Associate to cudf Series + cdef uintptr_t c_identifier_ptr = df['vertex'].__cuda_array_interface__['data'][0]; + cdef uintptr_t c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series + cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0]; + if (return_distances): + c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] + + # Extract local data + cdef uintptr_t c_local_verts = local_data['verts'].__array_interface__['data'][0] + cdef uintptr_t c_local_edges = local_data['edges'].__array_interface__['data'][0] + cdef uintptr_t c_local_offsets = local_data['offsets'].__array_interface__['data'][0] + + # BFS + cdef GraphCSRView[int,int,float] graph + graph= GraphCSRView[int, int, float]( c_offsets_ptr, + c_indices_ptr, + NULL, + num_verts, + num_local_edges) + graph.set_local_data(c_local_verts, c_local_edges, c_local_offsets) + graph.set_handle(handle_) + graph.get_vertex_identifiers(c_identifier_ptr) + + cdef bool direction = 1 + # MG BFS path assumes directed is true + c_bfs.bfs[int, int, float](handle_[0], + graph, + c_distance_ptr, + c_predecessor_ptr, + NULL, + start, + direction) + + return df diff --git a/python/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/tests/dask/test_mg_bfs.py new file mode 100644 index 00000000000..80ddf579973 --- /dev/null +++ b/python/cugraph/tests/dask/test_mg_bfs.py @@ -0,0 +1,66 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import cugraph.dask as dcg +import cugraph.comms as Comms +from dask.distributed import Client +import gc +import cugraph +import dask_cudf +import cudf +from dask_cuda import LocalCUDACluster + + +def test_dask_bfs(): + gc.collect() + cluster = LocalCUDACluster() + client = Client(cluster) + Comms.initialize() + + input_data_path = r"../datasets/netscience.csv" + chunksize = dcg.get_chunksize(input_data_path) + + ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + + df = cudf.read_csv(input_data_path, + delimiter=' ', + names=['src', 'dst', 'value'], + dtype=['int32', 'int32', 'float32']) + + g = cugraph.DiGraph() + g.from_cudf_edgelist(df, 'src', 'dst', renumber=True) + + dg = cugraph.DiGraph() + dg.from_dask_cudf_edgelist(ddf, renumber=True) + + expected_dist = cugraph.bfs(g, 0) + result_dist = dcg.bfs(dg, 0, True) + + compare_dist = expected_dist.merge( + result_dist, on="vertex", suffixes=['_local', '_dask'] + ) + + err = 0 + + for i in range(len(compare_dist)): + if (compare_dist['distance_local'].iloc[i] != + compare_dist['distance_dask'].iloc[i]): + err = err + 1 + assert err == 0 + + Comms.destroy() + client.close() + cluster.close() diff --git a/python/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/tests/dask/test_mg_pagerank.py index be6c9ab45b4..c23b2bb0262 100644 --- a/python/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/tests/dask/test_mg_pagerank.py @@ -112,5 +112,4 @@ def test_dask_pagerank(client_connection, personalization_perc): compare_pr['pagerank_dask'].iloc[i]) if diff > tol * 1.1: err = err + 1 - print("Mismatches:", err) assert err == 0 From 39425ced445230f7451f9d1d9c7cf99c7241100b Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 29 Jul 2020 16:49:01 -0400 Subject: [PATCH 11/11] CHANGELOG fix --- 
CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e05126bf2fa..04133eb4eb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - PR #990 MG Consolidation - PR #993 Add persistent Handle for Comms - PR #979 Add hypergraph implementation to convert DataFrames into Graphs +- PR #1010 MG BFS (dask) - PR #1018 MG personalized pagerank ## Improvements