Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] MG BFS Dask PR #1010

Merged
merged 16 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- PR #990 MG Consolidation
- PR #993 Add persistent Handle for Comms
- PR #979 Add hypergraph implementation to convert DataFrames into Graphs
- PR #1010 MG BFS (dask)
- PR #1018 MG personalized pagerank

## Improvements
Expand Down Expand Up @@ -57,6 +58,7 @@
- PR #992 Fix unrenumber of predecessor
- PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index
- PR #1012 Fix Local build script README
- PR #1017 Fix more mg bugs
- PR #1022 Fix support for using a cudf.DataFrame with a MG graph

# cuGraph 0.14.0 (03 Jun 2020)
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ enum class DegreeDirection {
template <typename VT, typename ET, typename WT>
class GraphViewBase {
public:
WT *edge_data; ///< edge weight
raft::handle_t *handle;
WT *edge_data; ///< edge weight
GraphProperties prop;

VT number_of_vertices;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/traversal/mg/bfs.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void bfs(raft::handle_t const &handle,
}

// BFS communications wrapper
BFSCommunicatorIterativeBCastReduce<VT, ET, WT> bfs_comm(handle, word_count);
BFSCommunicatorBCastReduce<VT, ET, WT> bfs_comm(handle, word_count);

// 0. 'Insert' starting vertex in the input frontier
input_frontier[start_vertex / BitsPWrd<unsigned>] = static_cast<unsigned>(1)
Expand Down
28 changes: 24 additions & 4 deletions python/cugraph/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,33 @@

# Intialize Comms. If explicit Comms not provided as arg,
# default Comms are initialized as per client information.
def initialize(arg=None):
def initialize(comms=None, p2p=False):
"""
Intitializes a communicator for multi-node multi-gpu communications.
It is expected to be called right after client initialization for running
mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX
comms handles across the workers of a Dask cluster.
It is recommended to also call `destroy()` when the comms are no longer
needed so the underlying resources can be cleaned up.

Parameters
----------
comms : raft Comms
A pre-initialized raft communicator. If provided, this is used for mnmg
communications.
p2p : bool
Initialize UCX endpoints
"""

global __instance
if __instance is None:
global __default_handle
__default_handle = None
if arg is None:
__instance = raftComms()
if comms is None:
__instance = raftComms(comms_p2p=p2p)
__instance.init()
else:
__instance = arg
__instance = comms
else:
raise Exception("Communicator is already initialized")

Expand Down Expand Up @@ -47,6 +64,9 @@ def get_session_id():

# Destroy Comms
def destroy():
"""
Shuts down initialized comms and cleans up resources.
"""
global __instance
if is_initialized():
__instance.destroy()
Expand Down
1 change: 1 addition & 0 deletions python/cugraph/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
# limitations under the License.

from .mg_pagerank.pagerank import pagerank
from .mg_bfs.bfs import bfs
from .common.read_utils import get_chunksize
18 changes: 9 additions & 9 deletions python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by):
for rank in range(len(_local_data_dict)):
data = _local_data_dict[rank]
local_data_dict['edges'].append(data[0])
local_data_dict['offsets'].append(data[1])
local_data_dict['verts'].append(data[2])
if rank == 0:
local_offset = 0
else:
prev_data = _local_data_dict[rank-1]
local_offset = prev_data[1] + 1
local_data_dict['offsets'].append(local_offset)
local_data_dict['verts'].append(data[1] - local_offset + 1)

import numpy as np
local_data_dict['edges'] = np.array(local_data_dict['edges'],
Expand Down Expand Up @@ -191,13 +196,8 @@ def get_obj(x): return x[0] if multiple else x
def _get_local_data(df, by):
df = df[0]
num_local_edges = len(df)
if num_local_edges == 0:
local_offset = 0
num_local_verts = 0
else:
local_offset = df[by].min()
num_local_verts = df[by].max() - local_offset + 1
return num_local_edges, local_offset, num_local_verts
local_max = df[by].iloc[-1]
return num_local_edges, local_max


def get_local_data(input_graph, by, load_balance=True):
Expand Down
Empty file.
112 changes: 112 additions & 0 deletions python/cugraph/dask/mg_bfs/bfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import get_local_data
from cugraph.mg.traversal import mg_bfs_wrapper as mg_bfs
import cugraph.comms.comms as Comms
import cudf


def call_bfs(sID, data, local_data, start, return_distances):
wid = Comms.get_worker_id(sID)
handle = Comms.get_handle(sID)
return mg_bfs.mg_bfs(data[0],
local_data,
wid,
handle,
start,
return_distances)


def bfs(graph,
start,
return_distances=False):

"""
Find the distances and predecessors for a breadth first traversal of a
graph.
The input graph must contain edge list as dask-cudf dataframe with
one partition per GPU.
Parameters
----------
graph : cugraph.DiGraph
cuGraph graph descriptor, should contain the connectivity information
as dask cudf edge list dataframe(edge weights are not used for this
algorithm). Undirected Graph not currently supported.
start : Integer
Specify starting vertex for breadth-first search; this function
iterates over edges in the component reachable from this node.
return_distances : bool, optional, default=False
Indicates if distances should be returned
Returns
-------
df : cudf.DataFrame
df['vertex'][i] gives the vertex id of the i'th vertex
df['distance'][i] gives the path distance for the i'th vertex from the
kaatish marked this conversation as resolved.
Show resolved Hide resolved
starting vertex (Only if return_distances is True)
df['predecessor'][i] gives for the i'th vertex the vertex it was
reached from in the traversal
Examples
--------
>>> import cugraph.dask as dcg
>>> chunksize = dcg.get_chunksize(input_data_path)
>>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize,
delimiter=' ',
names=['src', 'dst', 'value'],
dtype=['int32', 'int32', 'float32'])
>>> dg = cugraph.DiGraph()
>>> dg.from_dask_cudf_edgelist(ddf)
>>> df = dcg.bfs(dg, 0)
"""

client = default_client()

if(graph.local_data is not None and
graph.local_data['by'] == 'src'):
data = graph.local_data['data']
else:
data = get_local_data(graph, by='src')

if graph.renumbered:
start = graph.lookup_internal_vertex_id(cudf.Series([start])).compute()
start = start.iloc[0]

result = dict([(data.worker_info[wf[0]]["rank"],
client.submit(
call_bfs,
Comms.get_session_id(),
wf[1],
data.local_data,
start,
return_distances,
workers=[wf[0]]))
for idx, wf in enumerate(data.worker_to_parts.items())])
wait(result)

df = result[0].result()

if graph.renumbered:
df = graph.unrenumber(df, 'vertex').compute()
df = graph.unrenumber(df, 'predecessor').compute()
df["predecessor"].fillna(-1, inplace=True)

return df
5 changes: 3 additions & 2 deletions python/cugraph/dask/mg_pagerank/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def pagerank(input_graph,
names=['src', 'dst', 'value'],
dtype=['int32', 'int32', 'float32'])
>>> dg = cugraph.DiGraph()
>>> dg.from_dask_cudf_edgelist(ddf)
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')
>>> pr = dcg.pagerank(dg)
"""
from cugraph.structure.graph import null_check
Expand All @@ -121,7 +122,7 @@ def pagerank(input_graph,
if input_graph.renumbered is True:
personalization = input_graph.add_internal_vertex_id(
personalization, "vertex", "vertex"
)
).compute()

result = dict([(data.worker_info[wf[0]]["rank"],
client.submit(
Expand Down
1 change: 0 additions & 1 deletion python/cugraph/dask/pagerank/__init__.py

This file was deleted.

45 changes: 0 additions & 45 deletions python/cugraph/dask/pagerank/pagerank.py

This file was deleted.

17 changes: 17 additions & 0 deletions python/cugraph/mg/traversal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from cugraph.mg.traversal.mg_bfs_wrapper import mg_bfs
30 changes: 30 additions & 0 deletions python/cugraph/mg/traversal/mg_bfs.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from cugraph.structure.graph_new cimport *
from libcpp cimport bool


cdef extern from "algorithms.hpp" namespace "cugraph":

cdef void bfs[VT,ET,WT](
const handle_t &handle,
const GraphCSRView[VT,ET,WT] &graph,
VT *distances,
VT *predecessors,
double *sp_counters,
const VT start_vertex,
bool directed) except +
Loading