Add raft ops for reduce_v and transform_reduce_v (#1902)
reduce_v and transform_reduce_v now accept a raft::comms::op_t parameter that selects the reduction operator (SUM, MIN, or MAX; PROD is rejected). Tests have been updated accordingly.
Fixes #1903
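
A minimal usage sketch of the extended API (hypothetical setup: handle, graph_view, and a device vector d_values of per-vertex floats are assumed to exist; single-GPU case shown):

  // Sketch only: reduce per-vertex values with MAX instead of the default SUM.
  auto global_max = cugraph::reduce_v(handle,
                                      graph_view,
                                      d_values.begin(),
                                      std::numeric_limits<float>::lowest(),
                                      raft::comms::op_t::MAX);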

Authors:
  - Kumar Aatish (https://github.com/kaatish)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Seunghwa Kang (https://github.com/seunghwak)

URL: #1902
kaatish authored Oct 27, 2021
1 parent e2af5de commit 2f3ed7a
Showing 7 changed files with 183 additions and 112 deletions.
@@ -74,7 +74,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse(

auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count());

- property_add<T> edge_property_add{};
+ property_op<T, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(dcs_nzd_vertex_count)) {
auto major =
*(matrix_partition.get_major_from_major_hypersparse_idx_nocheck(static_cast<vertex_t>(idx)));
@@ -169,7 +169,7 @@ __global__ void for_all_major_for_all_nbr_low_degree(
auto major_start_offset = static_cast<size_t>(major_first - matrix_partition.get_major_first());
auto idx = static_cast<size_t>(tid);

- property_add<T> edge_property_add{};
+ property_op<T, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
@@ -271,7 +271,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree(
__shared__ typename WarpReduce::TempStorage
temp_storage[copy_v_transform_reduce_nbr_for_all_block_size / raft::warp_size()];

- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
@@ -355,7 +355,7 @@ __global__ void for_all_major_for_all_nbr_high_degree(
cub::BlockReduce<e_op_result_t, copy_v_transform_reduce_nbr_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
36 changes: 31 additions & 5 deletions cpp/include/cugraph/prims/property_op_utils.cuh
@@ -17,6 +17,7 @@

#include <cugraph/utilities/thrust_tuple_utils.cuh>

+ #include <raft/comms/comms.hpp>
#include <raft/device_atomics.cuh>

#include <thrust/detail/type_traits/iterator/is_discard_iterator.h>
@@ -121,12 +122,12 @@ struct cast_edge_op_bool_to_integer {
}
};

- template <typename T>
- struct property_add : public thrust::plus<T> {
+ template <typename T, template <typename> typename Op>
+ struct property_op : public Op<T> {
};

- template <typename... Args>
- struct property_add<thrust::tuple<Args...>>
+ template <typename... Args, template <typename> typename Op>
+ struct property_op<thrust::tuple<Args...>, Op>
: public thrust::
binary_function<thrust::tuple<Args...>, thrust::tuple<Args...>, thrust::tuple<Args...>> {
using Type = thrust::tuple<Args...>;
@@ -135,7 +136,8 @@ struct property_add<thrust::tuple<Args...>>
template <typename T, std::size_t... Is>
__host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence<Is...>)
{
- return thrust::make_tuple((thrust::get<Is>(t1) + thrust::get<Is>(t2))...);
+ return thrust::make_tuple((Op<typename thrust::tuple_element<Is, Type>::type>()(
+   thrust::get<Is>(t1), thrust::get<Is>(t2)))...);
}

public:
@@ -145,6 +147,30 @@
}
};

+ template <typename T, typename F>
+ constexpr auto op_dispatch(raft::comms::op_t op, F&& f)
+ {
+   switch (op) {
+     case raft::comms::op_t::SUM: {
+       return std::invoke(f, property_op<T, thrust::plus>());
+     } break;
+     case raft::comms::op_t::PROD: {
+       CUGRAPH_FAIL("raft::comms::op_t::PROD is not supported for op_dispatch");
+       return std::invoke_result_t<F, property_op<T, thrust::multiplies>>{};
+     } break;
+     case raft::comms::op_t::MIN: {
+       return std::invoke(f, property_op<T, thrust::minimum>());
+     } break;
+     case raft::comms::op_t::MAX: {
+       return std::invoke(f, property_op<T, thrust::maximum>());
+     } break;
+     default: {
+       CUGRAPH_FAIL("Unhandled raft::comms::op_t");
+       return std::invoke_result_t<F, property_op<T, thrust::plus>>{};
+     }
+   }
+ }

template <typename Iterator, typename T>
__device__ std::enable_if_t<thrust::detail::is_discard_iterator<Iterator>::value, void>
atomic_accumulate_edge_op_result(Iterator iter, T const& value)
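The new op_dispatch utility maps the runtime raft::comms::op_t onto a compile-time property_op functor and passes it to the supplied callable, so downstream thrust calls receive a concrete binary-operator type. A minimal sketch of the pattern (an illustrative wrapper, not part of this commit; reduce_range, first, and last are hypothetical names):

  // Illustrative only: runtime-selected reduction over a raw device range.
  template <typename T>
  T reduce_range(raft::handle_t const& handle,
                 T const* first,
                 T const* last,
                 T init,
                 raft::comms::op_t op)
  {
    return cugraph::op_dispatch<T>(op, [&handle, first, last, init](auto binary_op) {
      // binary_op is a concrete property_op<T, thrust::plus/minimum/maximum> instance.
      return thrust::reduce(handle.get_thrust_policy(), first, last, init, binary_op);
    });
  }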
40 changes: 22 additions & 18 deletions cpp/include/cugraph/prims/reduce_v.cuh
@@ -49,17 +49,19 @@ template <typename GraphViewType, typename VertexValueInputIterator, typename T>
T reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexValueInputIterator vertex_value_input_first,
- T init)
+ T init = T{},
+ raft::comms::op_t op = raft::comms::op_t::SUM)
{
- auto ret = thrust::reduce(
-   handle.get_thrust_policy(),
-   vertex_value_input_first,
-   vertex_value_input_first + graph_view.get_number_of_local_vertices(),
-   ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
-   property_add<T>());
+ auto ret = op_dispatch<T>(op, [&handle, &graph_view, vertex_value_input_first, init](auto op) {
+   return thrust::reduce(
+     handle.get_thrust_policy(),
+     vertex_value_input_first,
+     vertex_value_input_first + graph_view.get_number_of_local_vertices(),
+     ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
+     op);
+ });
if constexpr (GraphViewType::is_multi_gpu) {
- ret =
-   host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
+ ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
@@ -87,17 +89,19 @@ T reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
InputIterator input_first,
InputIterator input_last,
- T init)
+ T init = T{},
+ raft::comms::op_t op = raft::comms::op_t::SUM)
{
- auto ret = thrust::reduce(
-   handle.get_thrust_policy(),
-   input_first,
-   input_last,
-   ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
-   property_add<T>());
+ auto ret = op_dispatch<T>(op, [&handle, &graph_view, input_first, input_last, init](auto op) {
+   return thrust::reduce(
+     handle.get_thrust_policy(),
+     input_first,
+     input_last,
+     ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
+     op);
+ });
if constexpr (GraphViewType::is_multi_gpu) {
- ret =
-   host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
+ ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
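For the iterator-range overload, a hedged call sketch (handle, graph_view, d_vals, and num_items are assumed to exist; single-GPU case):

  // Sketch: MIN-reduce an arbitrary device range rather than all local vertices.
  auto local_min = cugraph::reduce_v(handle,
                                     graph_view,
                                     d_vals.begin(),
                                     d_vals.begin() + num_items,
                                     std::numeric_limits<float>::max(),
                                     raft::comms::op_t::MIN);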
10 changes: 5 additions & 5 deletions cpp/include/cugraph/prims/transform_reduce_e.cuh
@@ -68,7 +68,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse(
using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(dcs_nzd_vertex_count)) {
auto major =
@@ -154,7 +154,7 @@ __global__ void for_all_major_for_all_nbr_low_degree(
using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -241,7 +241,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree(

using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -312,7 +312,7 @@ __global__ void for_all_major_for_all_nbr_high_degree(

using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
- property_add<e_op_result_t> edge_property_add{};
+ property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -407,7 +407,7 @@ T transform_reduce_e(raft::handle_t const& handle,
using edge_t = typename GraphViewType::edge_type;
using weight_t = typename GraphViewType::weight_type;

- property_add<T> edge_property_add{};
+ property_op<T, thrust::plus> edge_property_add{};

auto result_buffer = allocate_dataframe_buffer<T>(1, handle.get_stream());
thrust::fill(handle.get_thrust_policy(),
45 changes: 25 additions & 20 deletions cpp/include/cugraph/prims/transform_reduce_v.cuh
@@ -53,18 +53,21 @@ T transform_reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexValueInputIterator vertex_value_input_first,
VertexOp v_op,
- T init)
+ T init = T{},
+ raft::comms::op_t op = raft::comms::op_t::SUM)
{
- auto ret = thrust::transform_reduce(
-   handle.get_thrust_policy(),
-   vertex_value_input_first,
-   vertex_value_input_first + graph_view.get_number_of_local_vertices(),
-   v_op,
-   ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
-   property_add<T>());
+ auto ret =
+   op_dispatch<T>(op, [&handle, &graph_view, vertex_value_input_first, v_op, init](auto op) {
+     return thrust::transform_reduce(
+       handle.get_thrust_policy(),
+       vertex_value_input_first,
+       vertex_value_input_first + graph_view.get_number_of_local_vertices(),
+       v_op,
+       ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
+       op);
+   });
if (GraphViewType::is_multi_gpu) {
- ret =
-   host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
+ ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
@@ -97,18 +100,20 @@ T transform_reduce_v(raft::handle_t const& handle,
InputIterator input_first,
InputIterator input_last,
VertexOp v_op,
- T init)
+ T init = T{},
+ raft::comms::op_t op = raft::comms::op_t::SUM)
{
- auto ret = thrust::transform_reduce(
-   handle.get_thrust_policy(),
-   input_first,
-   input_last,
-   v_op,
-   ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
-   property_add<T>());
+ auto ret = op_dispatch<T>(op, [&handle, input_first, input_last, v_op, init](auto op) {
+   return thrust::transform_reduce(
+     handle.get_thrust_policy(),
+     input_first,
+     input_last,
+     v_op,
+     ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
+     op);
+ });
if (GraphViewType::is_multi_gpu) {
- ret =
-   host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
+ ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
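A hedged sketch of the extended transform_reduce_v (hypothetical names; squares each vertex value on the fly and takes the maximum; single-GPU case assumed, and the __device__ lambda needs nvcc's extended-lambda support):

  // Sketch: transform each per-vertex value with v_op, then MAX-reduce.
  auto max_sq = cugraph::transform_reduce_v(
    handle,
    graph_view,
    d_vals.begin(),
    [] __device__(auto v) { return v * v; },
    float{0},
    raft::comms::op_t::MAX);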
81 changes: 50 additions & 31 deletions cpp/tests/prims/mg_reduce_v.cu
@@ -26,6 +26,7 @@

#include <cuco/detail/hash_functions.cuh>
#include <cugraph/graph_view.hpp>
+ #include <cugraph/prims/property_op_utils.cuh>
#include <cugraph/prims/reduce_v.cuh>

#include <thrust/reduce.h>
@@ -119,11 +120,14 @@ struct generate_impl {

template <typename T>
struct result_compare {
- static constexpr double threshold_ratio{1e-3};
+ static constexpr double threshold_ratio{1e-2};
constexpr auto operator()(const T& t1, const T& t2)
{
if constexpr (std::is_floating_point_v<T>) {
- return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio);
+ bool passed = (t1 == t2) // when t1 == t2 == 0
+               ||
+               (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio));
+ return passed;
}
return t1 == t2;
}
@@ -144,7 +148,10 @@ struct result_compare<thrust::tuple<Args...>> {
constexpr bool equal(T t1, T t2)
{
if constexpr (std::is_floating_point_v<T>) {
- return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio);
+ bool passed = (t1 == t2) // when t1 == t2 == 0
+               ||
+               (std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio));
+ return passed;
}
return t1 == t2;
}
@@ -231,28 +238,37 @@ class Tests_MG_ReduceV
const int initial_value = 10;

auto property_initial_value = generate<result_t>::initial_value(initial_value);
+ using property_t = decltype(property_initial_value);
auto property_data =
generate<result_t>::property((*d_mg_renumber_map_labels), hash_bin_count, handle);
auto property_iter = get_property_iterator(property_data);

- if (cugraph::test::g_perf) {
-   CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
-   handle.get_comms().barrier();
-   hr_clock.start();
- }
-
- auto result = reduce_v(handle,
-                        mg_graph_view,
-                        property_iter,
-                        property_iter + (*d_mg_renumber_map_labels).size(),
-                        property_initial_value);
-
- if (cugraph::test::g_perf) {
-   CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
-   handle.get_comms().barrier();
-   double elapsed_time{0.0};
-   hr_clock.stop(&elapsed_time);
-   std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n";
- }
+ raft::comms::op_t ops[] = {
+   raft::comms::op_t::SUM, raft::comms::op_t::MIN, raft::comms::op_t::MAX};
+
+ std::unordered_map<raft::comms::op_t, property_t> results;
+
+ for (auto op : ops) {
+   if (cugraph::test::g_perf) {
+     CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
+     handle.get_comms().barrier();
+     hr_clock.start();
+   }
+
+   results[op] = reduce_v(handle,
+                          mg_graph_view,
+                          property_iter,
+                          property_iter + (*d_mg_renumber_map_labels).size(),
+                          property_initial_value,
+                          op);
+
+   if (cugraph::test::g_perf) {
+     CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
+     handle.get_comms().barrier();
+     double elapsed_time{0.0};
+     hr_clock.stop(&elapsed_time);
+     std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n";
+   }
+ }

//// 4. compare SG & MG results
@@ -270,16 +286,19 @@ class Tests_MG_ReduceV
hash_bin_count,
handle);
auto sg_property_iter = get_property_iterator(sg_property_data);
- using property_t = decltype(property_initial_value);

- auto expected_result =
-   thrust::reduce(handle.get_thrust_policy(),
-                  sg_property_iter,
-                  sg_property_iter + sg_graph_view.get_number_of_local_vertices(),
-                  property_initial_value,
-                  cugraph::property_add<property_t>());
- result_compare<property_t> compare{};
- ASSERT_TRUE(compare(expected_result, result));

+ for (auto op : ops) {
+   auto expected_result = cugraph::op_dispatch<property_t>(
+     op, [&handle, &sg_graph_view, sg_property_iter, property_initial_value](auto op) {
+       return thrust::reduce(handle.get_thrust_policy(),
+                             sg_property_iter,
+                             sg_property_iter + sg_graph_view.get_number_of_local_vertices(),
+                             property_initial_value,
+                             op);
+     });
+   result_compare<property_t> compare{};
+   ASSERT_TRUE(compare(expected_result, results[op]));
+ }
}
}
};
