Add raft ops for reduce_v and transform_reduce_v #1902

Merged: 2 commits, Oct 27, 2021

Changes from 1 commit
@@ -74,7 +74,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse(

auto dcs_nzd_vertex_count = *(matrix_partition.get_dcs_nzd_vertex_count());

property_add<T> edge_property_add{};
property_op<T, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(dcs_nzd_vertex_count)) {
auto major =
*(matrix_partition.get_major_from_major_hypersparse_idx_nocheck(static_cast<vertex_t>(idx)));
@@ -169,7 +169,7 @@ __global__ void for_all_major_for_all_nbr_low_degree(
auto major_start_offset = static_cast<size_t>(major_first - matrix_partition.get_major_first());
auto idx = static_cast<size_t>(tid);

property_add<T> edge_property_add{};
property_op<T, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
@@ -271,7 +271,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree(
__shared__ typename WarpReduce::TempStorage
temp_storage[copy_v_transform_reduce_nbr_for_all_block_size / raft::warp_size()];

property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
@@ -355,7 +355,7 @@ __global__ void for_all_major_for_all_nbr_high_degree(
cub::BlockReduce<e_op_result_t, copy_v_transform_reduce_nbr_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
vertex_t const* indices{nullptr};
35 changes: 30 additions & 5 deletions cpp/include/cugraph/prims/property_op_utils.cuh
@@ -17,6 +17,7 @@

#include <cugraph/utilities/thrust_tuple_utils.cuh>

#include <raft/comms/comms.hpp>
#include <raft/device_atomics.cuh>

#include <thrust/detail/type_traits/iterator/is_discard_iterator.h>
@@ -121,12 +122,12 @@ struct cast_edge_op_bool_to_integer {
}
};

template <typename T>
struct property_add : public thrust::plus<T> {
template <typename T, template <typename> typename Op>
struct property_op : public Op<T> {
};

template <typename... Args>
struct property_add<thrust::tuple<Args...>>
template <typename... Args, template <typename> typename Op>
struct property_op<thrust::tuple<Args...>, Op>
: public thrust::
binary_function<thrust::tuple<Args...>, thrust::tuple<Args...>, thrust::tuple<Args...>> {
using Type = thrust::tuple<Args...>;
@@ -135,7 +136,8 @@ struct property_add<thrust::tuple<Args...>>
template <typename T, std::size_t... Is>
__host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence<Is...>)
{
return thrust::make_tuple((thrust::get<Is>(t1) + thrust::get<Is>(t2))...);
return thrust::make_tuple((Op<typename thrust::tuple_element<Is, Type>::type>()(
thrust::get<Is>(t1), thrust::get<Is>(t2)))...);
}

public:
@@ -145,6 +147,29 @@
}
};
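
To see the pattern used by the tuple specialization in isolation: it instantiates the element-wise functor per tuple element through an index-sequence expansion. Below is a minimal standalone sketch of that pattern using std::tuple and std functors in place of the thrust types; all names here are illustrative only and are not part of this PR.

#include <cstdio>
#include <functional>
#include <tuple>
#include <utility>

// Apply a per-element binary functor template Op across two tuples, mirroring the
// index_sequence expansion used by property_op's tuple specialization.
template <template <typename> typename Op, typename... Ts, std::size_t... Is>
auto elementwise_impl(std::tuple<Ts...> const& t1, std::tuple<Ts...> const& t2,
                      std::index_sequence<Is...>)
{
  // Instantiate Op for each element type and combine the matching elements.
  return std::make_tuple(
    Op<std::tuple_element_t<Is, std::tuple<Ts...>>>{}(std::get<Is>(t1), std::get<Is>(t2))...);
}

template <template <typename> typename Op, typename... Ts>
auto elementwise(std::tuple<Ts...> const& t1, std::tuple<Ts...> const& t2)
{
  return elementwise_impl<Op>(t1, t2, std::index_sequence_for<Ts...>{});
}

int main()
{
  auto a = std::make_tuple(1, 2.5);
  auto b = std::make_tuple(3, 4.0);
  auto s = elementwise<std::plus>(a, b);        // (4, 6.5)
  auto p = elementwise<std::multiplies>(a, b);  // (3, 10.0)
  std::printf("%d %.1f %d %.1f\n", std::get<0>(s), std::get<1>(s), std::get<0>(p), std::get<1>(p));
  return 0;
}
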

template <typename T, typename F>
constexpr auto op_dispatch(raft::comms::op_t op, F&& f)
{
switch (op) {
case raft::comms::op_t::SUM: {
return std::invoke(f, property_op<T, thrust::plus>());
} break;
case raft::comms::op_t::PROD: {
return std::invoke(f, property_op<T, thrust::multiplies>());
} break;
case raft::comms::op_t::MIN: {
return std::invoke(f, property_op<T, thrust::less>());
} break;
case raft::comms::op_t::MAX: {
return std::invoke(f, property_op<T, thrust::greater>());
} break;
default: {
CUGRAPH_FAIL("Unhandled raft::comms::op_t");
return std::invoke_result_t<F, property_op<T, thrust::plus>>{};
}
};
}

template <typename Iterator, typename T>
__device__ std::enable_if_t<thrust::detail::is_discard_iterator<Iterator>::value, void>
atomic_accumulate_edge_op_result(Iterator iter, T const& value)
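
The new op_dispatch helper turns a runtime raft::comms::op_t value into a compile-time functor and forwards it to a generic callable. A minimal sketch of the same dispatch shape follows, with a placeholder enum and std functors standing in for the raft and thrust types; MIN/MAX are written here as plain min/max lambdas purely for illustration, so nothing below is cugraph or raft API.

#include <algorithm>
#include <cstdio>
#include <functional>
#include <numeric>
#include <stdexcept>
#include <vector>

// Placeholder for raft::comms::op_t; only the dispatch shape matters in this sketch.
enum class reduce_op { SUM, PROD, MIN, MAX };

// Map the runtime op tag onto a compile-time binary functor and invoke the generic
// callable f with it, mirroring the structure of op_dispatch.
template <typename T, typename F>
auto dispatch(reduce_op op, F&& f)
{
  switch (op) {
    case reduce_op::SUM: return f(std::plus<T>{});
    case reduce_op::PROD: return f(std::multiplies<T>{});
    case reduce_op::MIN: return f([](T a, T b) { return std::min(a, b); });
    case reduce_op::MAX: return f([](T a, T b) { return std::max(a, b); });
    default: throw std::invalid_argument("unhandled reduce_op");
  }
}

int main()
{
  std::vector<int> v{3, 1, 4, 1, 5};
  for (auto op : {reduce_op::SUM, reduce_op::PROD, reduce_op::MIN, reduce_op::MAX}) {
    // The callable stays generic over the functor type, just like the lambdas that
    // reduce_v and transform_reduce_v pass to op_dispatch.
    int r = dispatch<int>(op, [&](auto binary_op) {
      return std::accumulate(v.begin() + 1, v.end(), v.front(), binary_op);
    });
    std::printf("result = %d\n", r);
  }
  return 0;
}
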
40 changes: 22 additions & 18 deletions cpp/include/cugraph/prims/reduce_v.cuh
@@ -49,17 +49,19 @@ template <typename GraphViewType, typename VertexValueInputIterator, typename T>
T reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexValueInputIterator vertex_value_input_first,
T init)
T init = T{},
raft::comms::op_t op = raft::comms::op_t::SUM)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
property_add<T>());
auto ret = op_dispatch<T>(op, [&handle, &graph_view, vertex_value_input_first, init](auto op) {
return thrust::reduce(
handle.get_thrust_policy(),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
op);
});
if constexpr (GraphViewType::is_multi_gpu) {
ret =
host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
@@ -87,17 +89,19 @@ T reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
InputIterator input_first,
InputIterator input_last,
T init)
T init = T{},
raft::comms::op_t op = raft::comms::op_t::SUM)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
input_first,
input_last,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
property_add<T>());
auto ret = op_dispatch<T>(op, [&handle, &graph_view, input_first, input_last, init](auto op) {
return thrust::reduce(
handle.get_thrust_policy(),
input_first,
input_last,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
op);
});
if constexpr (GraphViewType::is_multi_gpu) {
ret =
host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
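
With this change, reduce_v's local step is a thrust::reduce seeded with init on rank 0 (and T{} on other ranks) using the dispatched functor, followed by a host_scalar_allreduce with the same op in the multi-GPU case. A minimal single-GPU sketch of that local step is shown below, assuming a plain Thrust program; the vertex values and init are made up for illustration.

#include <thrust/device_vector.h>
#include <thrust/functional.h>
#include <thrust/reduce.h>

#include <cstdio>

int main()
{
  // Made-up per-vertex values standing in for the range starting at vertex_value_input_first.
  thrust::device_vector<float> vertex_values(5);
  vertex_values[0] = 0.5f;
  vertex_values[1] = 2.0f;
  vertex_values[2] = 1.5f;
  vertex_values[3] = 4.0f;
  vertex_values[4] = 3.0f;

  float init = 10.0f;  // plays the role of reduce_v's init argument

  // op = SUM: local reduction with thrust::plus, seeded with init.
  float sum = thrust::reduce(
    vertex_values.begin(), vertex_values.end(), init, thrust::plus<float>());

  // op = MAX: same call shape with a different binary functor.
  float mx = thrust::reduce(
    vertex_values.begin(), vertex_values.end(), init, thrust::maximum<float>());

  std::printf("sum = %.1f, max = %.1f\n", sum, mx);  // sum = 21.0, max = 10.0
  return 0;
}
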
10 changes: 5 additions & 5 deletions cpp/include/cugraph/prims/transform_reduce_e.cuh
@@ -68,7 +68,7 @@ __global__ void for_all_major_for_all_nbr_hypersparse(
using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(dcs_nzd_vertex_count)) {
auto major =
@@ -154,7 +154,7 @@ __global__ void for_all_major_for_all_nbr_low_degree(
using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;

property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -241,7 +241,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree(

using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -312,7 +312,7 @@ __global__ void for_all_major_for_all_nbr_high_degree(

using BlockReduce = cub::BlockReduce<e_op_result_t, transform_reduce_e_for_all_block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
property_add<e_op_result_t> edge_property_add{};
property_op<e_op_result_t, thrust::plus> edge_property_add{};
e_op_result_t e_op_result_sum{};
while (idx < static_cast<size_t>(major_last - major_first)) {
auto major_offset = major_start_offset + idx;
@@ -407,7 +407,7 @@ T transform_reduce_e(raft::handle_t const& handle,
using edge_t = typename GraphViewType::edge_type;
using weight_t = typename GraphViewType::weight_type;

property_add<T> edge_property_add{};
property_op<T, thrust::plus> edge_property_add{};

auto result_buffer = allocate_dataframe_buffer<T>(1, handle.get_stream());
thrust::fill(handle.get_thrust_policy(),
45 changes: 25 additions & 20 deletions cpp/include/cugraph/prims/transform_reduce_v.cuh
@@ -53,18 +53,21 @@ T transform_reduce_v(raft::handle_t const& handle,
GraphViewType const& graph_view,
VertexValueInputIterator vertex_value_input_first,
VertexOp v_op,
T init)
T init = T{},
raft::comms::op_t op = raft::comms::op_t::SUM)
{
auto ret = thrust::transform_reduce(
handle.get_thrust_policy(),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
v_op,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
property_add<T>());
auto ret =
op_dispatch<T>(op, [&handle, &graph_view, vertex_value_input_first, v_op, init](auto op) {
return thrust::transform_reduce(
handle.get_thrust_policy(),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
v_op,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
op);
});
if (GraphViewType::is_multi_gpu) {
ret =
host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
@@ -97,18 +100,20 @@ T transform_reduce_v(raft::handle_t const& handle,
InputIterator input_first,
InputIterator input_last,
VertexOp v_op,
T init)
T init = T{},
raft::comms::op_t op = raft::comms::op_t::SUM)
{
auto ret = thrust::transform_reduce(
handle.get_thrust_policy(),
input_first,
input_last,
v_op,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
property_add<T>());
auto ret = op_dispatch<T>(op, [&handle, input_first, input_last, v_op, init](auto op) {
return thrust::transform_reduce(
handle.get_thrust_policy(),
input_first,
input_last,
v_op,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init,
op);
});
if (GraphViewType::is_multi_gpu) {
ret =
host_scalar_allreduce(handle.get_comms(), ret, raft::comms::op_t::SUM, handle.get_stream());
ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream());
}
return ret;
}
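
transform_reduce_v follows the same pattern but applies the unary v_op before the dispatched reduction. A minimal sketch of the local thrust::transform_reduce step is shown below; the squaring v_op and the input values are made up for illustration and are not part of the PR.

#include <thrust/device_vector.h>
#include <thrust/functional.h>
#include <thrust/transform_reduce.h>

#include <cstdio>

// Stand-in for a user-provided v_op.
struct square {
  __host__ __device__ float operator()(float x) const { return x * x; }
};

int main()
{
  thrust::device_vector<float> vertex_values(4);
  vertex_values[0] = 1.0f;
  vertex_values[1] = 2.0f;
  vertex_values[2] = 3.0f;
  vertex_values[3] = 4.0f;

  float init = 0.0f;

  // Same call shape as the thrust::transform_reduce inside transform_reduce_v:
  // the unary v_op runs first, then the binary functor selected by op_dispatch.
  float sum_of_squares = thrust::transform_reduce(
    vertex_values.begin(), vertex_values.end(), square{}, init, thrust::plus<float>());

  std::printf("sum of squares = %.1f\n", sum_of_squares);  // 30.0
  return 0;
}
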
83 changes: 52 additions & 31 deletions cpp/tests/prims/mg_reduce_v.cu
@@ -26,6 +26,7 @@

#include <cuco/detail/hash_functions.cuh>
#include <cugraph/graph_view.hpp>
#include <cugraph/prims/property_op_utils.cuh>
#include <cugraph/prims/reduce_v.cuh>

#include <thrust/reduce.h>
@@ -119,11 +120,14 @@ struct generate_impl {

template <typename T>
struct result_compare {
static constexpr double threshold_ratio{1e-3};
static constexpr double threshold_ratio{1e-2};
constexpr auto operator()(const T& t1, const T& t2)
{
if constexpr (std::is_floating_point_v<T>) {
return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio);
bool passed = (t1 == t2) // when t1 == t2 == 0
||
(std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio));
return passed;
}
return t1 == t2;
}
@@ -144,7 +148,10 @@ struct result_compare<thrust::tuple<Args...>> {
constexpr bool equal(T t1, T t2)
{
if constexpr (std::is_floating_point_v<T>) {
return std::abs(t1 - t2) < (std::max(t1, t2) * threshold_ratio);
bool passed = (t1 == t2) // when t1 == t2 == 0
||
(std::abs(t1 - t2) < (std::max(std::abs(t1), std::abs(t2)) * threshold_ratio));
return passed;
}
return t1 == t2;
}
@@ -231,28 +238,39 @@ class Tests_MG_ReduceV
const int initial_value = 10;

auto property_initial_value = generate<result_t>::initial_value(initial_value);
using property_t = decltype(property_initial_value);
auto property_data =
generate<result_t>::property((*d_mg_renumber_map_labels), hash_bin_count, handle);
auto property_iter = get_property_iterator(property_data);

if (cugraph::test::g_perf) {
CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
handle.get_comms().barrier();
hr_clock.start();
}

auto result = reduce_v(handle,
mg_graph_view,
property_iter,
property_iter + (*d_mg_renumber_map_labels).size(),
property_initial_value);

if (cugraph::test::g_perf) {
CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
handle.get_comms().barrier();
double elapsed_time{0.0};
hr_clock.stop(&elapsed_time);
std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n";
raft::comms::op_t ops[] = {raft::comms::op_t::SUM,
// raft::comms::op_t::PROD,
raft::comms::op_t::MIN,
raft::comms::op_t::MAX};

std::unordered_map<raft::comms::op_t, property_t> results;

for (auto op : ops) {
if (cugraph::test::g_perf) {
CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
handle.get_comms().barrier();
hr_clock.start();
}

results[op] = reduce_v(handle,
mg_graph_view,
property_iter,
property_iter + (*d_mg_renumber_map_labels).size(),
property_initial_value,
op);

if (cugraph::test::g_perf) {
CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement
handle.get_comms().barrier();
double elapsed_time{0.0};
hr_clock.stop(&elapsed_time);
std::cout << "MG reduce_v took " << elapsed_time * 1e-6 << " s.\n";
}
}

//// 4. compare SG & MG results
@@ -270,16 +288,19 @@ class Tests_MG_ReduceV
hash_bin_count,
handle);
auto sg_property_iter = get_property_iterator(sg_property_data);
using property_t = decltype(property_initial_value);

auto expected_result =
thrust::reduce(handle.get_thrust_policy(),
sg_property_iter,
sg_property_iter + sg_graph_view.get_number_of_local_vertices(),
property_initial_value,
cugraph::property_add<property_t>());
result_compare<property_t> compare{};
ASSERT_TRUE(compare(expected_result, result));

for (auto op : ops) {
auto expected_result = cugraph::op_dispatch<property_t>(
op, [&handle, &sg_graph_view, sg_property_iter, property_initial_value](auto op) {
return thrust::reduce(handle.get_thrust_policy(),
sg_property_iter,
sg_property_iter + sg_graph_view.get_number_of_local_vertices(),
property_initial_value,
op);
});
result_compare<property_t> compare{};
ASSERT_TRUE(compare(expected_result, results[op]));
}
}
}
};
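
The updated result_compare accepts either exact equality (which covers the zero/zero case) or a relative error below threshold_ratio of the larger magnitude. The same check as a standalone sketch is shown below; the constants mirror the test, but nothing here is cugraph API.

#include <algorithm>
#include <cmath>
#include <cstdio>

// Relative-tolerance comparison in the shape used by the updated result_compare:
// exact equality first, otherwise |t1 - t2| against the larger magnitude scaled by the ratio.
bool nearly_equal(double t1, double t2, double threshold_ratio = 1e-2)
{
  return (t1 == t2) ||
         (std::abs(t1 - t2) < std::max(std::abs(t1), std::abs(t2)) * threshold_ratio);
}

int main()
{
  std::printf("%d\n", nearly_equal(0.0, 0.0));      // 1: exact equality covers zero/zero
  std::printf("%d\n", nearly_equal(100.0, 100.5));  // 1: within 1% relative tolerance
  std::printf("%d\n", nearly_equal(100.0, 105.0));  // 0: outside the tolerance
  return 0;
}
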