diff --git a/CHANGELOG.md b/CHANGELOG.md index 9568be67594..b175568bf60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - PR #1163 Integrated 2D shuffling and Louvain updates - PR #1178 Refactored cython graph factory code to scale to additional data types - PR #1175 Integrated 2D pagerank python/cython infra +- PR #1177 Integrated 2D bfs and sssp python/cython infra ## Improvements - PR 1081 MNMG Renumbering - sort partitions by degree @@ -49,6 +50,7 @@ - PR #1196 Move subcomms init outside of individual algorithm functions - PR #1198 Remove deprecated call to from_gpu_matrix + # cuGraph 0.15.0 (26 Aug 2020) ## New Features diff --git a/cpp/include/algorithms.hpp b/cpp/include/algorithms.hpp index f4b9868040b..3b1bdde5472 100644 --- a/cpp/include/algorithms.hpp +++ b/cpp/include/algorithms.hpp @@ -965,7 +965,7 @@ namespace experimental { * @param do_expensive_check A flag to run expensive checks for input arguments (if set to `true`). */ template -void bfs(raft::handle_t &handle, +void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, vertex_t *distances, vertex_t *predecessors, @@ -998,7 +998,7 @@ void bfs(raft::handle_t &handle, * @param do_expensive_check A flag to run expensive checks for input arguments (if set to `true`). 
*/ template -void sssp(raft::handle_t &handle, +void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, weight_t *distances, vertex_t *predecessors, diff --git a/cpp/include/utilities/cython.hpp b/cpp/include/utilities/cython.hpp index 36e0369c1c6..8dcdfaf31cf 100644 --- a/cpp/include/utilities/cython.hpp +++ b/cpp/include/utilities/cython.hpp @@ -213,6 +213,26 @@ void call_pagerank(raft::handle_t const& handle, int64_t max_iter, bool has_guess); +// Wrapper for calling BFS through a graph container +template +void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + vertex_t* identifiers, + vertex_t* distances, + vertex_t* predecessors, + double* sp_counters, + const vertex_t start_vertex, + bool directed); + +// Wrapper for calling SSSP through a graph container +template +void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + vertex_t* identifiers, + weight_t* distances, + vertex_t* predecessors, + const vertex_t source_vertex); + // Helper for setting up subcommunicators, typically called as part of the // user-initiated comms initialization in Python. 
// diff --git a/cpp/src/experimental/bfs.cu b/cpp/src/experimental/bfs.cu index d9d7cb1a245..940ff30de07 100644 --- a/cpp/src/experimental/bfs.cu +++ b/cpp/src/experimental/bfs.cu @@ -41,7 +41,7 @@ namespace experimental { namespace detail { template -void bfs(raft::handle_t &handle, +void bfs(raft::handle_t const &handle, GraphViewType const &push_graph_view, typename GraphViewType::vertex_type *distances, PredecessorIterator predecessor_first, @@ -164,7 +164,7 @@ void bfs(raft::handle_t &handle, } // namespace detail template -void bfs(raft::handle_t &handle, +void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, vertex_t *distances, vertex_t *predecessors, @@ -196,7 +196,7 @@ void bfs(raft::handle_t &handle, // explicit instantiation -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -205,7 +205,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -214,7 +214,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -223,7 +223,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -232,7 +232,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int64_t 
*distances, int64_t *predecessors, @@ -241,7 +241,7 @@ template void bfs(raft::handle_t &handle, int64_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int64_t *distances, int64_t *predecessors, @@ -250,7 +250,7 @@ template void bfs(raft::handle_t &handle, int64_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -259,7 +259,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -268,7 +268,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -277,7 +277,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int32_t *distances, int32_t *predecessors, @@ -286,7 +286,7 @@ template void bfs(raft::handle_t &handle, int32_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int64_t *distances, int64_t *predecessors, @@ -295,7 +295,7 @@ template void bfs(raft::handle_t &handle, int64_t depth_limit, bool do_expensive_check); -template void bfs(raft::handle_t &handle, +template void bfs(raft::handle_t const &handle, graph_view_t const &graph_view, int64_t *distances, int64_t *predecessors, diff --git 
a/cpp/src/experimental/sssp.cu b/cpp/src/experimental/sssp.cu index e0679ad0d56..b1bc2968c71 100644 --- a/cpp/src/experimental/sssp.cu +++ b/cpp/src/experimental/sssp.cu @@ -42,7 +42,7 @@ namespace experimental { namespace detail { template -void sssp(raft::handle_t &handle, +void sssp(raft::handle_t const &handle, GraphViewType const &push_graph_view, typename GraphViewType::weight_type *distances, PredecessorIterator predecessor_first, @@ -241,7 +241,7 @@ void sssp(raft::handle_t &handle, } // namespace detail template -void sssp(raft::handle_t &handle, +void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, weight_t *distances, vertex_t *predecessors, @@ -265,7 +265,7 @@ void sssp(raft::handle_t &handle, // explicit instantiation -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int32_t *predecessors, @@ -273,7 +273,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int32_t *predecessors, @@ -281,7 +281,7 @@ template void sssp(raft::handle_t &handle, double cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int32_t *predecessors, @@ -289,7 +289,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int32_t *predecessors, @@ -297,7 +297,7 @@ template void sssp(raft::handle_t &handle, double cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int64_t 
*predecessors, @@ -305,7 +305,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int64_t *predecessors, @@ -313,7 +313,7 @@ template void sssp(raft::handle_t &handle, double cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int32_t *predecessors, @@ -321,7 +321,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int32_t *predecessors, @@ -329,7 +329,7 @@ template void sssp(raft::handle_t &handle, double cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int32_t *predecessors, @@ -337,7 +337,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int32_t *predecessors, @@ -345,7 +345,7 @@ template void sssp(raft::handle_t &handle, double cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, float *distances, int64_t *predecessors, @@ -353,7 +353,7 @@ template void sssp(raft::handle_t &handle, float cutoff, bool do_expensive_check); -template void sssp(raft::handle_t &handle, +template void sssp(raft::handle_t const &handle, graph_view_t const &graph_view, double *distances, int64_t *predecessors, diff --git a/cpp/src/utilities/cython.cu b/cpp/src/utilities/cython.cu index 
78b59fbead8..9705f229548 100644 --- a/cpp/src/utilities/cython.cu +++ b/cpp/src/utilities/cython.cu @@ -543,6 +543,108 @@ void call_pagerank(raft::handle_t const& handle, } } +// Wrapper for calling BFS through a graph container +template +void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + vertex_t* identifiers, + vertex_t* distances, + vertex_t* predecessors, + double* sp_counters, + const vertex_t start_vertex, + bool directed) +{ + if (graph_container.graph_type == graphTypeEnum::GraphCSRViewFloat) { + graph_container.graph_ptr_union.GraphCSRViewFloatPtr->get_vertex_identifiers( + reinterpret_cast(identifiers)); + bfs(handle, + *(graph_container.graph_ptr_union.GraphCSRViewFloatPtr), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + sp_counters, + static_cast(start_vertex), + directed); + } else if (graph_container.graph_type == graphTypeEnum::GraphCSRViewDouble) { + graph_container.graph_ptr_union.GraphCSRViewDoublePtr->get_vertex_identifiers( + reinterpret_cast(identifiers)); + bfs(handle, + *(graph_container.graph_ptr_union.GraphCSRViewDoublePtr), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + sp_counters, + static_cast(start_vertex), + directed); + } else if (graph_container.graph_type == graphTypeEnum::graph_t) { + if (graph_container.edgeType == numberTypeEnum::int32Type) { + auto graph = + detail::create_graph(handle, graph_container); + cugraph::experimental::bfs(handle, + graph->view(), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(start_vertex)); + } else if (graph_container.edgeType == numberTypeEnum::int64Type) { + auto graph = + detail::create_graph(handle, graph_container); + cugraph::experimental::bfs(handle, + graph->view(), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(start_vertex)); + } else { + CUGRAPH_FAIL("vertexType/edgeType combination unsupported"); + } + } +} + +// Wrapper for calling SSSP through 
a graph container +template +void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + vertex_t* identifiers, + weight_t* distances, + vertex_t* predecessors, + const vertex_t source_vertex) +{ + if (graph_container.graph_type == graphTypeEnum::GraphCSRViewFloat) { + graph_container.graph_ptr_union.GraphCSRViewFloatPtr->get_vertex_identifiers( + reinterpret_cast(identifiers)); + sssp( // handle, TODO: clarify: no raft_handle_t? why? + *(graph_container.graph_ptr_union.GraphCSRViewFloatPtr), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(source_vertex)); + } else if (graph_container.graph_type == graphTypeEnum::GraphCSRViewDouble) { + graph_container.graph_ptr_union.GraphCSRViewDoublePtr->get_vertex_identifiers( + reinterpret_cast(identifiers)); + sssp( // handle, TODO: clarify: no raft_handle_t? why? + *(graph_container.graph_ptr_union.GraphCSRViewDoublePtr), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(source_vertex)); + } else if (graph_container.graph_type == graphTypeEnum::graph_t) { + if (graph_container.edgeType == numberTypeEnum::int32Type) { + auto graph = + detail::create_graph(handle, graph_container); + cugraph::experimental::sssp(handle, + graph->view(), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(source_vertex)); + } else if (graph_container.edgeType == numberTypeEnum::int64Type) { + auto graph = + detail::create_graph(handle, graph_container); + cugraph::experimental::sssp(handle, + graph->view(), + reinterpret_cast(distances), + reinterpret_cast(predecessors), + static_cast(source_vertex)); + } else { + CUGRAPH_FAIL("vertexType/edgeType combination unsupported"); + } + } +} + // Explicit instantiations template std::pair call_louvain(raft::handle_t const& handle, graph_container_t const& graph_container, @@ -606,6 +708,70 @@ template void call_pagerank(raft::handle_t const& handle, int64_t max_iter, bool has_guess); 
+template void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + int32_t* identifiers, + int32_t* distances, + int32_t* predecessors, + double* sp_counters, + const int32_t start_vertex, + bool directed); + +template void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + int32_t* identifiers, + int32_t* distances, + int32_t* predecessors, + double* sp_counters, + const int32_t start_vertex, + bool directed); + +template void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + int64_t* identifiers, + int64_t* distances, + int64_t* predecessors, + double* sp_counters, + const int64_t start_vertex, + bool directed); + +template void call_bfs(raft::handle_t const& handle, + graph_container_t const& graph_container, + int64_t* identifiers, + int64_t* distances, + int64_t* predecessors, + double* sp_counters, + const int64_t start_vertex, + bool directed); + +template void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + int32_t* identifiers, + float* distances, + int32_t* predecessors, + const int32_t source_vertex); + +template void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + int32_t* identifiers, + double* distances, + int32_t* predecessors, + const int32_t source_vertex); + +template void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + int64_t* identifiers, + float* distances, + int64_t* predecessors, + const int64_t source_vertex); + +template void call_sssp(raft::handle_t const& handle, + graph_container_t const& graph_container, + int64_t* identifiers, + double* distances, + int64_t* predecessors, + const int64_t source_vertex); + // Helper for setting up subcommunicators void init_subcomms(raft::handle_t& handle, size_t row_comm_size) { diff --git a/python/cugraph/dask/__init__.py b/python/cugraph/dask/__init__.py index e62a8bfcdb4..a79bee7c026 
100644 --- a/python/cugraph/dask/__init__.py +++ b/python/cugraph/dask/__init__.py @@ -13,5 +13,6 @@ from .link_analysis.pagerank import pagerank from .traversal.bfs import bfs +from .traversal.sssp import sssp from .common.read_utils import get_chunksize from .community.louvain import louvain diff --git a/python/cugraph/dask/community/louvain_wrapper.pyx b/python/cugraph/dask/community/louvain_wrapper.pyx index 59ec0f67733..a1a1e629732 100644 --- a/python/cugraph/dask/community/louvain_wrapper.pyx +++ b/python/cugraph/dask/community/louvain_wrapper.pyx @@ -78,7 +78,8 @@ def louvain(input_df, # data is on device, move to host (.values_host) since graph_t in # graph_container needs a host array - cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets.values_host.__array_interface__['data'][0] + vertex_partition_offsets_host = vertex_partition_offsets.values_host + cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0] cdef graph_container_t graph_container diff --git a/python/cugraph/dask/link_analysis/mg_pagerank.pxd b/python/cugraph/dask/link_analysis/mg_pagerank.pxd index 351b3d20d50..91104d9127c 100644 --- a/python/cugraph/dask/link_analysis/mg_pagerank.pxd +++ b/python/cugraph/dask/link_analysis/mg_pagerank.pxd @@ -31,4 +31,4 @@ cdef extern from "utilities/cython.hpp" namespace "cugraph::cython": double alpha, double tolerance, long long max_iter, - bool has_guess) except + + bool has_guess) except + \ No newline at end of file diff --git a/python/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx b/python/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx index 8fecbb9ab87..d459b93e7c4 100644 --- a/python/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx +++ b/python/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx @@ -67,7 +67,7 @@ def mg_pagerank(input_df, cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0] cdef uintptr_t c_dst_vertices = 
dst.__cuda_array_interface__['data'][0] cdef uintptr_t c_edge_weights = NULL - + # FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C vertex_partition_offsets_host = vertex_partition_offsets.values_host cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0] diff --git a/python/cugraph/dask/link_analysis/pagerank.py b/python/cugraph/dask/link_analysis/pagerank.py index 143bb37dd22..0ea09969350 100644 --- a/python/cugraph/dask/link_analysis/pagerank.py +++ b/python/cugraph/dask/link_analysis/pagerank.py @@ -73,7 +73,7 @@ def pagerank(input_graph, Alpha should be greater than 0.0 and strictly lower than 1.0. personalization : cudf.Dataframe GPU Dataframe containing the personalization information. - + Currently not supported. personalization['vertex'] : cudf.Series Subset of vertices of graph for personalization personalization['values'] : cudf.Series @@ -99,13 +99,13 @@ def pagerank(input_graph, Returns ------- - PageRank : cudf.DataFrame - GPU data frame containing two cudf.Series of size V: the vertex - identifiers and the corresponding PageRank values. + PageRank : dask_cudf.DataFrame + GPU data frame containing two dask_cudf.Series of size V: the + vertex identifiers and the corresponding PageRank values. 
- df['vertex'] : cudf.Series + ddf['vertex'] : cudf.Series Contains the vertex identifiers - df['pagerank'] : cudf.Series + ddf['pagerank'] : cudf.Series Contains the PageRank score Examples diff --git a/python/cugraph/dask/traversal/bfs.py b/python/cugraph/dask/traversal/bfs.py index 8baf15e079b..88eba53de55 100644 --- a/python/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/dask/traversal/bfs.py @@ -14,29 +14,36 @@ # from dask.distributed import wait, default_client -from cugraph.dask.common.input_utils import get_local_data +from cugraph.dask.common.input_utils import get_distributed_data +from cugraph.structure.shuffle import shuffle from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs import cugraph.comms.comms as Comms import cudf +import dask_cudf -def call_bfs(sID, data, local_data, start, num_verts, return_distances): +def call_bfs(sID, + data, + num_verts, + num_edges, + vertex_partition_offsets, + start, + return_distances): wid = Comms.get_worker_id(sID) handle = Comms.get_handle(sID) return mg_bfs.mg_bfs(data[0], - local_data, + num_verts, + num_edges, + vertex_partition_offsets, wid, handle, start, - num_verts, return_distances) def bfs(graph, start, - return_distances=False, - load_balance=True): - + return_distances=False): """ Find the distances and predecessors for a breadth first traversal of a graph. @@ -54,10 +61,6 @@ def bfs(graph, iterates over edges in the component reachable from this node. return_distances : bool, optional, default=False Indicates if distances should be returned - load_balance : bool, optional, default=True - Set as True to perform load_balancing after global sorting of - dask-cudf DataFrame. This ensures that the data is uniformly - distributed among multiple GPUs to avoid over-loading. 
Returns ------- @@ -87,35 +90,36 @@ def bfs(graph, client = default_client() - if(graph.local_data is not None and - graph.local_data['by'] == 'src'): - data = graph.local_data['data'] - else: - data = get_local_data(graph, by='src', load_balance=load_balance) + graph.compute_renumber_edge_list(transposed=False) + (ddf, + num_verts, + partition_row_size, + partition_col_size, + vertex_partition_offsets) = shuffle(graph, transposed=False) + num_edges = len(ddf) + data = get_distributed_data(ddf) if graph.renumbered: start = graph.lookup_internal_vertex_id(cudf.Series([start], dtype='int32')).compute() start = start.iloc[0] - result = dict([(data.worker_info[wf[0]]["rank"], - client.submit( - call_bfs, - Comms.get_session_id(), - wf[1], - data.local_data, - start, - data.max_vertex_id+1, - return_distances, - workers=[wf[0]])) - for idx, wf in enumerate(data.worker_to_parts.items())]) + result = [client.submit( + call_bfs, + Comms.get_session_id(), + wf[1], + num_verts, + num_edges, + vertex_partition_offsets, + start, + return_distances, + workers=[wf[0]]) + for idx, wf in enumerate(data.worker_to_parts.items())] wait(result) - - df = result[0].result() + ddf = dask_cudf.from_delayed(result) if graph.renumbered: - df = graph.unrenumber(df, 'vertex').compute() - df = graph.unrenumber(df, 'predecessor').compute() - df["predecessor"].fillna(-1, inplace=True) - - return df + ddf = graph.unrenumber(ddf, 'vertex') + ddf = graph.unrenumber(ddf, 'predecessor') + ddf["predecessor"] = ddf["predecessor"].fillna(-1) + return ddf diff --git a/python/cugraph/dask/traversal/mg_bfs.pxd b/python/cugraph/dask/traversal/mg_bfs.pxd index 68010e2b816..82c6e97d668 100644 --- a/python/cugraph/dask/traversal/mg_bfs.pxd +++ b/python/cugraph/dask/traversal/mg_bfs.pxd @@ -18,13 +18,14 @@ from cugraph.structure.graph_primtypes cimport * from libcpp cimport bool -cdef extern from "algorithms.hpp" namespace "cugraph": +cdef extern from "utilities/cython.hpp" namespace "cugraph::cython": - cdef 
void bfs[VT,ET,WT]( + cdef void call_bfs[vertex_t, weight_t]( const handle_t &handle, - const GraphCSRView[VT,ET,WT] &graph, - VT *distances, - VT *predecessors, + const graph_container_t &g, + vertex_t *identifiers, + vertex_t *distances, + vertex_t *predecessors, double *sp_counters, - const VT start_vertex, + const vertex_t start_vertex, bool directed) except + diff --git a/python/cugraph/dask/traversal/mg_bfs_wrapper.pyx b/python/cugraph/dask/traversal/mg_bfs_wrapper.pyx index 4c13aeb1286..c92f28eb407 100644 --- a/python/cugraph/dask/traversal/mg_bfs_wrapper.pyx +++ b/python/cugraph/dask/traversal/mg_bfs_wrapper.pyx @@ -21,7 +21,14 @@ from cugraph.structure.graph_primtypes cimport * import cugraph.structure.graph_primtypes_wrapper as graph_primtypes_wrapper from libc.stdint cimport uintptr_t -def mg_bfs(input_df, local_data, rank, handle, start, result_len, return_distances=False): +def mg_bfs(input_df, + num_global_verts, + num_global_edges, + vertex_partition_offsets, + rank, + handle, + start, + return_distances=False): """ Call pagerank """ @@ -32,59 +39,70 @@ def mg_bfs(input_df, local_data, rank, handle, start, result_len, return_distanc # Local COO information src = input_df['src'] dst = input_df['dst'] - num_verts = local_data['verts'].sum() - num_edges = local_data['edges'].sum() - local_offset = local_data['offsets'][rank] - src = src - local_offset - num_local_verts = local_data['verts'][rank] - num_local_edges = len(src) + vertex_t = src.dtype + if num_global_edges > (2**31 - 1): + edge_t = np.dtype("int64") + else: + edge_t = np.dtype("int32") + if "value" in input_df.columns: + weights = input_df['value'] + weight_t = weights.dtype + else: + weight_t = np.dtype("float32") - # Convert to local CSR - [src, dst] = graph_primtypes_wrapper.datatype_cast([src, dst], [np.int32]) - _offsets, indices, weights = coo2csr(src, dst, None) - offsets = _offsets[:num_local_verts + 1] - del _offsets + # FIXME: Offsets and indices are currently hardcoded to int, 
but this may + # not be acceptable in the future. + numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, + np.dtype("int64") : numberTypeEnum.int64Type, + np.dtype("float32") : numberTypeEnum.floatType, + np.dtype("double") : numberTypeEnum.doubleType} - # Pointers required for CSR Graph - cdef uintptr_t c_offsets_ptr = offsets.__cuda_array_interface__['data'][0] - cdef uintptr_t c_indices_ptr = indices.__cuda_array_interface__['data'][0] + # FIXME: needs to be edge_t type not int + cdef int num_partition_edges = len(src) + + cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0] + cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0] + cdef uintptr_t c_edge_weights = NULL + + # FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C + vertex_partition_offsets_host = vertex_partition_offsets.values_host + cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0] + + cdef graph_container_t graph_container + + populate_graph_container(graph_container, + handle_[0], + c_src_vertices, c_dst_vertices, c_edge_weights, + c_vertex_partition_offsets, + ((numberTypeMap[vertex_t])), + ((numberTypeMap[edge_t])), + ((numberTypeMap[weight_t])), + num_partition_edges, + num_global_verts, num_global_edges, + True, + False, True) # Generate the cudf.DataFrame result df = cudf.DataFrame() - df['vertex'] = cudf.Series(range(0, result_len), dtype=np.int32) - df['predecessor'] = cudf.Series(np.zeros(result_len, dtype=np.int32)) + df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t) + df['predecessor'] = cudf.Series(np.zeros(len(df['vertex']), dtype=np.int32)) if (return_distances): - df['distance'] = cudf.Series(np.zeros(result_len, dtype=np.int32)) + df['distance'] = cudf.Series(np.zeros(len(df['vertex']), dtype=np.int32)) # Associate to cudf Series cdef uintptr_t 
c_distance_ptr = NULL # Pointer to the DataFrame 'distance' Series - cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0]; + cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0] if (return_distances): c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0] - # Extract local data - cdef uintptr_t c_local_verts = local_data['verts'].__array_interface__['data'][0] - cdef uintptr_t c_local_edges = local_data['edges'].__array_interface__['data'][0] - cdef uintptr_t c_local_offsets = local_data['offsets'].__array_interface__['data'][0] - - # BFS - cdef GraphCSRView[int,int,float] graph - graph= GraphCSRView[int, int, float]( c_offsets_ptr, - c_indices_ptr, - NULL, - num_verts, - num_local_edges) - graph.set_local_data(c_local_verts, c_local_edges, c_local_offsets) - graph.set_handle(handle_) - cdef bool direction = 1 # MG BFS path assumes directed is true - c_bfs.bfs[int, int, float](handle_[0], - graph, + c_bfs.call_bfs[int, float](handle_[0], + graph_container, + NULL, c_distance_ptr, c_predecessor_ptr, NULL, start, direction) - return df diff --git a/python/cugraph/dask/traversal/mg_sssp.pxd b/python/cugraph/dask/traversal/mg_sssp.pxd new file mode 100644 index 00000000000..f846facd269 --- /dev/null +++ b/python/cugraph/dask/traversal/mg_sssp.pxd @@ -0,0 +1,28 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +from cugraph.structure.graph_primtypes cimport * +from libcpp cimport bool + + +cdef extern from "utilities/cython.hpp" namespace "cugraph::cython": + + cdef void call_sssp[vertex_t, weight_t]( + const handle_t &handle, + const graph_container_t &g, + vertex_t *identifiers, + weight_t *distances, + vertex_t *predecessors, + const vertex_t start_vertex) diff --git a/python/cugraph/dask/traversal/mg_sssp_wrapper.pyx b/python/cugraph/dask/traversal/mg_sssp_wrapper.pyx new file mode 100644 index 00000000000..b7aec103098 --- /dev/null +++ b/python/cugraph/dask/traversal/mg_sssp_wrapper.pyx @@ -0,0 +1,115 @@ +# +# Copyright (c) 2020, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+from cugraph.structure.utils_wrapper import *
+from cugraph.dask.traversal cimport mg_sssp as c_sssp
+import cudf
+from cugraph.structure.graph_primtypes cimport *
+import cugraph.structure.graph_primtypes_wrapper as graph_primtypes_wrapper
+from libc.stdint cimport uintptr_t
+
+def mg_sssp(input_df,
+            num_global_verts,
+            num_global_edges,
+            vertex_partition_offsets,
+            rank,
+            handle,
+            start):
+    """
+    Call sssp
+    """
+
+    cdef size_t handle_size_t = <size_t>handle.getHandle()
+    handle_ = <handle_t*>handle_size_t
+
+    # Local COO information
+    src = input_df['src']
+    dst = input_df['dst']
+    vertex_t = src.dtype
+    if num_global_edges > (2**31 - 1):
+        edge_t = np.dtype("int64")
+    else:
+        edge_t = np.dtype("int32")
+    if "value" in input_df.columns:
+        weights = input_df['value']
+        weight_t = weights.dtype
+    else:
+        weights = None
+        weight_t = np.dtype("float32")
+
+    # FIXME: Offsets and indices are currently hardcoded to int, but this may
+    #        not be acceptable in the future.
+    numberTypeMap = {np.dtype("int32") : <int>numberTypeEnum.int32Type,
+                     np.dtype("int64") : <int>numberTypeEnum.int64Type,
+                     np.dtype("float32") : <int>numberTypeEnum.floatType,
+                     np.dtype("double") : <int>numberTypeEnum.doubleType}
+
+    # FIXME: needs to be edge_t type not int
+    cdef int num_partition_edges = len(src)
+
+    cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0]
+    cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0]
+    cdef uintptr_t c_edge_weights = <uintptr_t>NULL
+    if weights is not None:
+        c_edge_weights = weights.__cuda_array_interface__['data'][0]
+
+    # FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C
+    vertex_partition_offsets_host = vertex_partition_offsets.values_host
+    cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0]
+
+    cdef graph_container_t graph_container
+
+    populate_graph_container(graph_container,
+                             handle_[0],
+                             <void*>c_src_vertices, <void*>c_dst_vertices, <void*>c_edge_weights,
+                             <void*>c_vertex_partition_offsets,
+                             <numberTypeEnum>(<int>(numberTypeMap[vertex_t])),
+                             <numberTypeEnum>(<int>(numberTypeMap[edge_t])),
+                             <numberTypeEnum>(<int>(numberTypeMap[weight_t])),
+                             num_partition_edges,
+                             num_global_verts, num_global_edges,
+                             True,
+                             False, True)
+
+    # Generate the cudf.DataFrame result
+    df = cudf.DataFrame()
+    df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t)
+    df['predecessor'] = cudf.Series(np.zeros(len(df['vertex']), dtype=vertex_t))
+    df['distance'] = cudf.Series(np.zeros(len(df['vertex']), dtype=weight_t))
+
+    # Associate to cudf Series
+    cdef uintptr_t c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0]
+    cdef uintptr_t c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0]
+
+    # MG SSSP path assumes directed is true
+    if weight_t == np.float32:
+        c_sssp.call_sssp[int, float](handle_[0],
+                                     graph_container,
+                                     NULL,
+                                     <float*>c_distance_ptr,
+                                     <int*>c_predecessor_ptr,
+                                     start)
+    elif weight_t == np.float64:
+        c_sssp.call_sssp[int, double](handle_[0],
+                                      graph_container,
+                                      NULL,
+                                      <double*>c_distance_ptr,
+                                      <int*>c_predecessor_ptr,
+                                      start)
+    else: # This case should not happen
+        raise NotImplementedError
+
+    return df
diff --git a/python/cugraph/dask/traversal/sssp.py b/python/cugraph/dask/traversal/sssp.py
new file mode 100644
index 00000000000..9554e10f4d6
--- /dev/null
+++ b/python/cugraph/dask/traversal/sssp.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2019-2020, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dask.distributed import wait, default_client
+from cugraph.dask.common.input_utils import get_distributed_data
+from cugraph.structure.shuffle import shuffle
+from cugraph.dask.traversal import mg_sssp_wrapper as mg_sssp
+import cugraph.comms.comms as Comms
+import cudf
+import dask_cudf
+
+
+def call_sssp(sID,
+              data,
+              num_verts,
+              num_edges,
+              vertex_partition_offsets,
+              start):
+    wid = Comms.get_worker_id(sID)
+    handle = Comms.get_handle(sID)
+    return mg_sssp.mg_sssp(data[0],
+                           num_verts,
+                           num_edges,
+                           vertex_partition_offsets,
+                           wid,
+                           handle,
+                           start)
+
+
+def sssp(graph,
+         source):
+
+    """
+    Compute the distances and predecessors for shortest paths from the
+    specified source vertex to all vertices in the graph.
+    The input graph must contain edge list as dask-cudf dataframe with
+    one partition per GPU.
+
+    Parameters
+    ----------
+    graph : cugraph.DiGraph
+        cuGraph graph descriptor, should contain the connectivity information
+        as dask cudf edge list dataframe (edge weights are used by this
+        algorithm). Undirected Graph not currently supported.
+    source : Integer
+        Specify source vertex
+
+    Returns
+    -------
+    df : cudf.DataFrame
+        df['vertex'][i] gives the vertex id of the i'th vertex
+
+        df['distance'][i] gives the path distance for the i'th vertex from the
+        starting vertex
+
+        df['predecessor'][i] gives for the i'th vertex the vertex it was
+        reached from in the traversal
+
+    Examples
+    --------
+    >>> import cugraph.dask as dcg
+    >>> Comms.initialize(p2p=True)
+    >>> chunksize = dcg.get_chunksize(input_data_path)
+    >>> ddf = dask_cudf.read_csv(input_data_path, chunksize=chunksize,
+                                 delimiter=' ',
+                                 names=['src', 'dst', 'value'],
+                                 dtype=['int32', 'int32', 'float32'])
+    >>> dg = cugraph.DiGraph()
+    >>> dg.from_dask_cudf_edgelist(ddf)
+    >>> df = dcg.sssp(dg, 0)
+    >>> Comms.destroy()
+    """
+
+    client = default_client()
+
+    graph.compute_renumber_edge_list(transposed=False)
+    (ddf,
+     num_verts,
+     partition_row_size,
+     partition_col_size,
+     vertex_partition_offsets) = shuffle(graph, transposed=False)
+    num_edges = len(ddf)
+    data = get_distributed_data(ddf)
+
+    if graph.renumbered:
+        source = graph.lookup_internal_vertex_id(cudf.Series([source],
+                                                 dtype='int32')).compute()
+        source = source.iloc[0]
+
+    result = [client.submit(
+              call_sssp,
+              Comms.get_session_id(),
+              wf[1],
+              num_verts,
+              num_edges,
+              vertex_partition_offsets,
+              source,
+              workers=[wf[0]])
+              for idx, wf in enumerate(data.worker_to_parts.items())]
+    wait(result)
+    ddf = dask_cudf.from_delayed(result)
+
+    if graph.renumbered:
+        ddf = graph.unrenumber(ddf, 'vertex')
+        ddf = graph.unrenumber(ddf, 'predecessor')
+        ddf["predecessor"] = ddf["predecessor"].fillna(-1)
+
+    return ddf
diff --git a/python/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/tests/dask/test_mg_bfs.py
index 94bed827fd0..553bbc698ff 100644
--- a/python/cugraph/tests/dask/test_mg_bfs.py
+++ b/python/cugraph/tests/dask/test_mg_bfs.py
@@ -27,7 +27,7 @@ def client_connection():
     cluster = LocalCUDACluster()
     client = Client(cluster)
-    Comms.initialize()
+ Comms.initialize(p2p=True) yield client @@ -68,6 +68,7 @@ def test_dask_bfs(client_connection): expected_dist = cugraph.bfs(g, 0) result_dist = dcg.bfs(dg, 0, True) + result_dist = result_dist.compute() compare_dist = expected_dist.merge( result_dist, on="vertex", suffixes=["_local", "_dask"] diff --git a/python/cugraph/tests/dask/test_mg_sssp.py b/python/cugraph/tests/dask/test_mg_sssp.py new file mode 100644 index 00000000000..ac4a60f1bdc --- /dev/null +++ b/python/cugraph/tests/dask/test_mg_sssp.py @@ -0,0 +1,86 @@ +# Copyright (c) 2020, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import cugraph.dask as dcg +import cugraph.comms as Comms +from dask.distributed import Client +import gc +import pytest +import cugraph +import dask_cudf +import cudf +from dask_cuda import LocalCUDACluster +from cugraph.dask.common.mg_utils import is_single_gpu + + +@pytest.fixture +def client_connection(): + cluster = LocalCUDACluster() + client = Client(cluster) + Comms.initialize(p2p=True) + + yield client + + Comms.destroy() + client.close() + cluster.close() + + +@pytest.mark.skipif( + is_single_gpu(), reason="skipping MG testing on Single GPU system" +) +def test_dask_sssp(client_connection): + gc.collect() + + input_data_path = r"../datasets/netscience.csv" + chunksize = dcg.get_chunksize(input_data_path) + + ddf = dask_cudf.read_csv( + input_data_path, + chunksize=chunksize, + delimiter=" ", + names=["src", "dst", "value"], + dtype=["int32", "int32", "float32"], + ) + + df = cudf.read_csv( + input_data_path, + delimiter=" ", + names=["src", "dst", "value"], + dtype=["int32", "int32", "float32"], + ) + + g = cugraph.DiGraph() + g.from_cudf_edgelist(df, "src", "dst", "value", renumber=True) + + dg = cugraph.DiGraph() + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") + + expected_dist = cugraph.sssp(g, 0) + print(expected_dist) + result_dist = dcg.sssp(dg, 0) + result_dist = result_dist.compute() + + compare_dist = expected_dist.merge( + result_dist, on="vertex", suffixes=["_local", "_dask"] + ) + + err = 0 + + for i in range(len(compare_dist)): + if ( + compare_dist["distance_local"].iloc[i] + != compare_dist["distance_dask"].iloc[i] + ): + err = err + 1 + assert err == 0 diff --git a/python/cugraph/traversal/bfs.pxd b/python/cugraph/traversal/bfs.pxd index 0502754c161..5b73d23045c 100644 --- a/python/cugraph/traversal/bfs.pxd +++ b/python/cugraph/traversal/bfs.pxd @@ -20,13 +20,13 @@ from cugraph.structure.graph_primtypes cimport * from libcpp cimport bool -cdef extern from "algorithms.hpp" namespace "cugraph": - - cdef void bfs[VT,ET,WT]( +cdef 
extern from "utilities/cython.hpp" namespace "cugraph::cython": + cdef void call_bfs[vertex_t, weight_t]( const handle_t &handle, - const GraphCSRView[VT,ET,WT] &graph, - VT *distances, - VT *predecessors, + const graph_container_t &g, + vertex_t *identifiers, + vertex_t *distances, + vertex_t *predecessors, double *sp_counters, - const VT start_vertex, + const vertex_t start_vertex, bool directed) except + diff --git a/python/cugraph/traversal/bfs_wrapper.pyx b/python/cugraph/traversal/bfs_wrapper.pyx index c13e1eb58ee..ae346aea953 100644 --- a/python/cugraph/traversal/bfs_wrapper.pyx +++ b/python/cugraph/traversal/bfs_wrapper.pyx @@ -33,12 +33,22 @@ def bfs(input_graph, start, directed=True, Call bfs """ # Step 1: Declare the different varibales - cdef GraphCSRView[int, int, float] graph_float # For weighted float graph (SSSP) and Unweighted (BFS) - cdef GraphCSRView[int, int, double] graph_double # For weighted double graph (SSSP) + cdef graph_container_t graph_container + # FIXME: Offsets and indices are currently hardcoded to int, but this may + # not be acceptable in the future. 
+ numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, + np.dtype("int64") : numberTypeEnum.int64Type, + np.dtype("float32") : numberTypeEnum.floatType, + np.dtype("double") : numberTypeEnum.doubleType} # Pointers required for CSR Graph cdef uintptr_t c_offsets_ptr = NULL # Pointer to the CSR offsets cdef uintptr_t c_indices_ptr = NULL # Pointer to the CSR indices + cdef uintptr_t c_weights = NULL + cdef uintptr_t c_local_verts = NULL; + cdef uintptr_t c_local_edges = NULL; + cdef uintptr_t c_local_offsets = NULL; + weight_t = np.dtype("float32") # Pointers for SSSP / BFS cdef uintptr_t c_identifier_ptr = NULL # Pointer to the DataFrame 'vertex' Series @@ -52,6 +62,7 @@ def bfs(input_graph, start, directed=True, cdef unique_ptr[handle_t] handle_ptr handle_ptr.reset(new handle_t()) + handle_ = handle_ptr.get(); # Step 3: Extract CSR offsets, indices, weights are not expected # - offsets: int (signed, 32-bit) @@ -86,15 +97,20 @@ def bfs(input_graph, start, directed=True, # Step 8: Proceed to BFS # FIXME: [int, int, float] or may add an explicit [int, int, int] in graph.cu? 
- graph_float = GraphCSRView[int, int, float]( c_offsets_ptr, - c_indices_ptr, - NULL, - num_verts, - num_edges) - graph_float.get_vertex_identifiers( c_identifier_ptr) + populate_graph_container_legacy(graph_container, + ((graphTypeEnum.LegacyCSR)), + handle_[0], + c_offsets_ptr, c_indices_ptr, c_weights, + ((numberTypeEnum.int32Type)), + ((numberTypeEnum.int32Type)), + ((numberTypeMap[weight_t])), + num_verts, num_edges, + c_local_verts, c_local_edges, c_local_offsets) + # Different pathing wether shortest_path_counting is required or not - c_bfs.bfs[int, int, float](handle_ptr.get()[0], - graph_float, + c_bfs.call_bfs[int, float](handle_ptr.get()[0], + graph_container, + c_identifier_ptr, c_distance_ptr, c_predecessor_ptr, c_sp_counter_ptr, diff --git a/python/cugraph/traversal/sssp.pxd b/python/cugraph/traversal/sssp.pxd index 8f36ff12ae8..e4b709cb879 100644 --- a/python/cugraph/traversal/sssp.pxd +++ b/python/cugraph/traversal/sssp.pxd @@ -18,10 +18,12 @@ from cugraph.structure.graph_primtypes cimport * -cdef extern from "algorithms.hpp" namespace "cugraph": +cdef extern from "utilities/cython.hpp" namespace "cugraph::cython": - cdef void sssp[VT, ET, WT]( - const GraphCSRView[VT, ET, WT] &graph, - WT *distances, - VT *predecessors, - VT start_vertex) except + + cdef void call_sssp[vertex_t, weight_t]( + const handle_t &handle, + const graph_container_t &g, + vertex_t *identifiers, + weight_t *distances, + vertex_t *predecessors, + vertex_t start_vertex) except + diff --git a/python/cugraph/traversal/sssp_wrapper.pyx b/python/cugraph/traversal/sssp_wrapper.pyx index 1504eee53e1..730fe0db94e 100644 --- a/python/cugraph/traversal/sssp_wrapper.pyx +++ b/python/cugraph/traversal/sssp_wrapper.pyx @@ -34,13 +34,22 @@ def sssp(input_graph, source): Call sssp """ # Step 1: Declare the different variables - cdef GraphCSRView[int, int, float] graph_float # For weighted float graph (SSSP) and Unweighted (BFS) - cdef GraphCSRView[int, int, double] graph_double # For 
weighted double graph (SSSP) + cdef graph_container_t graph_container + # FIXME: Offsets and indices are currently hardcoded to int, but this may + # not be acceptable in the future. + numberTypeMap = {np.dtype("int32") : numberTypeEnum.int32Type, + np.dtype("int64") : numberTypeEnum.int64Type, + np.dtype("float32") : numberTypeEnum.floatType, + np.dtype("double") : numberTypeEnum.doubleType} # Pointers required for CSR Graph cdef uintptr_t c_offsets_ptr = NULL # Pointer to the CSR offsets cdef uintptr_t c_indices_ptr = NULL # Pointer to the CSR indices cdef uintptr_t c_weights_ptr = NULL # Pointer to the CSR weights + cdef uintptr_t c_local_verts = NULL; + cdef uintptr_t c_local_edges = NULL; + cdef uintptr_t c_local_offsets = NULL; + weight_t = np.dtype("int32") # Pointers for SSSP / BFS cdef uintptr_t c_identifier_ptr = NULL # Pointer to the DataFrame 'vertex' Series @@ -49,6 +58,7 @@ def sssp(input_graph, source): cdef unique_ptr[handle_t] handle_ptr handle_ptr.reset(new handle_t()) + handle_ = handle_ptr.get(); # Step 2: Verify that input_graph has the expected format # the SSSP implementation expects CSR format @@ -65,9 +75,8 @@ def sssp(input_graph, source): c_offsets_ptr = offsets.__cuda_array_interface__['data'][0] c_indices_ptr = indices.__cuda_array_interface__['data'][0] - data_type = np.int32 if weights is not None: - data_type = weights.dtype + weight_t = weights.dtype c_weights_ptr = weights.__cuda_array_interface__['data'][0] # Step 4: Setup number of vertices and number of edges @@ -83,7 +92,7 @@ def sssp(input_graph, source): df = cudf.DataFrame() df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - df['distance'] = cudf.Series(np.zeros(num_verts, dtype=data_type)) + df['distance'] = cudf.Series(np.zeros(num_verts, dtype=weight_t)) df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) # Step 7: Associate to cudf Series @@ -94,44 +103,41 @@ def sssp(input_graph, source): # Step 8: Dispatch to SSSP / BFS Based on weights 
# - weights is not None: SSSP float or SSSP double # - weights is None: BFS + populate_graph_container_legacy(graph_container, + ((graphTypeEnum.LegacyCSR)), + handle_[0], + c_offsets_ptr, c_indices_ptr, c_weights_ptr, + ((numberTypeEnum.int32Type)), + ((numberTypeEnum.int32Type)), + ((numberTypeMap[weight_t])), + num_verts, num_edges, + c_local_verts, c_local_edges, c_local_offsets) + if weights is not None: - if data_type == np.float32: - graph_float = GraphCSRView[int, int, float]( c_offsets_ptr, - c_indices_ptr, - c_weights_ptr, - num_verts, - num_edges) - graph_float.get_vertex_identifiers( c_identifier_ptr) - c_sssp.sssp[int, int, float](graph_float, + if weight_t == np.float32: + c_sssp.call_sssp[int, float](handle_[0], + graph_container, + c_identifier_ptr, c_distance_ptr, c_predecessor_ptr, source) - elif data_type == np.float64: - graph_double = GraphCSRView[int, int, double]( c_offsets_ptr, - c_indices_ptr, - c_weights_ptr, - num_verts, - num_edges) - graph_double.get_vertex_identifiers( c_identifier_ptr) - c_sssp.sssp[int, int, double](graph_double, + elif weight_t == np.float64: + c_sssp.call_sssp[int, double](handle_[0], + graph_container, + c_identifier_ptr, c_distance_ptr, c_predecessor_ptr, source) else: # This case should not happen raise NotImplementedError else: - # FIXME: Something might be done here considering WT = float - graph_float = GraphCSRView[int, int, float]( c_offsets_ptr, - c_indices_ptr, - NULL, - num_verts, - num_edges) - graph_float.get_vertex_identifiers( c_identifier_ptr) - c_bfs.bfs[int, int, float](handle_ptr.get()[0], - graph_float, + c_bfs.call_bfs[int, float](handle_[0], + graph_container, + c_identifier_ptr, c_distance_ptr, c_predecessor_ptr, NULL, - source) + source, + 1) return df