diff --git a/.github/ISSUE_TEMPLATE/documentation_request.md b/.github/ISSUE_TEMPLATE/documentation_request.md index 595a87e191e..939fcc13cb4 100644 --- a/.github/ISSUE_TEMPLATE/documentation_request.md +++ b/.github/ISSUE_TEMPLATE/documentation_request.md @@ -7,29 +7,15 @@ assignees: '' --- -## Report incorrect documentation +## Report incorrect or missing documentation -**Location of incorrect documentation** +**Location of documentation** Provide links and line numbers if applicable. **Describe the problems or issues found in the documentation** A clear and concise description of what you found to be incorrect. -**Steps taken to verify documentation is incorrect** -List any steps you have taken: - **Suggested fix for documentation** Detail proposed changes to fix the documentation if you have any. ---- - -## Report needed documentation - -**Report needed documentation** -A clear and concise description of what documentation you believe it is needed and why. - -**Describe the documentation you'd like** -A clear and concise description of what you want to happen. -**Steps taken to search for needed documentation** -List any steps you have taken: \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/enhancement-request.md b/.github/ISSUE_TEMPLATE/enhancement-request.md deleted file mode 100644 index 95cae6fd929..00000000000 --- a/.github/ISSUE_TEMPLATE/enhancement-request.md +++ /dev/null @@ -1,17 +0,0 @@ ---- -name: Enhancement request -about: 'Suggest an improveement to a feature ' -title: "[ENH]" -labels: "? - Needs Triage, Enhancement" -assignees: '' - ---- - -**Describe the solution you'd like** -A clear and concise description of what you want to happen. - -**Describe alternatives you've considered** -A clear and concise description of any alternative solutions or features you've considered. - -**Additional context** -Add any other context or screenshots about the feature request here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md index e5e02a4cb2d..f0a31093487 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -7,14 +7,22 @@ assignees: '' --- -**Is your feature request related to a problem? Please describe.** -A clear and concise description of what the problem is. Ex. I wish I could use cuGraph to do [...] -**Describe the solution you'd like** -A clear and concise description of what you want to happen. +**Describe the solution you'd like and any additional context** -**Describe alternatives you've considered** -A clear and concise description of any alternative solutions or features you've considered. -**Additional context** -Add any other context, code examples, or references to existing implementations about the feature request here. 
\ No newline at end of file + + + +--- +_For Developers below this line_ + + - [ ] Code passes CI + - [ ] Code uses Graph Primitives + - [ ] Code in C++/CUDA layer + - [ ] Code in C layer + - [ ] Code in pylibcugraph Python layer + - [ ] Code in cugraph Python layer + - [ ] Documentation + - [ ] Test cases at each layer (for MG tests, a note in the PR description indicating the new/existing MG tests were run and passed) + diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md index a9b590525aa..16157e171a4 100644 --- a/.github/ISSUE_TEMPLATE/question.md +++ b/.github/ISSUE_TEMPLATE/question.md @@ -7,4 +7,4 @@ assignees: '' --- -Ask a question that could be converted into a feature or enhancement +Ask a question diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 14440e20892..870c553815f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -135,9 +135,7 @@ rapids_cpm_init() # following public header-only raft dependencies: # * RMM # * Thrust -# * libcu++ # * GTest/GMock -# * cuCollections # # The CMakeLists.txt for each of these projects are properly configured # to generate a build and install export-set, so reimplementing finding or @@ -145,6 +143,11 @@ rapids_cpm_init() # error-prone if something about those targets change and our implementation # lags behind. ### + +# Include these before raft to keep RAFT from pulling them in. +include(cmake/thirdparty/get_libcudacxx.cmake) +include(cmake/thirdparty/get_cuco.cmake) + include(cmake/thirdparty/get_raft.cmake) include(cmake/thirdparty/get_libcugraphops.cmake) @@ -290,6 +293,7 @@ target_link_libraries(cugraph PUBLIC cugraph-ops::cugraph-ops++ raft::raft + cuco::cuco PRIVATE cugraph::cuHornet NCCL::NCCL @@ -405,6 +409,8 @@ target_link_libraries(cugraph_c CUDA::curand CUDA::cusolver CUDA::cusparse + raft::raft + cuco::cuco PRIVATE cugraph::cugraph ) diff --git a/cpp/cmake/thirdparty/get_cuco.cmake b/cpp/cmake/thirdparty/get_cuco.cmake new file mode 100644 index 00000000000..63453071ece --- /dev/null +++ b/cpp/cmake/thirdparty/get_cuco.cmake @@ -0,0 +1,34 @@ +#============================================================================= +# Copyright (c) 2021-2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+#============================================================================= + +function(find_and_configure_cuco VERSION) + + rapids_cpm_find(cuco ${VERSION} + GLOBAL_TARGETS cuco::cuco + BUILD_EXPORT_SET cugraph-exports + CPM_ARGS + EXCLUDE_FROM_ALL TRUE + GIT_REPOSITORY https://github.com/NVIDIA/cuCollections.git + GIT_TAG 0ca860b824f5dc22cf8a41f09912e62e11f07d82 + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" + ) + +endfunction() + +# cuCollections doesn't have a version yet +find_and_configure_cuco(0.0) diff --git a/cpp/cmake/thirdparty/get_libcudacxx.cmake b/cpp/cmake/thirdparty/get_libcudacxx.cmake new file mode 100644 index 00000000000..41e5998a448 --- /dev/null +++ b/cpp/cmake/thirdparty/get_libcudacxx.cmake @@ -0,0 +1,24 @@ +# ============================================================================= +# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# This function finds libcudacxx and sets any additional necessary environment variables. +function(find_and_configure_libcudacxx) + include(${rapids-cmake-dir}/cpm/libcudacxx.cmake) + + rapids_cpm_libcudacxx(BUILD_EXPORT_SET cugraph-exports + INSTALL_EXPORT_SET cugraph-exports) + +endfunction() + +find_and_configure_libcudacxx() diff --git a/cpp/cmake/thirdparty/get_raft.cmake b/cpp/cmake/thirdparty/get_raft.cmake index c659424fea9..674348453db 100644 --- a/cpp/cmake/thirdparty/get_raft.cmake +++ b/cpp/cmake/thirdparty/get_raft.cmake @@ -32,6 +32,7 @@ function(find_and_configure_raft) BUILD_EXPORT_SET cugraph-exports INSTALL_EXPORT_SET cugraph-exports CPM_ARGS + EXCLUDE_FROM_ALL TRUE GIT_REPOSITORY https://github.com/${PKG_FORK}/raft.git GIT_TAG ${PKG_PINNED_TAG} SOURCE_SUBDIR cpp @@ -39,6 +40,7 @@ function(find_and_configure_raft) "RAFT_COMPILE_LIBRARIES OFF" "BUILD_TESTS OFF" "BUILD_BENCH OFF" + "RAFT_ENABLE_cuco_DEPENDENCY OFF" ) if(raft_ADDED) diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh index 0f944a178e9..04fc797517a 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh @@ -503,7 +503,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto execution_policy = handle.get_thrust_policy(); if constexpr (GraphViewType::is_multi_gpu) { - minor_tmp_buffer.fill(minor_init, handle.get_stream()); + minor_tmp_buffer.fill(handle, minor_init); } else { thrust::fill(execution_policy, vertex_value_output_first, diff --git a/cpp/include/cugraph/prims/count_if_v.cuh b/cpp/include/cugraph/prims/count_if_v.cuh index 76b889bc365..215fe125a87 100644 --- a/cpp/include/cugraph/prims/count_if_v.cuh +++ b/cpp/include/cugraph/prims/count_if_v.cuh @@ -27,6 +27,24 @@ namespace cugraph { +namespace detail { + +template +struct count_if_call_v_op_t { + vertex_t local_vertex_partition_range_first{}; + 
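// (editor's note) filled in by count_if_v below: operator() maps a local vertex offset i to + // v_op(vertex ID, property value) and returns vertex_t{1} or vertex_t{0}, so counting + // reduces to a sum over the local vertex range. +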
VertexValueInputIterator vertex_value_input_first{}; + VertexOp v_op{}; + + __device__ bool operator()(vertex_t i) + { + return v_op(local_vertex_partition_range_first + i, *(vertex_value_input_first + i)) + ? vertex_t{1} + : vertex_t{0}; + } +}; + +} // namespace detail + /** * @brief Count the number of vertices that satisfies the given predicate. * @@ -42,8 +60,8 @@ namespace cugraph { * @param vertex_value_input_first Iterator pointing to the vertex properties for the first * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). - * @param v_op Unary operator takes *(@p vertex_value_input_first + i) (where i is [0, @p - * graph_view.local_vertex_partition_range_size())) and returns true if this vertex should be + * @param v_op Binary operator takes vertex ID and *(@p vertex_value_input_first + i) (where i is + * [0, @p graph_view.local_vertex_partition_range_size())) and returns true if this vertex should be * included in the returned count. * @return GraphViewType::vertex_type Number of times @p v_op returned true. */ @@ -53,47 +71,16 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle, VertexValueInputIterator vertex_value_input_first, VertexOp v_op) { - auto count = - thrust::count_if(handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.local_vertex_partition_range_size(), - v_op); - if (GraphViewType::is_multi_gpu) { - count = - host_scalar_allreduce(handle.get_comms(), count, raft::comms::op_t::SUM, handle.get_stream()); - } - return count; -} + using vertex_t = typename GraphViewType::vertex_type; -/** - * @brief Count the number of vertices that satisfies the given predicate. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::count_if() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). @p input_last - @p input_first (or the sum of @p - * input_last - @p input_first values in multi-GPU) should not overflow GraphViewType::vertex_type. - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam VertexOp VertexOp Type of the unary predicate operator. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be passed to - * @p v_op. - * @param input_last Iterator pointing to the end (exclusive) of the values to be passed to @p v_op. - * @param v_op Unary operator takes *(@p input_first + i) (where i is [0, @p input_last - @p - * input_first)) and returns true if this vertex should be included in the returned count. - * @return GraphViewType::vertex_type Number of times @p v_op returned true. 
- */ -template -typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - VertexOp v_op) -{ - auto count = thrust::count_if(handle.get_thrust_policy(), input_first, input_last, v_op); + auto it = thrust::make_transform_iterator( + thrust::make_counting_iterator(vertex_t{0}), + detail::count_if_call_v_op_t{ + graph_view.local_vertex_partition_range_first(), vertex_value_input_first, v_op}); + auto count = thrust::reduce(handle.get_thrust_policy(), + it, + it + graph_view.local_vertex_partition_range_size(), + vertex_t{0}); if (GraphViewType::is_multi_gpu) { count = host_scalar_allreduce(handle.get_comms(), count, raft::comms::op_t::SUM, handle.get_stream()); diff --git a/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh b/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh index b01ead105ae..25c28eca1e7 100644 --- a/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh +++ b/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh @@ -254,10 +254,12 @@ class edge_partition_major_property_t { edge_partition_major_value_start_offsets_ = std::nullopt; } - void fill(T value, rmm::cuda_stream_view stream) + void fill(raft::handle_t const& handle, T value) { - thrust::fill( - rmm::exec_policy(stream), value_data(), value_data() + size_dataframe_buffer(buffer_), value); + thrust::fill(handle.get_thrust_policy(), + value_data(), + value_data() + size_dataframe_buffer(buffer_), + value); } auto key_first() { return key_first_; } @@ -267,6 +269,7 @@ class edge_partition_major_property_t { (*edge_partition_key_offsets_).back()) : std::nullopt; } + auto value_data() { return get_dataframe_buffer_begin(buffer_); } auto device_view() const @@ -351,14 +354,17 @@ class edge_partition_minor_property_t { shrink_to_fit_dataframe_buffer(buffer_, handle.get_stream()); } - void fill(T value, rmm::cuda_stream_view stream) + void fill(raft::handle_t const& handle, T value) { - thrust::fill( - rmm::exec_policy(stream), value_data(), value_data() + size_dataframe_buffer(buffer_), value); + thrust::fill(handle.get_thrust_policy(), + value_data(), + value_data() + size_dataframe_buffer(buffer_), + value); } auto key_first() { return key_first_; } auto key_last() { return key_last_; } + auto value_data() { return get_dataframe_buffer_begin(buffer_); } auto device_view() const @@ -480,7 +486,7 @@ class edge_partition_src_property_t { void clear(raft::handle_t const& handle) { property_.clear(handle); } - void fill(T value, rmm::cuda_stream_view stream) { property_.fill(value, stream); } + void fill(raft::handle_t const& handle, T value) { property_.fill(handle, value); } auto key_first() { return property_.key_first(); } auto key_last() { return property_.key_last(); } @@ -561,7 +567,7 @@ class edge_partition_dst_property_t { void clear(raft::handle_t const& handle) { property_.clear(handle); } - void fill(T value, rmm::cuda_stream_view stream) { property_.fill(value, stream); } + void fill(raft::handle_t const& handle, T value) { property_.fill(handle, value); } auto key_first() { return property_.key_first(); } auto key_last() { return property_.key_last(); } diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index c50a1fde93f..fc48df64c3c 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -191,7 +191,7 @@ struct property_op, Op> private: template 
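// (editor's note) renamed below from sum_impl to binary_op_impl: the per-element functor applies Op, which need not be a sum.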
- __host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence) + __host__ __device__ constexpr auto binary_op_impl(T& t1, T& t2, std::index_sequence) { return thrust::make_tuple((Op::type>()( thrust::get(t1), thrust::get(t2)))...); @@ -200,7 +200,7 @@ struct property_op, Op> public: __host__ __device__ constexpr auto operator()(const Type& t1, const Type& t2) { - return sum_impl(t1, t2, std::make_index_sequence::value>()); + return binary_op_impl(t1, t2, std::make_index_sequence::value>()); } }; diff --git a/cpp/include/cugraph/prims/reduce_v.cuh b/cpp/include/cugraph/prims/reduce_v.cuh index b2981c2693f..fa4e59b7a65 100644 --- a/cpp/include/cugraph/prims/reduce_v.cuh +++ b/cpp/include/cugraph/prims/reduce_v.cuh @@ -68,44 +68,4 @@ T reduce_v(raft::handle_t const& handle, return ret; } -/** - * @brief Reduce the vertex properties. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::reduce() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam T Type of the initial value. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be reduced. - * @param input_last Iterator pointing to the end (exclusive) of the values to be reduced. - * @param init Initial value to be added to the reduced input vertex properties. - * @return T Reduction of the input vertex properties. - */ -template -T reduce_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - T init = T{}, - raft::comms::op_t op = raft::comms::op_t::SUM) -{ - auto ret = op_dispatch(op, [&handle, &graph_view, input_first, input_last, init](auto op) { - return thrust::reduce( - handle.get_thrust_policy(), - input_first, - input_last, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, - op); - }); - if constexpr (GraphViewType::is_multi_gpu) { - ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); - } - return ret; -} - } // namespace cugraph diff --git a/cpp/include/cugraph/prims/transform_reduce_v.cuh b/cpp/include/cugraph/prims/transform_reduce_v.cuh index 20646c9963e..fedc93d783f 100644 --- a/cpp/include/cugraph/prims/transform_reduce_v.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_v.cuh @@ -27,6 +27,22 @@ namespace cugraph { +namespace detail { + +template +struct transform_reduce_call_v_op_t { + vertex_t local_vertex_partition_range_first{}; + VertexValueInputIterator vertex_value_input_first{}; + VertexOp v_op{}; + + __device__ T operator()(vertex_t i) + { + return v_op(local_vertex_partition_range_first + i, *(vertex_value_input_first + i)); + } +}; + +} // namespace detail + /** * @brief Apply an operator to the vertex properties and reduce. * @@ -43,8 +59,9 @@ namespace cugraph { * @param vertex_value_input_first Iterator pointing to the vertex properties for the first * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). 
- * @param v_op Unary operator takes *(@p vertex_value_input_first + i) (where i is [0, @p - * graph_view.local_vertex_partition_range_size())) and returns a transformed value to be reduced. + * @param v_op Binary operator takes vertex ID and *(@p vertex_value_input_first + i) (where i is + * [0, @p graph_view.local_vertex_partition_range_size())) and returns a transformed value to be + * reduced. * @param init Initial value to be added to the transform-reduced input vertex properties. * @return T Reduction of the @p v_op outputs. */ @@ -56,61 +73,19 @@ T transform_reduce_v(raft::handle_t const& handle, T init, raft::comms::op_t op = raft::comms::op_t::SUM) { - auto id = identity_element(op); - auto ret = - op_dispatch(op, [&handle, &graph_view, vertex_value_input_first, v_op, id, init](auto op) { - return thrust::transform_reduce( - handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.local_vertex_partition_range_size(), - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? id : init, - op); - }); - if (GraphViewType::is_multi_gpu) { - ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); - } - return ret; -} + using vertex_t = typename GraphViewType::vertex_type; -/** - * @brief Apply an operator to the vertex properties and reduce. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::transform_reduce() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam VertexOp - * @tparam T Type of the initial value. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be passed to - * @p v_op. - * @param input_last Iterator pointing to the end (exclusive) of the values to be passed to @p v_op. - * @param v_op Unary operator takes *(@p input_first + i) (where i is [0, @p input_last - @p - * input_first)) and returns a transformed value to be reduced. - * @param init Initial value to be added to the transform-reduced input vertex properties. - * @return T Reduction of the @p v_op outputs. - */ -template -T transform_reduce_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - VertexOp v_op, - T init = T{}, - raft::comms::op_t op = raft::comms::op_t::SUM) -{ - auto ret = op_dispatch(op, [&handle, input_first, input_last, v_op, init](auto op) { - return thrust::transform_reduce( + auto id = identity_element(op); + auto it = thrust::make_transform_iterator( + thrust::make_counting_iterator(vertex_t{0}), + detail::transform_reduce_call_v_op_t{ + graph_view.local_vertex_partition_range_first(), vertex_value_input_first, v_op}); + auto ret = op_dispatch(op, [&handle, &graph_view, it, id, init](auto op) { + return thrust::reduce( handle.get_thrust_policy(), - input_first, - input_last, - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + it, + it + graph_view.local_vertex_partition_range_size(), + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? 
id : init, op); }); if (GraphViewType::is_multi_gpu) { diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index fe2d632a342..027c2763e0e 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -131,7 +131,7 @@ template -struct call_v_op_t { +struct update_frontier_call_v_op_t { VertexValueInputIterator vertex_value_input_first{}; VertexValueOutputIterator vertex_value_output_first{}; VertexOp v_op{}; @@ -835,7 +835,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( edge_t ret{0}; - auto const& cur_frontier_bucket = frontier.get_bucket(cur_frontier_bucket_idx); + auto const& cur_frontier_bucket = frontier.bucket(cur_frontier_bucket_idx); vertex_t const* local_frontier_vertex_first{nullptr}; vertex_t const* local_frontier_vertex_last{nullptr}; if constexpr (std::is_same_v) { @@ -952,11 +952,11 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and * handles to various CUDA libraries) to run graph algorithms. * @param graph_view Non-owning graph object. - * @param frontier VertexFrontier class object for vertex frontier managements. This object includes - * multiple bucket objects. - * @param cur_frontier_bucket_idx Index of the VertexFrontier bucket holding vertices for the + * @param frontier VertexFrontierType class object for vertex frontier managements. This object + * includes multiple bucket objects. + * @param cur_frontier_bucket_idx Index of the vertex frontier bucket holding vertices for the * current iteration. - * @param next_frontier_bucket_indices Indices of the VertexFrontier buckets to store new frontier + * @param next_frontier_bucket_indices Indices of the vertex frontier buckets to store new frontier * vertices for the next iteration. * @param edge_partition_src_value_input Device-copyable wrapper used to access source input * property values (for the edge sources assigned to this process in multi-GPU). Use either @@ -1031,8 +1031,8 @@ void update_frontier_v_push_if_out_nbr( using key_t = typename VertexFrontierType::key_type; using payload_t = typename ReduceOp::type; - auto frontier_key_first = frontier.get_bucket(cur_frontier_bucket_idx).begin(); - auto frontier_key_last = frontier.get_bucket(cur_frontier_bucket_idx).end(); + auto frontier_key_first = frontier.bucket(cur_frontier_bucket_idx).begin(); + auto frontier_key_last = frontier.bucket(cur_frontier_bucket_idx).end(); // 1. fill the buffer @@ -1334,7 +1334,7 @@ void update_frontier_v_push_if_out_nbr( // 3. 
update vertex properties and frontier if (num_buffer_elements > 0) { - static_assert(VertexFrontierType::kNumBuckets <= std::numeric_limits::max()); + assert(frontier.num_buckets() <= std::numeric_limits::max()); rmm::device_uvector bucket_indices(num_buffer_elements, handle.get_stream()); auto vertex_partition = vertex_partition_device_view_t( @@ -1376,21 +1376,21 @@ void update_frontier_v_push_if_out_nbr( resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); } else { - thrust::transform( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(key_buffer), - get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, - bucket_indices.begin(), - detail::call_v_op_t{vertex_value_input_first, - vertex_value_output_first, - v_op, - vertex_partition, - VertexFrontierType::kInvalidBucketIdx}); + thrust::transform(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, + bucket_indices.begin(), + detail::update_frontier_call_v_op_t{ + vertex_value_input_first, + vertex_value_output_first, + v_op, + vertex_partition, + VertexFrontierType::kInvalidBucketIdx}); } auto bucket_key_pair_first = thrust::make_zip_iterator( diff --git a/cpp/include/cugraph/prims/vertex_frontier.cuh b/cpp/include/cugraph/prims/vertex_frontier.cuh index 86b55ab3f16..54400c2af0c 100644 --- a/cpp/include/cugraph/prims/vertex_frontier.cuh +++ b/cpp/include/cugraph/prims/vertex_frontier.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
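[editor's note] The vertex_frontier.cuh hunks below rename SortedUniqueKeyBucket / VertexFrontier to sorted_unique_key_bucket_t / vertex_frontier_t and turn the bucket count from a compile-time template parameter into a constructor argument. A minimal usage sketch of the post-change API, assuming a single-GPU graph with int32_t vertex IDs, an existing raft::handle_t named handle, and device iterators sources_begin/sources_end (all illustrative names, not from this diff):

  #include <cugraph/prims/vertex_frontier.cuh>

  constexpr size_t bucket_idx_cur  = 0;
  constexpr size_t bucket_idx_next = 1;
  // bucket count is now a runtime argument (2 here), not a template parameter
  cugraph::vertex_frontier_t<int32_t, void, false> frontier(handle, 2);
  frontier.bucket(bucket_idx_cur).insert(sources_begin, sources_end);  // seed the frontier
  // ... per-iteration work fills bucket_idx_next, then the buckets rotate:
  frontier.bucket(bucket_idx_cur).clear();
  frontier.swap_buckets(bucket_idx_cur, bucket_idx_next);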
@@ -40,7 +40,7 @@ namespace cugraph { // stores unique key objects in the sorted (non-descending) order; key type is either vertex_t // (tag_t == void) or thrust::tuple (tag_t != void) template -class SortedUniqueKeyBucket { +class sorted_unique_key_bucket_t { static_assert(std::is_same_v || std::is_arithmetic_v); using optional_buffer_type = std:: @@ -48,13 +48,13 @@ class SortedUniqueKeyBucket { public: template >* = nullptr> - SortedUniqueKeyBucket(raft::handle_t const& handle) + sorted_unique_key_bucket_t(raft::handle_t const& handle) : handle_ptr_(&handle), vertices_(0, handle.get_stream()), tags_(std::byte{0}) { } template >* = nullptr> - SortedUniqueKeyBucket(raft::handle_t const& handle) + sorted_unique_key_bucket_t(raft::handle_t const& handle) : handle_ptr_(&handle), vertices_(0, handle.get_stream()), tags_(0, handle.get_stream()) { } @@ -277,32 +277,31 @@ class SortedUniqueKeyBucket { optional_buffer_type tags_; }; -template -class VertexFrontier { +template +class vertex_frontier_t { static_assert(std::is_same_v || std::is_arithmetic_v); public: using key_type = std::conditional_t, vertex_t, thrust::tuple>; - static size_t constexpr kNumBuckets = num_buckets; static size_t constexpr kInvalidBucketIdx{std::numeric_limits::max()}; - VertexFrontier(raft::handle_t const& handle) : handle_ptr_(&handle) + vertex_frontier_t(raft::handle_t const& handle, size_t num_buckets) : handle_ptr_(&handle) { + buckets_.reserve(num_buckets); for (size_t i = 0; i < num_buckets; ++i) { buckets_.emplace_back(handle); } } - SortedUniqueKeyBucket& get_bucket(size_t bucket_idx) + size_t num_buckets() const { return buckets_.size(); } + + sorted_unique_key_bucket_t& bucket(size_t bucket_idx) { return buckets_[bucket_idx]; } - SortedUniqueKeyBucket const& get_bucket(size_t bucket_idx) const + sorted_unique_key_bucket_t const& bucket(size_t bucket_idx) const { return buckets_[bucket_idx]; } @@ -317,12 +316,12 @@ class VertexFrontier { std::vector const& move_to_bucket_indices, SplitOp split_op) { - auto& this_bucket = get_bucket(this_bucket_idx); + auto& this_bucket = bucket(this_bucket_idx); if (this_bucket.size() == 0) { return; } // 1. apply split_op to each bucket element - static_assert(kNumBuckets <= std::numeric_limits::max()); + assert(buckets_.size() <= std::numeric_limits::max()); rmm::device_uvector bucket_indices(this_bucket.size(), handle_ptr_->get_stream()); thrust::transform( handle_ptr_->get_thrust_policy(), @@ -447,14 +446,14 @@ class VertexFrontier { // 2. 
insert to the target buckets for (size_t i = 0; i < insert_offsets.size(); ++i) { - get_bucket(insert_bucket_indices[i]) + bucket(insert_bucket_indices[i]) .insert(key_first + insert_offsets[i], key_first + (insert_offsets[i] + insert_sizes[i])); } } private: raft::handle_t const* handle_ptr_{nullptr}; - std::vector> buckets_{}; + std::vector> buckets_{}; }; } // namespace cugraph diff --git a/cpp/include/cugraph/utilities/cython.hpp b/cpp/include/cugraph/utilities/cython.hpp index a8a9e2524cd..9d697dbf95a 100644 --- a/cpp/include/cugraph/utilities/cython.hpp +++ b/cpp/include/cugraph/utilities/cython.hpp @@ -594,9 +594,10 @@ std::unique_ptr> call_shuffle( edgelist_major_vertices, // [IN / OUT]: groupby_gpu_id_and_shuffle_values() sorts in-place vertex_t* edgelist_minor_vertices, // [IN / OUT] weight_t* edgelist_weights, // [IN / OUT] - edge_t num_edgelist_edges); + edge_t num_edgelist_edges, + bool is_weighted); -// Wrapper for calling renumber_edeglist() inplace: +// Wrapper for calling renumber_edgelist() inplace: // template std::unique_ptr> call_renumber( diff --git a/cpp/src/centrality/katz_centrality_impl.cuh b/cpp/src/centrality/katz_centrality_impl.cuh index 1f0900c720f..1fa5c93c4b9 100644 --- a/cpp/src/centrality/katz_centrality_impl.cuh +++ b/cpp/src/centrality/katz_centrality_impl.cuh @@ -72,8 +72,10 @@ void katz_centrality(raft::handle_t const& handle, // FIXME: should I check for betas? if (has_initial_guess) { - auto num_negative_values = count_if_v( - handle, pull_graph_view, katz_centralities, [] __device__(auto val) { return val < 0.0; }); + auto num_negative_values = + count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) { + return val < 0.0; + }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -132,7 +134,7 @@ void katz_centrality(raft::handle_t const& handle, handle, pull_graph_view, thrust::make_zip_iterator(thrust::make_tuple(new_katz_centralities, old_katz_centralities)), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0.0}); iter++; @@ -156,7 +158,7 @@ void katz_centrality(raft::handle_t const& handle, handle, pull_graph_view, katz_centralities, - [] __device__(auto val) { return val * val; }, + [] __device__(auto, auto val) { return val * val; }, result_t{0.0}); l2_norm = std::sqrt(l2_norm); CUGRAPH_EXPECTS(l2_norm > 0.0, diff --git a/cpp/src/community/louvain.cuh b/cpp/src/community/louvain.cuh index a558be1e198..a5551d76ae2 100644 --- a/cpp/src/community/louvain.cuh +++ b/cpp/src/community/louvain.cuh @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index f4b76670a3f..fd2609eb242 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -179,8 +179,8 @@ struct v_op_t { // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this requires // placing the atomic barrier on managed memory and this adds additional complication. 
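// (editor's note) num_edge_inserts is a device-side counter advanced with atomics as component-merge edges are recorded; the two bucket indices below (renamed in this hunk) route a vertex to the next-iteration bucket or, on multi-GPU runs, to the conflict bucket.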
size_t* num_edge_inserts{}; - size_t next_bucket_idx{}; - size_t conflict_bucket_idx{}; // relevant only if GraphViewType::is_multi_gpu is true + size_t bucket_idx_next{}; + size_t bucket_idx_conflict{}; // relevant only if GraphViewType::is_multi_gpu is true template __device__ std::enable_if_t>> @@ -195,11 +195,11 @@ struct v_op_t { atomicCAS(level_components + v_offset, invalid_component_id::value, tag); if (old != invalid_component_id::value && old != tag) { // conflict return thrust::optional>{ - thrust::make_tuple(conflict_bucket_idx, std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_conflict, std::byte{0} /* dummy */)}; } else { return (old == invalid_component_id::value) ? thrust::optional>{thrust::make_tuple( - next_bucket_idx, std::byte{0} /* dummy */)} + bucket_idx_next, std::byte{0} /* dummy */)} : thrust::nullopt; } } @@ -209,7 +209,7 @@ operator()(thrust::tuple /* tagged_v */, int /* v_val */) const { return thrust::optional>{ - thrust::make_tuple(next_bucket_idx, std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; } }; @@ -243,12 +243,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // 2. recursively run multi-root frontier expansion - enum class Bucket { - cur, - next, - conflict /* relevant only if GraphViewType::is_multi_gpu is true */, - num_buckets - }; + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t bucket_idx_conflict = 2; + constexpr size_t num_buckets = 3; + // tuning parameter to balance work per iteration (should be large enough to be throughput // bounded) vs # conflicts between frontiers with different roots (# conflicts == # edges for the // next level) @@ -449,11 +448,8 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // 2-3. initialize vertex frontier, edge_buffer, and edge_partition_dst_components (if // multi-gpu) - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + vertex_frontier_t vertex_frontier(handle, + num_buckets); vertex_t next_candidate_offset{0}; edge_t edge_count{0}; @@ -468,8 +464,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, ?
edge_partition_dst_property_t(handle, level_graph_view) : edge_partition_dst_property_t(handle); if constexpr (GraphViewType::is_multi_gpu) { - edge_partition_dst_components.fill(invalid_component_id::value, - handle.get_stream()); + edge_partition_dst_components.fill(handle, invalid_component_id::value); } // 2.4 iterate till every vertex gets visited @@ -502,33 +497,25 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(new_roots.begin(), new_roots.begin())); - vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(pair_first, pair_first + new_roots.size()); + vertex_frontier.bucket(bucket_idx_cur).insert(pair_first, pair_first + new_roots.size()); } - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() == 0) { - break; - } + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() == 0) { break; } if constexpr (GraphViewType::is_multi_gpu) { update_edge_partition_dst_property( handle, level_graph_view, - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .begin() - .get_iterator_tuple()), - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .end() - .get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).begin().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).end().get_iterator_tuple()), level_components, edge_partition_dst_components); } - auto max_pushes = - GraphViewType::is_multi_gpu - ? compute_num_out_nbrs_from_frontier( - handle, level_graph_view, vertex_frontier, static_cast(Bucket::cur)) - : edge_count; + auto max_pushes = GraphViewType::is_multi_gpu + ? compute_num_out_nbrs_from_frontier( + handle, level_graph_view, vertex_frontier, bucket_idx_cur) + : edge_count; // FIXME: if we use cuco::static_map (no duplicates, ideally we need static_set), edge_buffer // size cannot exceed (# roots)^2 and we can avoid additional sort & unique (but resizing the @@ -540,10 +527,9 @@ void weakly_connected_components_impl(raft::handle_t const& handle, handle, level_graph_view, vertex_frontier, - static_cast(Bucket::cur), - GraphViewType::is_multi_gpu ? std::vector{static_cast(Bucket::next), - static_cast(Bucket::conflict)} - : std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + GraphViewType::is_multi_gpu ? 
std::vector{bucket_idx_next, bucket_idx_conflict} + : std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), [col_components = @@ -579,12 +565,12 @@ void weakly_connected_components_impl(raft::handle_t const& handle, level_components, get_dataframe_buffer_begin(edge_buffer), num_edge_inserts.data(), - static_cast(Bucket::next), - static_cast(Bucket::conflict)}); + bucket_idx_next, + bucket_idx_conflict}); if (GraphViewType::is_multi_gpu) { auto cur_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); - auto& conflict_bucket = vertex_frontier.get_bucket(static_cast(Bucket::conflict)); + auto& conflict_bucket = vertex_frontier.bucket(bucket_idx_conflict); resize_dataframe_buffer( edge_buffer, cur_num_edge_inserts + conflict_bucket.size(), handle.get_stream()); thrust::for_each( @@ -636,17 +622,13 @@ void weakly_connected_components_impl(raft::handle_t const& handle, num_edge_inserts.set_value_async(num_unique_edges, handle.get_stream()); } - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); edge_count = thrust::transform_reduce( handle.get_thrust_policy(), - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .begin() - .get_iterator_tuple()), - thrust::get<0>( - vertex_frontier.get_bucket(static_cast(Bucket::cur)).end().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).begin().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).end().get_iterator_tuple()), [vertex_partition, degrees = degrees.data()] __device__(auto v) { return degrees[vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v)]; }, diff --git a/cpp/src/cores/core_number_impl.cuh b/cpp/src/cores/core_number_impl.cuh index b9d2d75c4fb..6d077e7085c 100644 --- a/cpp/src/cores/core_number_impl.cuh +++ b/cpp/src/cores/core_number_impl.cuh @@ -143,9 +143,11 @@ void core_number(raft::handle_t const& handle, // start iteration - enum class Bucket { cur, next, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t num_buckets = 2; + + vertex_frontier_t vertex_frontier(handle, num_buckets); edge_partition_dst_property_t, edge_t> dst_core_numbers(handle, graph_view); @@ -177,15 +179,14 @@ void core_number(raft::handle_t const& handle, remaining_vertices.end(), [core_numbers, k, v_first = graph_view.local_vertex_partition_range_first()] __device__( auto v) { return core_numbers[v - v_first] >= k; }); - vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(less_than_k_first, remaining_vertices.end()); + vertex_frontier.bucket(bucket_idx_cur).insert(less_than_k_first, remaining_vertices.end()); remaining_vertices.resize(thrust::distance(remaining_vertices.begin(), less_than_k_first), handle.get_stream()); auto delta = (graph_view.is_symmetric() && (degree_type == k_core_degree_type_t::INOUT)) ? 
edge_t{2} : edge_t{1}; - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() > 0) { + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() > 0) { do { // FIXME: If most vertices have core numbers less than k, (dst_val >= k) will be mostly // false leading to too many unnecessary edge traversals (this is especially problematic if @@ -198,8 +199,8 @@ void core_number(raft::handle_t const& handle, handle, graph_view, vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dst_core_numbers.device_view(), [k, delta] __device__(vertex_t src, vertex_t dst, auto, auto dst_val) { @@ -219,7 +220,7 @@ void core_number(raft::handle_t const& handle, new_core_number = new_core_number < (k - delta) ? (k - delta) : new_core_number; new_core_number = new_core_number < k_first ? edge_t{0} : new_core_number; return thrust::optional>{ - thrust::make_tuple(static_cast(Bucket::next), new_core_number)}; + thrust::make_tuple(bucket_idx_next, new_core_number)}; }); } @@ -230,33 +231,31 @@ void core_number(raft::handle_t const& handle, CUGRAPH_FAIL("unimplemented."); } - update_edge_partition_dst_property( - handle, - graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::next)).end(), - core_numbers, - dst_core_numbers); + update_edge_partition_dst_property(handle, + graph_view, + vertex_frontier.bucket(bucket_idx_next).begin(), + vertex_frontier.bucket(bucket_idx_next).end(), + core_numbers, + dst_core_numbers); - vertex_frontier.get_bucket(static_cast(Bucket::next)) + vertex_frontier.bucket(bucket_idx_next) .resize(static_cast(thrust::distance( - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), + vertex_frontier.bucket(bucket_idx_next).begin(), thrust::remove_if( handle.get_thrust_policy(), - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::next)).end(), + vertex_frontier.bucket(bucket_idx_next).begin(), + vertex_frontier.bucket(bucket_idx_next).end(), [core_numbers, k, v_first = graph_view.local_vertex_partition_range_first()] __device__(auto v) { return core_numbers[v - v_first] >= k; })))); - vertex_frontier.get_bucket(static_cast(Bucket::next)).shrink_to_fit(); + vertex_frontier.bucket(bucket_idx_next).shrink_to_fit(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); - } while (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() > 0); + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); + } while (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() > 0); // FIXME: scanning the remaining vertices can add significant overhead if the number of // distinct core numbers in [k_first, std::min(max_degree, k_last)] is large and there are @@ -280,12 +279,15 @@ void core_number(raft::handle_t const& handle, v_to_core_number_t{core_numbers, graph_view.local_vertex_partition_range_first()}); auto min_core_number = - reduce_v(handle, - graph_view, - remaining_vertex_core_number_first, - remaining_vertex_core_number_first + remaining_vertices.size(), - std::numeric_limits::max(), - raft::comms::op_t::MIN); + 
thrust::reduce(handle.get_thrust_policy(), + remaining_vertex_core_number_first, + remaining_vertex_core_number_first + remaining_vertices.size(), + std::numeric_limits::max(), + thrust::minimum{}); + if constexpr (multi_gpu) { + min_core_number = host_scalar_allreduce( + handle.get_comms(), min_core_number, raft::comms::op_t::MIN, handle.get_stream()); + } k = std::max(k + delta, static_cast(min_core_number + edge_t{delta})); } } diff --git a/cpp/src/link_analysis/hits_impl.cuh b/cpp/src/link_analysis/hits_impl.cuh index 5eb7fa2918c..5422c1d327f 100644 --- a/cpp/src/link_analysis/hits_impl.cuh +++ b/cpp/src/link_analysis/hits_impl.cuh @@ -35,12 +35,7 @@ void normalize(raft::handle_t const& handle, result_t* hubs, raft::comms::op_t op) { - auto hubs_norm = reduce_v(handle, - graph_view, - hubs, - hubs + graph_view.local_vertex_partition_range_size(), - identity_element(op), - op); + auto hubs_norm = reduce_v(handle, graph_view, hubs, identity_element(op), op); CUGRAPH_EXPECTS(hubs_norm > 0, "Norm is required to be a positive value."); thrust::transform(handle.get_thrust_policy(), hubs, @@ -80,7 +75,7 @@ std::tuple hits(raft::handle_t const& handle, // Check validity of initial guess if supplied if (has_initial_hubs_guess && do_expensive_check) { auto num_negative_values = - count_if_v(handle, graph_view, hubs, [] __device__(auto val) { return val < 0.0; }); + count_if_v(handle, graph_view, hubs, [] __device__(auto, auto val) { return val < 0.0; }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -102,7 +97,7 @@ std::tuple hits(raft::handle_t const& handle, if (has_initial_hubs_guess) { update_edge_partition_src_property(handle, graph_view, prev_hubs, prev_src_hubs); } else { - prev_src_hubs.fill(result_t{1.0} / num_vertices, handle.get_stream()); + prev_src_hubs.fill(handle, result_t{1.0} / num_vertices); thrust::fill(handle.get_thrust_policy(), prev_hubs, prev_hubs + graph_view.local_vertex_partition_range_size(), @@ -144,7 +139,7 @@ std::tuple hits(raft::handle_t const& handle, handle, graph_view, thrust::make_zip_iterator(thrust::make_tuple(curr_hubs, prev_hubs)), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0}); if (diff_sum < epsilon) { final_iteration_count = iter; diff --git a/cpp/src/link_analysis/pagerank_impl.cuh b/cpp/src/link_analysis/pagerank_impl.cuh index c262d913f7c..5d4c7bde1ed 100644 --- a/cpp/src/link_analysis/pagerank_impl.cuh +++ b/cpp/src/link_analysis/pagerank_impl.cuh @@ -90,10 +90,11 @@ void pagerank( if (do_expensive_check) { if (precomputed_vertex_out_weight_sums) { - auto num_negative_precomputed_vertex_out_weight_sums = count_if_v( - handle, pull_graph_view, *precomputed_vertex_out_weight_sums, [] __device__(auto val) { - return val < result_t{0.0}; - }); + auto num_negative_precomputed_vertex_out_weight_sums = + count_if_v(handle, + pull_graph_view, + *precomputed_vertex_out_weight_sums, + [] __device__(auto, auto val) { return val < result_t{0.0}; }); CUGRAPH_EXPECTS( num_negative_precomputed_vertex_out_weight_sums == 0, "Invalid input argument: outgoing edge weight sum values should be non-negative."); @@ -112,7 +113,7 @@ void pagerank( if (has_initial_guess) { auto num_negative_values = count_if_v( - handle, pull_graph_view, pageranks, [] __device__(auto val) { return val < 0.0; }); + handle, pull_graph_view, pageranks, [] 
__device__(auto, auto val) { return val < 0.0; }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -121,21 +122,28 @@ void pagerank( auto vertex_partition = vertex_partition_device_view_t( pull_graph_view.local_vertex_partition_view()); auto num_invalid_vertices = - count_if_v(handle, - pull_graph_view, - *personalization_vertices, - *personalization_vertices + *personalization_vector_size, - [vertex_partition] __device__(auto val) { - return !(vertex_partition.is_valid_vertex(val) && - vertex_partition.in_local_vertex_partition_range_nocheck(val)); - }); + thrust::count_if(handle.get_thrust_policy(), + *personalization_vertices, + *personalization_vertices + *personalization_vector_size, + [vertex_partition] __device__(auto val) { + return !(vertex_partition.is_valid_vertex(val) && + vertex_partition.in_local_vertex_partition_range_nocheck(val)); + }); + if constexpr (GraphViewType::is_multi_gpu) { + num_invalid_vertices = host_scalar_allreduce( + handle.get_comms(), num_invalid_vertices, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_invalid_vertices == 0, "Invalid input argument: peresonalization vertices have invalid vertex IDs."); - auto num_negative_values = count_if_v(handle, - pull_graph_view, - *personalization_values, - *personalization_values + *personalization_vector_size, - [] __device__(auto val) { return val < 0.0; }); + auto num_negative_values = + thrust::count_if(handle.get_thrust_policy(), + *personalization_values, + *personalization_values + *personalization_vector_size, + [] __device__(auto val) { return val < 0.0; }); + if constexpr (GraphViewType::is_multi_gpu) { + num_negative_values = host_scalar_allreduce( + handle.get_comms(), num_negative_values, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: peresonalization values should be non-negative."); } @@ -174,11 +182,14 @@ void pagerank( result_t personalization_sum{0.0}; if (aggregate_personalization_vector_size > 0) { - personalization_sum = reduce_v(handle, - pull_graph_view, - *personalization_values, - *personalization_values + *personalization_vector_size, - result_t{0.0}); + personalization_sum = thrust::reduce(handle.get_thrust_policy(), + *personalization_values, + *personalization_values + *personalization_vector_size, + result_t{0.0}); + if constexpr (GraphViewType::is_multi_gpu) { + personalization_sum = host_scalar_allreduce( + handle.get_comms(), personalization_sum, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(personalization_sum > 0.0, "Invalid input argument: sum of personalization valuese " "should be positive."); @@ -205,7 +216,7 @@ void pagerank( handle, pull_graph_view, vertex_val_first, - [] __device__(auto val) { + [] __device__(auto, auto val) { auto const pagerank = thrust::get<0>(val); auto const out_weight_sum = thrust::get<1>(val); return out_weight_sum == result_t{0.0} ? 
pagerank : result_t{0.0}; @@ -266,7 +277,7 @@ void pagerank( handle, pull_graph_view, thrust::make_zip_iterator(thrust::make_tuple(pageranks, old_pageranks.data())), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0.0}); iter++; diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index bdc1f8ff602..525328ab563 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -127,14 +126,17 @@ void bfs(raft::handle_t const& handle, auto vertex_partition = vertex_partition_device_view_t( push_graph_view.local_vertex_partition_view()); auto num_invalid_vertices = - count_if_v(handle, - push_graph_view, - sources, - sources + n_sources, - [vertex_partition] __device__(auto val) { - return !(vertex_partition.is_valid_vertex(val) && - vertex_partition.in_local_vertex_partition_range_nocheck(val)); - }); + thrust::count_if(handle.get_thrust_policy(), + sources, + sources + n_sources, + [vertex_partition] __device__(auto val) { + return !(vertex_partition.is_valid_vertex(val) && + vertex_partition.in_local_vertex_partition_range_nocheck(val)); + }); + if constexpr (GraphViewType::is_multi_gpu) { + num_invalid_vertices = host_scalar_allreduce( + handle.get_comms(), num_invalid_vertices, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_invalid_vertices == 0, "Invalid input argument: sources have invalid vertex IDs."); } @@ -166,14 +168,15 @@ void bfs(raft::handle_t const& handle, } // 3. initialize BFS frontier - enum class Bucket { cur, next, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); - - vertex_frontier.get_bucket(static_cast(Bucket::cur)).insert(sources, sources + n_sources); + + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t num_buckets = 2; + + vertex_frontier_t vertex_frontier(handle, + num_buckets); + + vertex_frontier.bucket(bucket_idx_cur).insert(sources, sources + n_sources); rmm::device_uvector visited_flags( (push_graph_view.local_vertex_partition_range_size() + (sizeof(uint32_t) * 8 - 1)) / (sizeof(uint32_t) * 8), @@ -188,9 +191,7 @@ void bfs(raft::handle_t const& handle, : edge_partition_dst_property_t( handle); // relevant only if GraphViewType::is_multi_gpu is true - if constexpr (GraphViewType::is_multi_gpu) { - dst_visited_flags.fill(uint8_t{0}, handle.get_stream()); - } + if constexpr (GraphViewType::is_multi_gpu) { dst_visited_flags.fill(handle, uint8_t{0}); } // 4. 
BFS iteration vertex_t depth{0}; @@ -199,13 +200,12 @@ void bfs(raft::handle_t const& handle, CUGRAPH_FAIL("unimplemented."); } else { if (GraphViewType::is_multi_gpu) { - update_edge_partition_dst_property( - handle, - push_graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::cur)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(), - thrust::make_constant_iterator(uint8_t{1}), - dst_visited_flags); + update_edge_partition_dst_property(handle, + push_graph_view, + vertex_frontier.bucket(bucket_idx_cur).begin(), + vertex_frontier.bucket(bucket_idx_cur).end(), + thrust::make_constant_iterator(uint8_t{1}), + dst_visited_flags); } else { thrust::copy(handle.get_thrust_policy(), visited_flags.begin(), @@ -226,8 +226,8 @@ void bfs(raft::handle_t const& handle, handle, push_graph_view, vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), #if 1 @@ -254,28 +254,19 @@ void bfs(raft::handle_t const& handle, return (v_val == invalid_distance) ? thrust::optional< thrust::tuple>>{thrust::make_tuple( - static_cast(Bucket::next), - thrust::make_tuple(depth + 1, pushed_val))} + bucket_idx_next, thrust::make_tuple(depth + 1, pushed_val))} : thrust::nullopt; }); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() == 0) { - break; - } + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() == 0) { break; } } depth++; if (depth >= depth_limit) { break; } } - - RAFT_CUDA_TRY(cudaStreamSynchronize( - handle.get_stream())); // this is as necessary vertex_frontier will become out-of-scope once - // this function returns (FIXME: should I stream sync in VertexFrontier - // destructor?) } } // namespace detail diff --git a/cpp/src/traversal/sssp_impl.cuh b/cpp/src/traversal/sssp_impl.cuh index 262a36aed4f..c7f1e8b3748 100644 --- a/cpp/src/traversal/sssp_impl.cuh +++ b/cpp/src/traversal/sssp_impl.cuh @@ -125,12 +125,13 @@ void sssp(raft::handle_t const& handle, // 4. initialize SSSP frontier - enum class Bucket { cur_near, next_near, far, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + constexpr size_t bucket_idx_cur_near = 0; + constexpr size_t bucket_idx_next_near = 1; + constexpr size_t bucket_idx_far = 2; + constexpr size_t num_buckets = 3; + + vertex_frontier_t vertex_frontier(handle, + num_buckets); // 5. SSSP iteration @@ -139,23 +140,22 @@ void sssp(raft::handle_t const& handle, ? 
edge_partition_src_property_t(handle, push_graph_view) : edge_partition_src_property_t(handle); if (GraphViewType::is_multi_gpu) { - edge_partition_src_distances.fill(std::numeric_limits::max(), handle.get_stream()); + edge_partition_src_distances.fill(handle, std::numeric_limits::max()); } if (push_graph_view.in_local_vertex_partition_range_nocheck(source_vertex)) { - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).insert(source_vertex); + vertex_frontier.bucket(bucket_idx_cur_near).insert(source_vertex); } auto near_far_threshold = delta; while (true) { if (GraphViewType::is_multi_gpu) { - update_edge_partition_src_property( - handle, - push_graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).end(), - distances, - edge_partition_src_distances); + update_edge_partition_src_property(handle, + push_graph_view, + vertex_frontier.bucket(bucket_idx_cur_near).begin(), + vertex_frontier.bucket(bucket_idx_cur_near).end(), + distances, + edge_partition_src_distances); } auto vertex_partition = vertex_partition_device_view_t( @@ -165,8 +165,8 @@ void sssp(raft::handle_t const& handle, handle, push_graph_view, vertex_frontier, - static_cast(Bucket::cur_near), - std::vector{static_cast(Bucket::next_near), static_cast(Bucket::far)}, + bucket_idx_cur_near, + std::vector{bucket_idx_next_near, bucket_idx_far}, GraphViewType::is_multi_gpu ? edge_partition_src_distances.device_view() : detail::edge_partition_major_property_device_view_t(distances), @@ -193,23 +193,20 @@ void sssp(raft::handle_t const& handle, [near_far_threshold] __device__(auto v, auto v_val, auto pushed_val) { auto new_dist = thrust::get<0>(pushed_val); auto idx = new_dist < v_val - ? (new_dist < near_far_threshold ? static_cast(Bucket::next_near) - : static_cast(Bucket::far)) - : VertexFrontier::kInvalidBucketIdx; + ? (new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far) + : vertex_frontier_t::kInvalidBucketIdx; return new_dist < v_val ? thrust::optional>{thrust::make_tuple( - static_cast(new_dist < near_far_threshold ? Bucket::next_near - : Bucket::far), + new_dist < near_far_threshold ? 
@@ -193,23 +193,20 @@ void sssp(raft::handle_t const& handle,
       [near_far_threshold] __device__(auto v, auto v_val, auto pushed_val) {
         auto new_dist = thrust::get<0>(pushed_val);
         auto idx      = new_dist < v_val
-                          ? (new_dist < near_far_threshold ? static_cast<size_t>(Bucket::next_near)
-                                                           : static_cast<size_t>(Bucket::far))
-                          : VertexFrontier<vertex_t, void, GraphViewType::is_multi_gpu, static_cast<size_t>(Bucket::num_buckets)>::kInvalidBucketIdx;
+                          ? (new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far)
+                          : vertex_frontier_t<vertex_t, void, GraphViewType::is_multi_gpu, true>::kInvalidBucketIdx;
         return new_dist < v_val
                  ? thrust::optional<thrust::tuple<size_t, thrust::tuple<weight_t, vertex_t>>>{thrust::make_tuple(
-                     static_cast<size_t>(new_dist < near_far_threshold ? Bucket::next_near
-                                                                       : Bucket::far),
+                     new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far,
                      pushed_val)}
                  : thrust::nullopt;
       });

-    vertex_frontier.get_bucket(static_cast<size_t>(Bucket::cur_near)).clear();
-    vertex_frontier.get_bucket(static_cast<size_t>(Bucket::cur_near)).shrink_to_fit();
-    if (vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next_near)).aggregate_size() > 0) {
-      vertex_frontier.swap_buckets(static_cast<size_t>(Bucket::cur_near),
-                                   static_cast<size_t>(Bucket::next_near));
-    } else if (vertex_frontier.get_bucket(static_cast<size_t>(Bucket::far)).aggregate_size() >
+    vertex_frontier.bucket(bucket_idx_cur_near).clear();
+    vertex_frontier.bucket(bucket_idx_cur_near).shrink_to_fit();
+    if (vertex_frontier.bucket(bucket_idx_next_near).aggregate_size() > 0) {
+      vertex_frontier.swap_buckets(bucket_idx_cur_near, bucket_idx_next_near);
+    } else if (vertex_frontier.bucket(bucket_idx_far).aggregate_size() >
               0) {  // near queue is empty, split the far queue
       auto old_near_far_threshold = near_far_threshold;
       near_far_threshold += delta;
@@ -218,20 +215,19 @@ void sssp(raft::handle_t const& handle,
       size_t far_size{0};
       while (true) {
         vertex_frontier.split_bucket(
-          static_cast<size_t>(Bucket::far),
-          std::vector<size_t>{static_cast<size_t>(Bucket::cur_near)},
+          bucket_idx_far,
+          std::vector<size_t>{bucket_idx_cur_near},
          [vertex_partition, distances, old_near_far_threshold, near_far_threshold] __device__(
            auto v) {
            auto dist =
              *(distances + vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v));
            return dist >= old_near_far_threshold
-                     ? thrust::optional<size_t>{static_cast<size_t>(
-                         dist < near_far_threshold ? Bucket::cur_near : Bucket::far)}
+                     ? thrust::optional<size_t>{dist < near_far_threshold ? bucket_idx_cur_near
+                                                                          : bucket_idx_far}
                      : thrust::nullopt;
          });
-        near_size =
-          vertex_frontier.get_bucket(static_cast<size_t>(Bucket::cur_near)).aggregate_size();
-        far_size = vertex_frontier.get_bucket(static_cast<size_t>(Bucket::far)).aggregate_size();
+        near_size = vertex_frontier.bucket(bucket_idx_cur_near).aggregate_size();
+        far_size  = vertex_frontier.bucket(bucket_idx_far).aggregate_size();
         if ((near_size > 0) || (far_size == 0)) {
           break;
         } else {
@@ -243,11 +239,6 @@ void sssp(raft::handle_t const& handle,
       break;
     }
   }
-
-  RAFT_CUDA_TRY(cudaStreamSynchronize(
-    handle.get_stream()));  // this is necessary as vertex_frontier will become out-of-scope once
-                            // this function returns (FIXME: should I stream sync in VertexFrontier
-                            // destructor?)
 }

 }  // namespace detail
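When the near bucket empties, the delta-stepping loop above advances near_far_threshold by delta and repeatedly splits the far bucket until it recovers a non-empty near set or the far set is exhausted. A compact Python sketch of that splitting loop, under the assumption that `distances` is a plain dict and bucket membership is tracked with sets:

    # Far-bucket splitting from the delta-stepping loop above (sketch only).
    def split_far_bucket(far, distances, threshold, delta):
        """Advance the near/far threshold until some far vertex becomes near."""
        near = set()
        while True:
            old_threshold, threshold = threshold, threshold + delta
            still_far = set()
            for v in far:
                if distances[v] >= old_threshold:   # not yet settled
                    (near if distances[v] < threshold else still_far).add(v)
            far = still_far
            if near or not far:                     # mirror the break condition
                return near, far, threshold

    near, far, thr = split_far_bucket({7, 8}, {7: 5.0, 8: 9.0}, threshold=1.0, delta=2.0)
    assert near == {7} and far == {8} and thr == 7.0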
diff --git a/cpp/src/utilities/cython.cu b/cpp/src/utilities/cython.cu
index 1f6651d8942..c9180e75e33 100644
--- a/cpp/src/utilities/cython.cu
+++ b/cpp/src/utilities/cython.cu
@@ -1185,7 +1185,8 @@ std::unique_ptr<major_minor_weights_t<vertex_t, edge_t, weight_t>> call_shuffle(
     edgelist_major_vertices,  // [IN / OUT]: groupby_gpu_id_and_shuffle_values() sorts in-place
   vertex_t* edgelist_minor_vertices,  // [IN / OUT]
   weight_t* edgelist_weights,         // [IN / OUT]
-  edge_t num_edgelist_edges)
+  edge_t num_edgelist_edges,
+  bool is_weighted)
 {
   auto& comm = handle.get_comms();
   auto const comm_size = comm.get_size();
@@ -1197,7 +1198,7 @@ std::unique_ptr<major_minor_weights_t<vertex_t, edge_t, weight_t>> call_shuffle(
   std::unique_ptr<major_minor_weights_t<vertex_t, edge_t, weight_t>> ptr_ret =
     std::make_unique<major_minor_weights_t<vertex_t, edge_t, weight_t>>(handle);

-  if (edgelist_weights != nullptr) {
+  if (is_weighted) {
     auto zip_edge = thrust::make_zip_iterator(
       thrust::make_tuple(edgelist_major_vertices, edgelist_minor_vertices, edgelist_weights));

@@ -1242,7 +1243,7 @@ std::unique_ptr<major_minor_weights_t<vertex_t, edge_t, weight_t>> call_shuffle(
   auto pair_first = thrust::make_zip_iterator(
     thrust::make_tuple(ptr_ret->get_major().data(), ptr_ret->get_minor().data()));

-  auto edge_counts = (edgelist_weights != nullptr)
+  auto edge_counts = (is_weighted)
                       ? cugraph::groupby_and_count(pair_first,
                                                    pair_first + ptr_ret->get_major().size(),
                                                    ptr_ret->get_weights().data(),
@@ -1639,42 +1640,48 @@ template std::unique_ptr<major_minor_weights_t<int32_t, int32_t, float>> call_sh
   int32_t* edgelist_major_vertices,
   int32_t* edgelist_minor_vertices,
   float* edgelist_weights,
-  int32_t num_edgelist_edges);
+  int32_t num_edgelist_edges,
+  bool is_weighted);

 template std::unique_ptr<major_minor_weights_t<int32_t, int64_t, float>> call_shuffle(
   raft::handle_t const& handle,
   int32_t* edgelist_major_vertices,
   int32_t* edgelist_minor_vertices,
   float* edgelist_weights,
-  int64_t num_edgelist_edges);
+  int64_t num_edgelist_edges,
+  bool is_weighted);

 template std::unique_ptr<major_minor_weights_t<int32_t, int32_t, double>> call_shuffle(
   raft::handle_t const& handle,
   int32_t* edgelist_major_vertices,
   int32_t* edgelist_minor_vertices,
   double* edgelist_weights,
-  int32_t num_edgelist_edges);
+  int32_t num_edgelist_edges,
+  bool is_weighted);

 template std::unique_ptr<major_minor_weights_t<int32_t, int64_t, double>> call_shuffle(
   raft::handle_t const& handle,
   int32_t* edgelist_major_vertices,
   int32_t* edgelist_minor_vertices,
   double* edgelist_weights,
-  int64_t num_edgelist_edges);
+  int64_t num_edgelist_edges,
+  bool is_weighted);

 template std::unique_ptr<major_minor_weights_t<int64_t, int64_t, float>> call_shuffle(
   raft::handle_t const& handle,
   int64_t* edgelist_major_vertices,
   int64_t* edgelist_minor_vertices,
   float* edgelist_weights,
-  int64_t num_edgelist_edges);
+  int64_t num_edgelist_edges,
+  bool is_weighted);

 template std::unique_ptr<major_minor_weights_t<int64_t, int64_t, double>> call_shuffle(
   raft::handle_t const& handle,
   int64_t* edgelist_major_vertices,
   int64_t* edgelist_minor_vertices,
   double* edgelist_weights,
-  int64_t num_edgelist_edges);
+  int64_t num_edgelist_edges,
+  bool is_weighted);

 // TODO: add the remaining relevant EIDIr's:
 //
diff --git a/cpp/tests/prims/mg_count_if_v.cu b/cpp/tests/prims/mg_count_if_v.cu
index 81512f0b832..de5d0a559c4 100644
--- a/cpp/tests/prims/mg_count_if_v.cu
+++ b/cpp/tests/prims/mg_count_if_v.cu
@@ -43,7 +43,7 @@ template <typename vertex_t>
 struct test_predicate {
   int mod{};
   test_predicate(int mod_count) : mod(mod_count) {}
-  __device__ bool operator()(const vertex_t& val)
+  __device__ bool operator()(vertex_t, const vertex_t& val)
   {
     cuco::detail::MurmurHash3_32<vertex_t> hash_func{};
     return (0 == (hash_func(val) % mod));
   }
@@ -137,10 +137,10 @@ class Tests_MG_CountIfV
         cugraph::test::construct_graph<vertex_t, edge_t, weight_t, false, false>(
           handle, input_usecase, true, false);
       auto sg_graph_view = sg_graph.view();
-      auto expected_vertex_count = thrust::count_if(
-        handle.get_thrust_policy(),
+      auto expected_vertex_count = count_if_v(
+        handle,
+        sg_graph_view,
         thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()),
-        thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()),
        test_predicate<vertex_t>(hash_bin_count));
       ASSERT_TRUE(expected_vertex_count == vertex_count);
     }
diff --git a/cpp/tests/prims/mg_reduce_v.cu b/cpp/tests/prims/mg_reduce_v.cu
index 2c3109c4e01..16133ce18e2 100644
--- a/cpp/tests/prims/mg_reduce_v.cu
+++ b/cpp/tests/prims/mg_reduce_v.cu
@@ -255,12 +255,7 @@ class Tests_MG_ReduceV
         hr_clock.start();
       }

-      results[op] = reduce_v(handle,
-                             mg_graph_view,
-                             property_iter,
-                             property_iter + (*d_mg_renumber_map_labels).size(),
-                             property_initial_value,
-                             op);
+      results[op] = reduce_v(handle, mg_graph_view, property_iter, property_initial_value, op);

       if (cugraph::test::g_perf) {
         RAFT_CUDA_TRY(cudaDeviceSynchronize());  // for consistent performance measurement
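The test functors in these prim tests gained a leading vertex argument: the primitives now invoke them with the vertex id as well as its value. A Python analogue of the count_if_v check, where the built-in hash stands in for cuco's MurmurHash3_32 and all names are illustrative:

    # Python analogue of the count_if_v test above: count vertices whose
    # hashed value falls into bin 0. The (vertex, value) pair mirrors the
    # new functor signature.
    def count_if_v(vertices, values, predicate):
        return sum(1 for v, val in zip(vertices, values) if predicate(v, val))

    hash_bin_count = 5
    predicate = lambda v, val: hash(val) % hash_bin_count == 0
    vertices = range(100)
    values = list(vertices)          # the test hashes the vertex id itself
    expected = sum(1 for val in values if hash(val) % hash_bin_count == 0)
    assert count_if_v(vertices, values, predicate) == expected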
diff --git a/cpp/tests/prims/mg_transform_reduce_v.cu b/cpp/tests/prims/mg_transform_reduce_v.cu
index e3d75b2f4e4..4bd8bf5fe60 100644
--- a/cpp/tests/prims/mg_transform_reduce_v.cu
+++ b/cpp/tests/prims/mg_transform_reduce_v.cu
@@ -43,7 +43,7 @@ template <typename vertex_t, typename T>
 struct property_transform : public thrust::unary_function<vertex_t, T> {
   int mod{};
   property_transform(int mod_count) : mod(mod_count) {}
-  constexpr __device__ auto operator()(const vertex_t& val)
+  constexpr __device__ auto operator()(vertex_t, const vertex_t& val)
   {
     cuco::detail::MurmurHash3_32<vertex_t> hash_func{};
     auto value = hash_func(val) % mod;
@@ -56,7 +56,7 @@ struct property_transform<vertex_t, thrust::tuple<Args...>>
   : public thrust::unary_function<vertex_t, thrust::tuple<Args...>> {
   int mod{};
   property_transform(int mod_count) : mod(mod_count) {}
-  constexpr __device__ auto operator()(const vertex_t& val)
+  constexpr __device__ auto operator()(vertex_t, const vertex_t& val)
   {
     cuco::detail::MurmurHash3_32<vertex_t> hash_func{};
     auto value = hash_func(val) % mod;
@@ -213,7 +213,7 @@ class Tests_MG_TransformReduceV
       }
     }

-    //// 4. compare SG & MG results
+    // 4. compare SG & MG results

     if (prims_usecase.check_correctness) {
       cugraph::graph_t<vertex_t, edge_t, weight_t, false, false> sg_graph(handle);
@@ -223,16 +223,13 @@ class Tests_MG_TransformReduceV
       auto sg_graph_view = sg_graph.view();

       for (auto op : ops) {
-        auto expected_result = cugraph::op_dispatch(
-          op, [&handle, &sg_graph_view, prop, property_initial_value](auto op) {
-            return thrust::transform_reduce(
-              handle.get_thrust_policy(),
-              thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()),
-              thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()),
-              prop,
-              property_initial_value,
-              op);
-          });
+        auto expected_result = transform_reduce_v(
+          handle,
+          sg_graph_view,
+          thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()),
+          prop,
+          property_initial_value,
+          op);
         result_compare compare{};
         ASSERT_TRUE(compare(expected_result, results[op]));
       }
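The SG reference result above is now computed with the same transform_reduce_v primitive instead of a raw thrust::transform_reduce, so the single-GPU and multi-GPU paths exercise identical code. A Python analogue of the primitive's semantics (names are illustrative):

    # Python analogue of transform_reduce_v as used in the rewritten test:
    # map each (vertex, value) pair through a transform, then fold with `op`,
    # starting from an initial value.
    from functools import reduce
    import operator

    def transform_reduce_v(vertices, values, transform, init, op):
        return reduce(op, (transform(v, val) for v, val in zip(vertices, values)), init)

    total = transform_reduce_v(range(4), [10, 20, 30, 40],
                               lambda v, val: val % 7, 0, operator.add)
    assert total == (10 % 7) + (20 % 7) + (30 % 7) + (40 % 7)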
diff --git a/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu b/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu
index 4e065c8c119..18e70bd693f 100644
--- a/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu
+++ b/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu
@@ -175,11 +175,13 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
       cugraph::get_dataframe_buffer_cbegin(mg_property_buffer),
       mg_dst_properties);

-    enum class Bucket { cur, next, num_buckets };
-    cugraph::VertexFrontier<vertex_t, void, true, static_cast<size_t>(Bucket::num_buckets)>
-      mg_vertex_frontier(handle);
-    mg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::cur))
-      .insert(sources.begin(), sources.end());
+    constexpr size_t bucket_idx_cur  = 0;
+    constexpr size_t bucket_idx_next = 1;
+    constexpr size_t num_buckets     = 2;
+
+    cugraph::vertex_frontier_t<vertex_t, void, true, true> mg_vertex_frontier(handle,
+                                                                              num_buckets);
+    mg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end());

     if (cugraph::test::g_perf) {
       RAFT_CUDA_TRY(cudaDeviceSynchronize());  // for consistent performance measurement
@@ -192,8 +194,8 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
       handle,
       mg_graph_view,
       mg_vertex_frontier,
-      static_cast<size_t>(Bucket::cur),
-      std::vector<size_t>{static_cast<size_t>(Bucket::next)},
+      bucket_idx_cur,
+      std::vector<size_t>{bucket_idx_next},
       mg_src_properties.device_view(),
       mg_dst_properties.device_view(),
       [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) {
@@ -206,7 +208,7 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
       thrust::make_discard_iterator() /* dummy */,
       [] __device__(auto v, auto v_val, auto pushed_val) {
         return thrust::optional<thrust::tuple<size_t, std::byte>>{
-          thrust::make_tuple(static_cast<size_t>(Bucket::next), std::byte{0} /* dummy */)};
+          thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)};
       });

     if (cugraph::test::g_perf) {
@@ -223,7 +225,7 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
       auto mg_aggregate_renumber_map_labels = cugraph::test::device_gatherv(
         handle, (*mg_renumber_map_labels).data(), (*mg_renumber_map_labels).size());

-      auto& next_bucket = mg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next));
+      auto& next_bucket = mg_vertex_frontier.bucket(bucket_idx_next);
       auto mg_aggregate_frontier_dsts =
         cugraph::test::device_gatherv(handle, next_bucket.begin(), next_bucket.size());

@@ -266,18 +268,16 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
         sg_graph_view,
         cugraph::get_dataframe_buffer_cbegin(sg_property_buffer),
         sg_dst_properties);
-      cugraph::
-        VertexFrontier<vertex_t, void, false, static_cast<size_t>(Bucket::num_buckets)>
-          sg_vertex_frontier(handle);
-      sg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::cur))
-        .insert(sources.begin(), sources.end());
+      cugraph::vertex_frontier_t<vertex_t, void, false, true> sg_vertex_frontier(handle,
+                                                                                 num_buckets);
+      sg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end());

       update_frontier_v_push_if_out_nbr(
         handle,
         sg_graph_view,
         sg_vertex_frontier,
-        static_cast<size_t>(Bucket::cur),
-        std::vector<size_t>{static_cast<size_t>(Bucket::next)},
+        bucket_idx_cur,
+        std::vector<size_t>{bucket_idx_next},
         sg_src_properties.device_view(),
         sg_dst_properties.device_view(),
         [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) {
@@ -290,17 +290,16 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr
         thrust::make_discard_iterator() /* dummy */,
         [] __device__(auto v, auto v_val, auto pushed_val) {
           return thrust::optional<thrust::tuple<size_t, std::byte>>{
-            thrust::make_tuple(static_cast<size_t>(Bucket::next), std::byte{0} /* dummy */)};
+            thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)};
         });

       thrust::sort(handle.get_thrust_policy(),
-                   sg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next)).begin(),
-                   sg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next)).end());
-      bool passed =
-        thrust::equal(handle.get_thrust_policy(),
-                      sg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next)).begin(),
-                      sg_vertex_frontier.get_bucket(static_cast<size_t>(Bucket::next)).end(),
-                      mg_aggregate_frontier_dsts.begin());
+                   sg_vertex_frontier.bucket(bucket_idx_next).begin(),
+                   sg_vertex_frontier.bucket(bucket_idx_next).end());
+      bool passed = thrust::equal(handle.get_thrust_policy(),
+                                  sg_vertex_frontier.bucket(bucket_idx_next).begin(),
+                                  sg_vertex_frontier.bucket(bucket_idx_next).end(),
+                                  mg_aggregate_frontier_dsts.begin());
       ASSERT_TRUE(passed);
     }
   }
diff --git a/datasets/toy_graph.csv b/datasets/toy_graph.csv
index d295499a832..dec4a956c85 100644
--- a/datasets/toy_graph.csv
+++ b/datasets/toy_graph.csv
@@ -2,7 +2,7 @@
 1 3 2.1
 1 4 1.1
 2 0 5.1
-2 3 3.1
-3 5 4.1
+2 1 3.1
+2 3 4.1
 3 5 7.2
 4 5 3.2
\ No newline at end of file
diff --git a/docs/cugraph/source/api_docs/sampling.rst b/docs/cugraph/source/api_docs/sampling.rst
index 6255c981a28..d633e94b778 100644
--- a/docs/cugraph/source/api_docs/sampling.rst
+++ b/docs/cugraph/source/api_docs/sampling.rst
@@ -11,3 +11,5 @@ Random Walks
    :toctree: api/

    cugraph.random_walks
+   cugraph.ego_graph
+   cugraph.experimental.dask.uniform_neighborhood_sampling
\ No newline at end of file
diff --git a/docs/cugraph/source/basics/cugraph_ref.rst b/docs/cugraph/source/basics/cugraph_ref.rst
index e0f113eaba4..f099752c543 100644
--- a/docs/cugraph/source/basics/cugraph_ref.rst
+++ b/docs/cugraph/source/basics/cugraph_ref.rst
@@ -28,6 +28,13 @@ Betweenness Centrality

 Katz

+- Katz, L. (1953). *A new status index derived from sociometric analysis*. Psychometrika, 18(1), 39-43.
+- Foster, K.C., Muth, S.Q., Potterat, J.J. et al. *A faster Katz status score algorithm*. Computational & Mathematical Organization Theory (2001) 7: 275.
+
+
+
+K-Truss
+
 - J. 
Cohen, *Trusses: Cohesive subgraphs for social network analysis* National security agency technical report, 2008 - O. Green, J. Fox, E. Kim, F. Busato, et al. *Quickly Finding a Truss in a Haystack* IEEE High Performance Extreme Computing Conference (HPEC), 2017 https://doi.org/10.1109/HPEC.2017.8091038 - O. Green, P. Yalamanchili, L.M. Munguia, *Fast Triangle Counting on GPU* Irregular Applications: Architectures and Algorithms (IA3), 2014 @@ -36,6 +43,18 @@ Hungarian Algorithm - Date, K., & Nagi, R. (2016). GPU-accelerated Hungarian algorithms for the Linear Assignment Problem. Parallel Computing, 57, 52-72. + +Leiden + +- Traag, V. A., Waltman, L., & Van Eck, N. J. (2019). *From Louvain to Leiden: guaranteeing well-connected communities*. Scientific reports, 9(1), 1-12. + +Louvain + +- VD Blondel, J-L Guillaume, R Lambiotte and E Lefebvre. *Fast unfolding of community hierarchies in large networks*. J Stat Mech P10008 (2008) + + + + | | diff --git a/python/cugraph/cugraph/centrality/katz_centrality.py b/python/cugraph/cugraph/centrality/katz_centrality.py index 000da2b9405..30f84d66fa0 100644 --- a/python/cugraph/cugraph/centrality/katz_centrality.py +++ b/python/cugraph/cugraph/centrality/katz_centrality.py @@ -116,6 +116,18 @@ def katz_centrality( >>> kc = cugraph.katz_centrality(G) """ + if (alpha is not None) and (alpha <= 0.0): + raise ValueError(f"'alpha' must be a positive float or None, " + f"got: {alpha}") + if (not isinstance(beta, float)) or (beta <= 0.0): + raise ValueError(f"'beta' must be a positive float, got: {beta}") + if (not isinstance(max_iter, int)): + raise ValueError(f"'max_iter' must be an integer, got: {max_iter}") + elif max_iter <= 0: + max_iter = 1000 + if (not isinstance(tol, float)) or (tol <= 0.0): + raise ValueError(f"'tol' must be a positive float, got: {tol}") + G, isNx = ensure_cugraph_obj_for_nx(G) srcs = G.edgelist.edgelist_df['src'] @@ -127,7 +139,7 @@ def katz_centrality( # with type hardcoded to float32 is passed into wrapper weights = cudf.Series((srcs + 1) / (srcs + 1), dtype="float32") - if alpha is None or alpha <= 0.0: + if alpha is None: largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree") largest_out_degree = largest_out_degree["out_degree"].iloc[0] alpha = 1 / (largest_out_degree + 1) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index c8e675c7a29..bc15e0a7da9 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
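Stepping back to the katz_centrality validation added above: alpha may be None (cugraph then derives it from the largest out-degree as 1 / (max_out_degree + 1)), while a supplied alpha, and likewise beta and tol, must be positive. A plain-Python sketch of those checks and the default-alpha rule (illustrative, no GPU required; the helper name is hypothetical):

    # Sketch of the parameter checks and default-alpha rule from
    # katz_centrality above, with degrees computed from a toy edge list.
    def validate_and_default_alpha(edges, alpha=None, beta=1.0, tol=1.0e-6):
        if alpha is not None and alpha <= 0.0:
            raise ValueError(f"'alpha' must be a positive float or None, got: {alpha}")
        if not isinstance(beta, float) or beta <= 0.0:
            raise ValueError(f"'beta' must be a positive float, got: {beta}")
        if not isinstance(tol, float) or tol <= 0.0:
            raise ValueError(f"'tol' must be a positive float, got: {tol}")
        if alpha is None:
            out_degree = {}
            for src, _ in edges:
                out_degree[src] = out_degree.get(src, 0) + 1
            alpha = 1 / (max(out_degree.values()) + 1)
        return alpha

    edges = [(0, 1), (0, 2), (0, 3), (1, 2)]  # vertex 0 has out-degree 3
    assert validate_and_default_alpha(edges) == 1 / 4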
@@ -88,9 +88,23 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False):
         if batch_enabled:
             persisted = client.persist(dask_obj, workers=worker_list[0])
         else:
+            # Have the first n workers persist the n partitions
+            # Ideally, there would be as many partitions as there are workers
             persisted = [client.persist(
                 dask_obj.get_partition(p), workers=w) for p, w in enumerate(
-                    worker_list)]
+                    worker_list[:dask_obj.npartitions])]
+            # Persist an empty dataframe with the remaining workers if there
+            # are fewer partitions than workers
+            if dask_obj.npartitions < len(worker_list):
+                # The empty df should have the same column names and dtypes as
+                # dask_obj
+                empty_df = cudf.DataFrame(columns=list(dask_obj.columns))
+                empty_df = empty_df.astype(dict(zip(
+                    dask_obj.columns, dask_obj.dtypes)))
+                for p, w in enumerate(worker_list[dask_obj.npartitions:]):
+                    empty_ddf = dask_cudf.from_cudf(empty_df, npartitions=1)
+                    persisted.append(client.persist(empty_ddf, workers=w))
+
         parts = futures_of(persisted)
     # iterable of dask collections (need to colocate them)
     elif isinstance(dask_obj, collections.abc.Sequence):
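The idea in the hunk above, stripped of dask: when there are fewer partitions than workers, the remaining workers receive an empty frame with matching column names and dtypes so every worker holds something well-typed. A pandas-only sketch of that padding step (illustrative; the real code uses cudf and dask_cudf, and `pad_partitions` is a hypothetical helper):

    # Pad a list of per-worker frames with typed empty frames, mirroring the
    # structure of the hunk above.
    import pandas as pd

    def pad_partitions(partitions, num_workers):
        padded = list(partitions)
        if partitions and len(partitions) < num_workers:
            template = partitions[0]
            empty_df = pd.DataFrame(columns=list(template.columns))
            empty_df = empty_df.astype(dict(zip(template.columns, template.dtypes)))
            padded.extend(empty_df.copy() for _ in range(num_workers - len(partitions)))
        return padded

    parts = [pd.DataFrame({"src": [0, 1], "dst": [1, 2]})]
    padded = pad_partitions(parts, num_workers=3)
    assert len(padded) == 3 and list(padded[2].columns) == ["src", "dst"]
    assert padded[2].dtypes.equals(parts[0].dtypes)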
""" - pass + num_nodes = len(nodes) + current_seeds = nodes.reindex(index=np.arange(0, num_nodes)) + _g = self.__G.extract_subgraph(create_using=cugraph.Graph, + allow_multi_edges=True) + ego_edge_list, seeds_offsets = batched_ego_graphs(_g, + current_seeds, + radius=1) + all_parents = cupy.ndarray(0) + all_children = cupy.ndarray(0) + # filter and get a certain size neighborhood + for i in range(1, len(seeds_offsets)): + pos0 = seeds_offsets.values_host[i-1] + pos1 = seeds_offsets.values_host[i] + edge_list = ego_edge_list[pos0:pos1] + # get randomness fanout + filtered_list = edge_list[edge_list['dst'] == current_seeds[i-1]] + + # get sampled_list + if len(filtered_list) > fanout: + sampled_indices = random.sample( + filtered_list.index.to_arrow().to_pylist(), fanout) + filtered_list = filtered_list.reindex(index=sampled_indices) + + children = cupy.asarray(filtered_list['src']) + parents = cupy.asarray(filtered_list['dst']) + all_parents = cupy.append(all_parents, parents) + all_children = cupy.append(all_children, children) + return all_parents, all_children def node_subgraph(self, nodes=None, diff --git a/python/cugraph/cugraph/structure/graph_utilities.pxd b/python/cugraph/cugraph/structure/graph_utilities.pxd index 7f3f88cdee0..de93da56af8 100644 --- a/python/cugraph/cugraph/structure/graph_utilities.pxd +++ b/python/cugraph/cugraph/structure/graph_utilities.pxd @@ -182,7 +182,8 @@ cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython": vertex_t *edgelist_major_vertices, vertex_t *edgelist_minor_vertices, weight_t* edgelist_weights, - edge_t num_edges) except + + edge_t num_edges, + bool is_weighted) except + # 5. `renumber_edgelist()` wrapper # diff --git a/python/cugraph/cugraph/structure/renumber_wrapper.pyx b/python/cugraph/cugraph/structure/renumber_wrapper.pyx index 739a75df471..515e53f59f1 100644 --- a/python/cugraph/cugraph/structure/renumber_wrapper.pyx +++ b/python/cugraph/cugraph/structure/renumber_wrapper.pyx @@ -53,11 +53,13 @@ cdef renumber_helper(shuffled_vertices_t* ptr_maj_min_w, vertex_t, weights): # vertex_t or weight_t. Failing to do that will create am empty column of type object # which is not supported by '__cuda_array_interface__' if shuffled_major_series is None: - shuffled_major_series = cudf.Series(dtype=vertex_t) + shuffled_df['major_vertices'] = cudf.Series(dtype=vertex_t) + else: + shuffled_df['major_vertices']= shuffled_major_series if shuffled_minor_series is None: - shuffled_minor_series = cudf.Series(dtype=vertex_t) - shuffled_df['major_vertices']= shuffled_major_series - shuffled_df['minor_vertices']= shuffled_minor_series + shuffled_df['minor_vertices'] = cudf.Series(dtype=vertex_t) + else: + shuffled_df['minor_vertices']= shuffled_minor_series if weights is not None: weight_t = weights.dtype @@ -65,7 +67,8 @@ cdef renumber_helper(shuffled_vertices_t* ptr_maj_min_w, vertex_t, weights): move(pair_s_weights.first), weight_t, "shuffled_weights") if shuffled_weights_series is None: shuffled_df['value']= cudf.Series(dtype=weight_t) - shuffled_df['value']= shuffled_weights_series + else: + shuffled_df['value']= shuffled_weights_series return shuffled_df @@ -176,7 +179,8 @@ def renumber(input_df, # maybe use cpdef ? 
c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_32_32_32.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] minor_vertices = shuffled_df['minor_vertices'] @@ -245,7 +249,8 @@ def renumber(input_df, # maybe use cpdef ? c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_32_32_64.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] @@ -317,7 +322,8 @@ def renumber(input_df, # maybe use cpdef ? c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_32_64_32.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] @@ -387,7 +393,8 @@ def renumber(input_df, # maybe use cpdef ? c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_32_64_64.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] @@ -459,7 +466,8 @@ def renumber(input_df, # maybe use cpdef ? c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_64_64_32.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] @@ -530,7 +538,8 @@ def renumber(input_df, # maybe use cpdef ? c_major_vertices, c_minor_vertices, c_edge_weights, - num_local_edges).release()) + num_local_edges, + weights is not None).release()) shuffled_df = renumber_helper(ptr_shuffled_64_64_64.get(), vertex_t, weights) major_vertices = shuffled_df['major_vertices'] diff --git a/python/cugraph/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/cugraph/tests/dask/test_mg_bfs.py index 19e345f1ee1..58b18a02cf5 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_bfs.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_bfs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -13,17 +13,17 @@ import cugraph.dask as dcg import gc -import pytest +# import pytest import cugraph import dask_cudf import cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_bfs(dask_client): gc.collect() @@ -60,10 +60,10 @@ def modify_dataset(df): df = modify_dataset(df) - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") expected_dist = cugraph.bfs(g, [0, 1000]) @@ -85,9 +85,9 @@ def modify_dataset(df): assert err == 0 -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_bfs_multi_column_depthlimit(dask_client): gc.collect() @@ -115,10 +115,10 @@ def test_dask_bfs_multi_column_depthlimit(dask_client): df['src_b'] = df['src_a'] + 1000 df['dst_b'] = df['dst_a'] + 1000 - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, ["src_a", "src_b"], ["dst_a", "dst_b"]) - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, ["src_a", "src_b"], ["dst_a", "dst_b"]) start = cudf.DataFrame() diff --git a/python/cugraph/cugraph/tests/dask/test_mg_comms.py b/python/cugraph/cugraph/tests/dask/test_mg_comms.py index ab80d781ba8..2c02779f199 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_comms.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_comms.py @@ -13,17 +13,17 @@ import cugraph.dask as dcg import gc -import pytest +# import pytest import cugraph import dask_cudf import cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_pagerank(dask_client): gc.collect() diff --git a/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py b/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py index 08195a6bdda..9427b18aa92 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_connectivity.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -13,17 +13,17 @@ import cugraph.dask as dcg import gc -import pytest +# import pytest import cugraph import dask_cudf import cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_wcc(dask_client): gc.collect() @@ -47,10 +47,10 @@ def test_dask_wcc(dask_client): dtype=["int32", "int32", "float32"], ) - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, "src", "dst", renumber=True) - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") expected_dist = cugraph.weakly_connected_components(g) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_degree.py b/python/cugraph/cugraph/tests/dask/test_mg_degree.py index f8af0d0e87d..b0b46e93d10 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_degree.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_degree.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -49,10 +49,10 @@ def test_dask_mg_degree(dask_client): dtype=["int32", "int32", "float32"], ) - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, "src", "dst") merge_df_in = ( diff --git a/python/cugraph/cugraph/tests/dask/test_mg_hits.py b/python/cugraph/cugraph/tests/dask/test_mg_hits.py index 3cff17e62c8..124bc5066cb 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_hits.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_hits.py @@ -16,7 +16,7 @@ import pytest import cugraph import dask_cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests import utils # ============================================================================= @@ -97,9 +97,9 @@ def input_expected_output(input_combo): # ============================================================================= -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_hits(dask_client, benchmark, input_expected_output): dg = input_expected_output["MGGraph"] diff --git a/python/cugraph/cugraph/tests/dask/test_mg_louvain.py b/python/cugraph/cugraph/tests/dask/test_mg_louvain.py index 086d26316d8..fbf3dab90e2 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_louvain.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_louvain.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -17,7 +17,7 @@ import cugraph import dask_cudf from cugraph.tests import utils -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu try: from rapids_pytest_benchmark import setFixtureParamNames @@ -39,9 +39,9 @@ def setFixtureParamNames(*args, **kwargs): ############################################################################### # Fixtures -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) @pytest.fixture(scope="module", params=utils.DATASETS_UNDIRECTED, ids=[f"dataset={d.as_posix()}" @@ -64,16 +64,16 @@ def daskGraphFromDataset(request, dask_client): dtype=["int32", "int32", "float32"], ) - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") return dg ############################################################################### # Tests -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_mg_louvain_with_edgevals(daskGraphFromDataset): # FIXME: daskGraphFromDataset returns a DiGraph, which Louvain is currently # accepting. In the future, an MNMG symmeterize will need to be called to diff --git a/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py index 30f9844a25b..957e9a7747e 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_pagerank.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,7 +17,7 @@ import cugraph import dask_cudf import cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH @@ -50,9 +50,9 @@ def personalize(vertices, personalization_perc): PERSONALIZATION_PERC = [0, 10, 50] -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) @pytest.mark.parametrize("personalization_perc", PERSONALIZATION_PERC) def test_dask_pagerank(dask_client, personalization_perc): gc.collect() @@ -77,10 +77,10 @@ def test_dask_pagerank(dask_client, personalization_perc): dtype=["int32", "int32", "float32"], ) - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, "src", "dst") - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst") personalization = None diff --git a/python/cugraph/cugraph/tests/dask/test_mg_sssp.py b/python/cugraph/cugraph/tests/dask/test_mg_sssp.py index 0e3c45607af..656c91d1754 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_sssp.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_sssp.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -13,17 +13,17 @@ import cugraph.dask as dcg import gc -import pytest +# import pytest import cugraph import dask_cudf import cudf -from cugraph.dask.common.mg_utils import is_single_gpu +# from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.tests.utils import RAPIDS_DATASET_ROOT_DIR_PATH -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_dask_sssp(dask_client): gc.collect() @@ -47,10 +47,10 @@ def test_dask_sssp(dask_client): dtype=["int32", "int32", "float32"], ) - g = cugraph.DiGraph() + g = cugraph.Graph(directed=True) g.from_cudf_edgelist(df, "src", "dst", "value", renumber=True) - dg = cugraph.DiGraph() + dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") expected_dist = cugraph.sssp(g, 0) diff --git a/python/cugraph/cugraph/tests/dask/test_mg_utility.py b/python/cugraph/cugraph/tests/dask/test_mg_utility.py index 89600260714..732c785df68 100644 --- a/python/cugraph/cugraph/tests/dask/test_mg_utility.py +++ b/python/cugraph/cugraph/tests/dask/test_mg_utility.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -35,9 +35,9 @@ def setup_function(): gc.collect() -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) +# @pytest.mark.skipif( +# is_single_gpu(), reason="skipping MG testing on Single GPU system" +# ) def test_from_edgelist(dask_client): input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() @@ -53,9 +53,9 @@ def test_from_edgelist(dask_client): dg1 = cugraph.from_edgelist( ddf, source="src", destination="dst", edge_attr="value", - create_using=cugraph.DiGraph) + create_using=cugraph.Graph(directed=True)) - dg2 = cugraph.DiGraph() + dg2 = cugraph.Graph(directed=True) dg2.from_dask_cudf_edgelist( ddf, source="src", destination="dst", edge_attr="value" ) diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index 5f783c3baa8..794811d5ee9 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -16,6 +16,8 @@ import cugraph from cugraph.tests import utils from cugraph.experimental import PropertyGraph +import numpy as np +import cudf # Test @@ -63,19 +65,20 @@ def test_using_pgraph(graph_file): @pytest.mark.parametrize("graph_file", utils.DATASETS) def test_node_data_pg(graph_file): - with pytest.raises(NotImplementedError): - cu_M = utils.read_csv_file(graph_file) + cu_M = utils.read_csv_file(graph_file) - pG = PropertyGraph() - pG.add_edge_data(cu_M, - type_name="edge", - vertex_col_names=("0", "1"), - property_columns=None) + pG = PropertyGraph() + pG.add_edge_data(cu_M, + type_name="edge", + vertex_col_names=("0", "1"), + property_columns=None) + + gstore = cugraph.gnn.CuGraphStore(graph=pG) - gstore = cugraph.gnn.CuGraphStore(graph=pG) + edata = gstore.edata - gstore.ndata + assert edata.shape[0] > 0 @pytest.mark.parametrize("graph_file", utils.DATASETS) @@ -132,3 +135,79 @@ def test_workflow(graph_file): ego_edge_list, seeds_offsets = gstore.egonet(sampled_nodes, k=1) assert len(ego_edge_list) > 0 + + 
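The recurring change throughout these test files replaces the cugraph.DiGraph constructor with the cugraph.Graph(directed=True) spelling. A minimal usage example of the new form, exactly as the updated tests use it (assumes a GPU environment with cudf and cugraph installed):

    # New-style construction of a directed graph, matching the updated tests.
    import cudf
    import cugraph

    df = cudf.DataFrame({"src": [0, 1, 2], "dst": [1, 2, 0],
                         "value": [1.0, 2.0, 3.0]})
    G = cugraph.Graph(directed=True)      # replaces cugraph.DiGraph()
    G.from_cudf_edgelist(df, source="src", destination="dst", edge_attr="value")
    assert G.is_directed()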
+@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_sample_neighbors(graph_file): + cu_M = utils.read_csv_file(graph_file) + + g = cugraph.Graph(directed=True) + g.from_cudf_edgelist(cu_M, source='0', destination='1', renumber=True) + + pg = PropertyGraph() + pg.add_edge_data(cu_M, + type_name="edge", + vertex_col_names=("0", "1"), + property_columns=["2"]) + + gstore = cugraph.gnn.CuGraphStore(graph=pg) + + nodes = gstore.get_vertex_ids() + num_nodes = len(nodes) + + assert num_nodes > 0 + + sampled_nodes = nodes[:5] + + parents_list, children_list = gstore.sample_neighbors(sampled_nodes, 2) + + assert len(parents_list) > 0 + + +@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_n_data(graph_file): + cu_M = utils.read_csv_file(graph_file) + + g = cugraph.Graph(directed=True) + g.from_cudf_edgelist(cu_M, source='0', destination='1', renumber=True) + + pg = PropertyGraph() + pg.add_edge_data(cu_M, + type_name="edge", + vertex_col_names=("0", "1"), + property_columns=["2"]) + + num_nodes = g.number_of_nodes() + df_feat = cudf.DataFrame() + df_feat['node_id'] = np.arange(num_nodes) + df_feat['val0'] = [float(i+1) for i in range(num_nodes)] + df_feat['val1'] = [float(i+2) for i in range(num_nodes)] + pg.add_vertex_data(df_feat, + type_name="test_feat", + vertex_col_name="node_id", + property_columns=None) + gstore = cugraph.gnn.CuGraphStore(graph=pg) + + ndata = gstore.ndata + + assert ndata.shape[0] > 0 + + +@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_e_data(graph_file): + cu_M = utils.read_csv_file(graph_file) + + g = cugraph.Graph(directed=True) + g.from_cudf_edgelist(cu_M, source='0', destination='1', renumber=True) + + pg = PropertyGraph() + pg.add_edge_data(cu_M, + type_name="edge", + vertex_col_names=("0", "1"), + property_columns=["2"]) + + gstore = cugraph.gnn.CuGraphStore(graph=pg) + + edata = gstore.edata + + assert edata.shape[0] > 0 diff --git a/python/cugraph/cugraph/tests/test_katz_centrality.py b/python/cugraph/cugraph/tests/test_katz_centrality.py index ce50575d467..aed5930665b 100644 --- a/python/cugraph/cugraph/tests/test_katz_centrality.py +++ b/python/cugraph/cugraph/tests/test_katz_centrality.py @@ -30,7 +30,15 @@ warnings.filterwarnings("ignore", category=DeprecationWarning) import networkx as nx -LIBCUGRAPH_C_DATASET = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"toy_graph.csv" +# This toy graph is used in multiple tests throughout libcugraph_c and pylib. 
+TOY_DATASET = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"toy_graph.csv"
+
+
+# =============================================================================
+# Pytest Setup / Teardown - called for each test function
+# =============================================================================
+def setup_function():
+    gc.collect()


 def topKVertices(katz, col, k):
@@ -64,8 +72,6 @@ def calc_katz(graph_file):

 @pytest.mark.parametrize("graph_file", utils.DATASETS)
 def test_katz_centrality(graph_file):
-    gc.collect()
-
     katz_scores = calc_katz(graph_file)

     topKNX = topKVertices(katz_scores, "nx_katz", 10)
@@ -76,8 +82,6 @@ def test_katz_centrality(graph_file):

 @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
 def test_katz_centrality_nx(graph_file):
-    gc.collect()
-
     NM = utils.read_csv_for_nx(graph_file)

     Gnx = nx.from_pandas_edgelist(
@@ -109,8 +113,6 @@ def test_katz_centrality_nx(graph_file):

 @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
 def test_katz_centrality_multi_column(graph_file):
-    gc.collect()
-
     cu_M = utils.read_csv_file(graph_file)
     cu_M.rename(columns={'0': 'src_0', '1': 'dst_0'}, inplace=True)
     cu_M['src_1'] = cu_M['src_0'] + 1000
@@ -142,12 +144,10 @@ def test_katz_centrality_multi_column(graph_file):
     assert top_res.equals(top_exp)


-@pytest.mark.parametrize("graph_file", [LIBCUGRAPH_C_DATASET])
+@pytest.mark.parametrize("graph_file", [TOY_DATASET])
 def test_katz_centrality_toy(graph_file):
     # This test is based off of libcugraph_c and pylibcugraph tests
-    gc.collect()
-
-    df = cudf.read_csv(LIBCUGRAPH_C_DATASET, delimiter=' ',
+    df = cudf.read_csv(TOY_DATASET, delimiter=' ',
                        dtype=['int32', 'int32', 'float32'], header=None)
     G = cugraph.Graph(directed=True)
     G.from_cudf_edgelist(df, source='0', destination='1', edge_attr='2')
@@ -166,6 +166,6 @@ def test_katz_centrality_toy(graph_file):
     for vertex in ck["vertex"].to_pandas():
         expected_score = centralities[vertex]
         actual_score = ck["katz_centrality"].iloc[vertex]
-        if pytest.approx(expected_score, abs=1e-2) != actual_score:
-            raise ValueError(f"Katz centrality score is {actual_score}"
-                             f", should have been {expected_score}")
+        assert pytest.approx(expected_score, abs=1e-2) == actual_score, \
+            f"Katz centrality score is {actual_score}, should have " \
+            f"been {expected_score}"
diff --git a/python/cugraph/cugraph/tests/utils.py b/python/cugraph/cugraph/tests/utils.py
index 403ba55edcb..bcfa29bed1b 100755
--- a/python/cugraph/cugraph/tests/utils.py
+++ b/python/cugraph/cugraph/tests/utils.py
@@ -20,12 +20,12 @@
 import networkx as nx
 import numpy as np
 import cupy as cp
-from cupyx.scipy.sparse.coo import coo_matrix as cp_coo_matrix
-from cupyx.scipy.sparse.csr import csr_matrix as cp_csr_matrix
-from cupyx.scipy.sparse.csc import csc_matrix as cp_csc_matrix
-from scipy.sparse.coo import coo_matrix as sp_coo_matrix
-from scipy.sparse.csr import csr_matrix as sp_csr_matrix
-from scipy.sparse.csc import csc_matrix as sp_csc_matrix
+from cupyx.scipy.sparse import coo_matrix as cp_coo_matrix
+from cupyx.scipy.sparse import csr_matrix as cp_csr_matrix
+from cupyx.scipy.sparse import csc_matrix as cp_csc_matrix
+from scipy.sparse import coo_matrix as sp_coo_matrix
+from scipy.sparse import csr_matrix as sp_csr_matrix
+from scipy.sparse import csc_matrix as sp_csc_matrix
 from pathlib import Path
 import cudf
 import dask_cudf
diff --git a/python/cugraph/cugraph/utilities/utils.py b/python/cugraph/cugraph/utilities/utils.py
index 332e09a545a..b35eab937ad 100755
--- a/python/cugraph/cugraph/utilities/utils.py
+++ 
b/python/cugraph/cugraph/utilities/utils.py @@ -24,9 +24,9 @@ # optional dependencies try: import cupy as cp - from cupyx.scipy.sparse.coo import coo_matrix as cp_coo_matrix - from cupyx.scipy.sparse.csr import csr_matrix as cp_csr_matrix - from cupyx.scipy.sparse.csc import csc_matrix as cp_csc_matrix + from cupyx.scipy.sparse import coo_matrix as cp_coo_matrix + from cupyx.scipy.sparse import csr_matrix as cp_csr_matrix + from cupyx.scipy.sparse import csc_matrix as cp_csc_matrix __cp_matrix_types = [cp_coo_matrix, cp_csr_matrix, cp_csc_matrix] __cp_compressed_matrix_types = [cp_csr_matrix, cp_csc_matrix] @@ -39,9 +39,9 @@ try: import scipy as sp - from scipy.sparse.coo import coo_matrix as sp_coo_matrix - from scipy.sparse.csr import csr_matrix as sp_csr_matrix - from scipy.sparse.csc import csc_matrix as sp_csc_matrix + from scipy.sparse import coo_matrix as sp_coo_matrix + from scipy.sparse import csr_matrix as sp_csr_matrix + from scipy.sparse import csc_matrix as sp_csc_matrix __sp_matrix_types = [sp_coo_matrix, sp_csr_matrix, sp_csc_matrix] __sp_compressed_matrix_types = [sp_csr_matrix, sp_csc_matrix] diff --git a/python/pylibcugraph/pylibcugraph/tests/test_katz_centrality.py b/python/pylibcugraph/pylibcugraph/tests/test_katz_centrality.py index 3422b7d9827..e50e2401039 100644 --- a/python/pylibcugraph/pylibcugraph/tests/test_katz_centrality.py +++ b/python/pylibcugraph/pylibcugraph/tests/test_katz_centrality.py @@ -18,6 +18,11 @@ GraphProperties, SGGraph, katz_centrality) +import pathlib +import pylibcugraph + + +datasets = pathlib.Path(pylibcugraph.__path__[0]).parent.parent.parent # ============================================================================= @@ -74,13 +79,13 @@ def _generic_katz_test(src_arr, def test_katz(): num_edges = 8 num_vertices = 6 - src = cp.asarray([0, 1, 1, 2, 2, 2, 3, 4], dtype=np.int32) - dst = cp.asarray([1, 3, 4, 0, 1, 3, 5, 5], dtype=np.int32) - wgt = cp.asarray([0.1, 2.1, 1.1, 5.1, 3.1, 4.1, 7.2, 3.2], - dtype=np.float32) + graph_data = np.genfromtxt(datasets / 'datasets/toy_graph.csv', + delimiter=' ') + src = cp.asarray(graph_data[:, 0], dtype=np.int32) + dst = cp.asarray(graph_data[:, 1], dtype=np.int32) + wgt = cp.asarray(graph_data[:, 2], dtype=np.float32) result = cp.asarray([0.410614, 0.403211, 0.390689, 0.415175, 0.395125, 0.433226], dtype=np.float32) - alpha = 0.01 beta = 1.0 epsilon = 0.000001
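The pylibcugraph test above now derives its src/dst/wgt arrays from datasets/toy_graph.csv instead of hardcoding them, so the cugraph, pylibcugraph, and libcugraph_c Katz tests all exercise the same 8-edge toy graph. A CPU-only sketch of that loading step (numpy only; the relative path is illustrative, while the test resolves it from the installed package location):

    # Parse the 8-edge toy graph and split it into source, destination, and
    # weight arrays, as in the dataset-driven test setup above.
    import numpy as np

    graph_data = np.genfromtxt("datasets/toy_graph.csv", delimiter=" ")
    src = graph_data[:, 0].astype(np.int32)
    dst = graph_data[:, 1].astype(np.int32)
    wgt = graph_data[:, 2].astype(np.float32)
    assert graph_data.shape == (8, 3)  # 8 edges, (src, dst, weight) per row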