From 2f3ed7ab01beed122a916e9c6f57e87f0fdbe3b9 Mon Sep 17 00:00:00 2001 From: Kumar Aatish Date: Wed, 27 Oct 2021 17:02:13 -0400 Subject: [PATCH] Add raft ops for reduce_v and transform_reduce_v (#1902) reduce_v and transform_reduce_v support raft::comms::op_t as a parameter. Tests have been updated accordingly. Fixes #1903 Authors: - Kumar Aatish (https://github.com/kaatish) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Seunghwa Kang (https://github.com/seunghwak) URL: https://github.com/rapidsai/cugraph/pull/1902 --- .../copy_v_transform_reduce_in_out_nbr.cuh | 8 +- .../cugraph/prims/property_op_utils.cuh | 36 +++++++-- cpp/include/cugraph/prims/reduce_v.cuh | 40 ++++----- .../cugraph/prims/transform_reduce_e.cuh | 10 +-- .../cugraph/prims/transform_reduce_v.cuh | 45 ++++++----- cpp/tests/prims/mg_reduce_v.cu | 81 ++++++++++++------- cpp/tests/prims/mg_transform_reduce_v.cu | 75 ++++++++++------- 7 files changed, 183 insertions(+), 112 deletions(-) diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh index 0e5f873dfcd..9fb7e8bf2a1 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh @@ -74,7 +74,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse( auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count()); - property_add edge_property_add{}; + property_op edge_property_add{}; while (idx < static_cast(dcs_nzd_vertex_count)) { auto major = *(matrix_partition.get_major_from_major_hypersparse_idx_nocheck(static_cast(idx))); @@ -169,7 +169,7 @@ __global__ void for_all_major_for_all_nbr_low_degree( auto major_start_offset = static_cast(major_first - matrix_partition.get_major_first()); auto idx = static_cast(tid); - property_add edge_property_add{}; + property_op edge_property_add{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; @@ -271,7 +271,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree( __shared__ typename WarpReduce::TempStorage temp_storage[copy_v_transform_reduce_nbr_for_all_block_size / raft::warp_size()]; - property_add edge_property_add{}; + property_op edge_property_add{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; @@ -355,7 +355,7 @@ __global__ void for_all_major_for_all_nbr_high_degree( cub::BlockReduce; __shared__ typename BlockReduce::TempStorage temp_storage; - property_add edge_property_add{}; + property_op edge_property_add{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; vertex_t const* indices{nullptr}; diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index b3a859479dd..e0ad03b4762 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -17,6 +17,7 @@ #include +#include #include #include @@ -121,12 +122,12 @@ struct cast_edge_op_bool_to_integer { } }; -template -struct property_add : public thrust::plus { +template typename Op> +struct property_op : public Op { }; -template -struct property_add> +template typename Op> +struct property_op, Op> : public thrust:: binary_function, thrust::tuple, thrust::tuple> { using Type = thrust::tuple; @@ -135,7 +136,8 @@ struct property_add> template __host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence) { - return thrust::make_tuple((thrust::get(t1) + thrust::get(t2))...); + return thrust::make_tuple((Op::type>()( + thrust::get(t1), thrust::get(t2)))...); } public: @@ -145,6 +147,30 @@ struct property_add> } }; +template +constexpr auto op_dispatch(raft::comms::op_t op, F&& f) +{ + switch (op) { + case raft::comms::op_t::SUM: { + return std::invoke(f, property_op()); + } break; + case raft::comms::op_t::PROD: { + CUGRAPH_FAIL("raft::comms::op_t::PROD is not supported for op_dispatch"); + return std::invoke_result_t>{}; + } break; + case raft::comms::op_t::MIN: { + return std::invoke(f, property_op()); + } break; + case raft::comms::op_t::MAX: { + return std::invoke(f, property_op()); + } break; + default: { + CUGRAPH_FAIL("Unhandled raft::comms::op_t"); + return std::invoke_result_t>{}; + } + }; +} + template __device__ std::enable_if_t::value, void> atomic_accumulate_edge_op_result(Iterator iter, T const& value) diff --git a/cpp/include/cugraph/prims/reduce_v.cuh b/cpp/include/cugraph/prims/reduce_v.cuh index b63c0ed43b7..48b5445d5e2 100644 --- a/cpp/include/cugraph/prims/reduce_v.cuh +++ b/cpp/include/cugraph/prims/reduce_v.cuh @@ -49,17 +49,19 @@ template T reduce_v(raft::handle_t const& handle, GraphViewType const& graph_view, VertexValueInputIterator vertex_value_input_first, - T init) + T init = T{}, + raft::comms::op_t op = raft::comms::op_t::SUM) { - auto ret = thrust::reduce( - handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.get_number_of_local_vertices(), - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, - property_add()); + auto ret = op_dispatch(op, [&handle, &graph_view, vertex_value_input_first, init](auto op) { + return thrust::reduce( + handle.get_thrust_policy(), + vertex_value_input_first, + vertex_value_input_first + graph_view.get_number_of_local_vertices(), + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + op); + }); if constexpr (GraphViewType::is_multi_gpu) { - ret = - host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream()); + ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); } return ret; } @@ -87,17 +89,19 @@ T reduce_v(raft::handle_t const& handle, GraphViewType const& graph_view, InputIterator input_first, InputIterator input_last, - T init) + T init = T{}, + raft::comms::op_t op = raft::comms::op_t::SUM) { - auto ret = thrust::reduce( - handle.get_thrust_policy(), - input_first, - input_last, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, - property_add()); + auto ret = op_dispatch(op, [&handle, &graph_view, input_first, input_last, init](auto op) { + return thrust::reduce( + handle.get_thrust_policy(), + input_first, + input_last, + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + op); + }); if constexpr (GraphViewType::is_multi_gpu) { - ret = - host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream()); + ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); } return ret; } diff --git a/cpp/include/cugraph/prims/transform_reduce_e.cuh b/cpp/include/cugraph/prims/transform_reduce_e.cuh index 86a67a18e4c..9698a8e968a 100644 --- a/cpp/include/cugraph/prims/transform_reduce_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_e.cuh @@ -68,7 +68,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse( using BlockReduce = cub::BlockReduce; __shared__ typename BlockReduce::TempStorage temp_storage; - property_add edge_property_add{}; + property_op edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(dcs_nzd_vertex_count)) { auto major = @@ -154,7 +154,7 @@ __global__ void for_all_major_for_all_nbr_low_degree( using BlockReduce = cub::BlockReduce; __shared__ typename BlockReduce::TempStorage temp_storage; - property_add edge_property_add{}; + property_op edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; @@ -241,7 +241,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree( using BlockReduce = cub::BlockReduce; __shared__ typename BlockReduce::TempStorage temp_storage; - property_add edge_property_add{}; + property_op edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; @@ -312,7 +312,7 @@ __global__ void for_all_major_for_all_nbr_high_degree( using BlockReduce = cub::BlockReduce; __shared__ typename BlockReduce::TempStorage temp_storage; - property_add edge_property_add{}; + property_op edge_property_add{}; e_op_result_t e_op_result_sum{}; while (idx < static_cast(major_last - major_first)) { auto major_offset = major_start_offset + idx; @@ -407,7 +407,7 @@ T transform_reduce_e(raft::handle_t const& handle, using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; - property_add edge_property_add{}; + property_op edge_property_add{}; auto result_buffer = allocate_dataframe_buffer(1, handle.get_stream()); thrust::fill(handle.get_thrust_policy(), diff --git a/cpp/include/cugraph/prims/transform_reduce_v.cuh b/cpp/include/cugraph/prims/transform_reduce_v.cuh index 6b909ecc120..cba0fd86f92 100644 --- a/cpp/include/cugraph/prims/transform_reduce_v.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_v.cuh @@ -53,18 +53,21 @@ T transform_reduce_v(raft::handle_t const& handle, GraphViewType const& graph_view, VertexValueInputIterator vertex_value_input_first, VertexOp v_op, - T init) + T init = T{}, + raft::comms::op_t op = raft::comms::op_t::SUM) { - auto ret = thrust::transform_reduce( - handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.get_number_of_local_vertices(), - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, - property_add()); + auto ret = + op_dispatch(op, [&handle, &graph_view, vertex_value_input_first, v_op, init](auto op) { + return thrust::transform_reduce( + handle.get_thrust_policy(), + vertex_value_input_first, + vertex_value_input_first + graph_view.get_number_of_local_vertices(), + v_op, + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + op); + }); if (GraphViewType::is_multi_gpu) { - ret = - host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream()); + ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); } return ret; } @@ -97,18 +100,20 @@ T transform_reduce_v(raft::handle_t const& handle, InputIterator input_first, InputIterator input_last, VertexOp v_op, - T init) + T init = T{}, + raft::comms::op_t op = raft::comms::op_t::SUM) { - auto ret = thrust::transform_reduce( - handle.get_thrust_policy(), - input_first, - input_last, - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, - property_add()); + auto ret = op_dispatch(op, [&handle, input_first, input_last, v_op, init](auto op) { + return thrust::transform_reduce( + handle.get_thrust_policy(), + input_first, + input_last, + v_op, + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + op); + }); if (GraphViewType::is_multi_gpu) { - ret = - host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream()); + ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); } return ret; } diff --git a/cpp/tests/prims/mg_reduce_v.cu b/cpp/tests/prims/mg_reduce_v.cu index e57cbce089a..94cd69965a1 100644 --- a/cpp/tests/prims/mg_reduce_v.cu +++ b/cpp/tests/prims/mg_reduce_v.cu @@ -26,6 +26,7 @@ #include #include +#include #include #include @@ -119,11 +120,14 @@ struct generate_impl { template struct result_compare { - static constexpr double threshold_ratio{1e-3}; + static constexpr double threshold_ratio{1e-2}; constexpr auto operator()(const T& t1, const T& t2) { if constexpr (std::is_floating_point_v) { - return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + bool passed = (t1 == t2) // when t1 == t2 == 0 + || + (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio)); + return passed; } return t1 == t2; } @@ -144,7 +148,10 @@ struct result_compare> { constexpr bool equal(T t1, T t2) { if constexpr (std::is_floating_point_v) { - return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + bool passed = (t1 == t2) // when t1 == t2 == 0 + || + (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio)); + return passed; } return t1 == t2; } @@ -231,28 +238,37 @@ class Tests_MG_ReduceV const int initial_value = 10; auto property_initial_value = generate::initial_value(initial_value); + using property_t = decltype(property_initial_value); auto property_data = generate::property((*d_mg_renumber_map_labels), hash_bin_count, handle); auto property_iter = get_property_iterator(property_data); - if (cugraph::test::g_perf) { - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - hr_clock.start(); - } - - auto result = reduce_v(handle, - mg_graph_view, - property_iter, - property_iter + (*d_mg_renumber_map_labels).size(), - property_initial_value); - - if (cugraph::test::g_perf) { - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - double elapsed_time{0.0}; - hr_clock.stop(&elapsed_time); - std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n"; + raft::comms::op_t ops[] = { + raft::comms::op_t::SUM, raft::comms::op_t::MIN, raft::comms::op_t::MAX}; + + std::unordered_map results; + + for (auto op : ops) { + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + + results[op] = reduce_v(handle, + mg_graph_view, + property_iter, + property_iter + (*d_mg_renumber_map_labels).size(), + property_initial_value, + op); + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n"; + } } //// 4. compare SG & MG results @@ -270,16 +286,19 @@ class Tests_MG_ReduceV hash_bin_count, handle); auto sg_property_iter = get_property_iterator(sg_property_data); - using property_t = decltype(property_initial_value); - - auto expected_result = - thrust::reduce(handle.get_thrust_policy(), - sg_property_iter, - sg_property_iter + sg_graph_view.get_number_of_local_vertices(), - property_initial_value, - cugraph::property_add()); - result_compare compare{}; - ASSERT_TRUE(compare(expected_result, result)); + + for (auto op : ops) { + auto expected_result = cugraph::op_dispatch( + op, [&handle, &sg_graph_view, sg_property_iter, property_initial_value](auto op) { + return thrust::reduce(handle.get_thrust_policy(), + sg_property_iter, + sg_property_iter + sg_graph_view.get_number_of_local_vertices(), + property_initial_value, + op); + }); + result_compare compare{}; + ASSERT_TRUE(compare(expected_result, results[op])); + } } } }; diff --git a/cpp/tests/prims/mg_transform_reduce_v.cu b/cpp/tests/prims/mg_transform_reduce_v.cu index a93d1c0045b..f33fc97ee09 100644 --- a/cpp/tests/prims/mg_transform_reduce_v.cu +++ b/cpp/tests/prims/mg_transform_reduce_v.cu @@ -70,7 +70,10 @@ struct result_compare { constexpr auto operator()(const T& t1, const T& t2) { if constexpr (std::is_floating_point_v) { - return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + bool passed = (t1 == t2) // when t1 == t2 == 0 + || + (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio)); + return passed; } return t1 == t2; } @@ -91,7 +94,10 @@ struct result_compare> { constexpr bool equal(T t1, T t2) { if constexpr (std::is_floating_point_v) { - return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio); + bool passed = (t1 == t2) // when t1 == t2 == 0 + || + (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio)); + return passed; } return t1 == t2; } @@ -182,22 +188,29 @@ class Tests_MG_TransformReduceV property_transform prop(hash_bin_count); auto property_initial_value = generate::initial_value(initial_value); - - if (cugraph::test::g_perf) { - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - hr_clock.start(); - } - - auto result = transform_reduce_v( - handle, mg_graph_view, d_mg_renumber_map_labels->begin(), prop, property_initial_value); - - if (cugraph::test::g_perf) { - CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement - handle.get_comms().barrier(); - double elapsed_time{0.0}; - hr_clock.stop(&elapsed_time); - std::cout << "MG transform reduce took " << elapsed_time * 1e-6 << " s.\n"; + using property_t = decltype(property_initial_value); + raft::comms::op_t ops[] = { + raft::comms::op_t::SUM, raft::comms::op_t::MIN, raft::comms::op_t::MAX}; + + std::unordered_map results; + + for (auto op : ops) { + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + hr_clock.start(); + } + + results[op] = transform_reduce_v( + handle, mg_graph_view, d_mg_renumber_map_labels->begin(), prop, property_initial_value, op); + + if (cugraph::test::g_perf) { + CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement + handle.get_comms().barrier(); + double elapsed_time{0.0}; + hr_clock.stop(&elapsed_time); + std::cout << "MG transform reduce took " << elapsed_time * 1e-6 << " s.\n"; + } } //// 4. compare SG & MG results @@ -208,17 +221,21 @@ class Tests_MG_TransformReduceV cugraph::test::construct_graph( handle, input_usecase, true, false); auto sg_graph_view = sg_graph.view(); - using property_t = decltype(property_initial_value); - - auto expected_result = thrust::transform_reduce( - handle.get_thrust_policy(), - thrust::make_counting_iterator(sg_graph_view.get_local_vertex_first()), - thrust::make_counting_iterator(sg_graph_view.get_local_vertex_last()), - prop, - property_initial_value, - cugraph::property_add()); - result_compare compare{}; - ASSERT_TRUE(compare(expected_result, result)); + + for (auto op : ops) { + auto expected_result = cugraph::op_dispatch( + op, [&handle, &sg_graph_view, prop, property_initial_value](auto op) { + return thrust::transform_reduce( + handle.get_thrust_policy(), + thrust::make_counting_iterator(sg_graph_view.get_local_vertex_first()), + thrust::make_counting_iterator(sg_graph_view.get_local_vertex_last()), + prop, + property_initial_value, + op); + }); + result_compare compare{}; + ASSERT_TRUE(compare(expected_result, results[op])); + } } } };