diff --git a/cpp/include/cugraph/detail/shuffle_wrappers.hpp b/cpp/include/cugraph/detail/shuffle_wrappers.hpp index e205110d4f4..db02ab94a5d 100644 --- a/cpp/include/cugraph/detail/shuffle_wrappers.hpp +++ b/cpp/include/cugraph/detail/shuffle_wrappers.hpp @@ -76,8 +76,8 @@ rmm::device_uvector shuffle_vertices_by_gpu_id( * @param[in/out] d_edgelist_minors Vertex IDs for columns (if the graph adjacency matrix is stored * as is) or rows (if the graph adjacency matrix is stored transposed) * @param[in/out] d_edgelist_weights Optional edge weights - * @param[in] groupby_and_count_local_partition If set to true, groupby and count edges based on - * (local partition ID, GPU ID) pairs (where GPU IDs are computed by applying the + * @param[in] groupby_and_count_local_partition_by_minor If set to true, groupby and count edges + * based on (local partition ID, GPU ID) pairs (where GPU IDs are computed by applying the * compute_gpu_id_from_vertex_t function to the minor vertex ID). If set to false, groupby and count * edges by just local partition ID. * @@ -91,7 +91,7 @@ rmm::device_uvector groupby_and_count_edgelist_by_local_partition_id( rmm::device_uvector& d_edgelist_majors, rmm::device_uvector& d_edgelist_minors, std::optional>& d_edgelist_weights, - bool groupby_and_count_local_partition = false); + bool groupby_and_count_local_partition_by_minor = false); } // namespace detail } // namespace cugraph diff --git a/cpp/include/cugraph/graph_functions.hpp b/cpp/include/cugraph/graph_functions.hpp index 5ddc244b183..c170ce65253 100644 --- a/cpp/include/cugraph/graph_functions.hpp +++ b/cpp/include/cugraph/graph_functions.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,9 +37,6 @@ struct renumber_meta_t> edge_t number_of_edges{}; partition_t partition{}; std::vector segment_offsets{}; - - vertex_t num_local_unique_edge_majors{}; - vertex_t num_local_unique_edge_minors{}; }; template diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index 1dee131a000..1ff109c7766 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -482,7 +482,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( rmm::device_uvector rx_key_aggregated_edge_weights(0, handle.get_stream()); std::forward_as_tuple( std::tie(rx_major_vertices, rx_minor_keys, rx_key_aggregated_edge_weights), std::ignore) = - groupby_gpuid_and_shuffle_values( + groupby_gpu_id_and_shuffle_values( col_comm, triplet_first, triplet_first + tmp_major_vertices.size(), diff --git a/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh b/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh index 7f4cad5eded..c81cf2d133e 100644 --- a/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_by_adj_matrix_row_col_key_e.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -505,7 +505,7 @@ transform_reduce_by_adj_matrix_row_col_key_e( rmm::device_uvector rx_unique_keys(0, handle.get_stream()); auto rx_value_for_unique_key_buffer = allocate_dataframe_buffer(0, handle.get_stream()); std::tie(rx_unique_keys, rx_value_for_unique_key_buffer, std::ignore) = - groupby_gpuid_and_shuffle_kv_pairs( + groupby_gpu_id_and_shuffle_kv_pairs( comm, tmp_keys.begin(), tmp_keys.end(), diff --git a/cpp/include/cugraph/utilities/collect_comm.cuh b/cpp/include/cugraph/utilities/collect_comm.cuh index 8b89d941885..5b414f1f1eb 100644 --- a/cpp/include/cugraph/utilities/collect_comm.cuh +++ b/cpp/include/cugraph/utilities/collect_comm.cuh @@ -103,7 +103,7 @@ collect_values_for_keys(raft::comms::comms_t const& comm, { rmm::device_uvector rx_unique_keys(0, stream_view); std::vector rx_value_counts{}; - std::tie(rx_unique_keys, rx_value_counts) = groupby_gpuid_and_shuffle_values( + std::tie(rx_unique_keys, rx_value_counts) = groupby_gpu_id_and_shuffle_values( comm, unique_keys.begin(), unique_keys.end(), @@ -228,7 +228,7 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm, { rmm::device_uvector rx_unique_keys(0, stream_view); std::vector rx_value_counts{}; - std::tie(rx_unique_keys, rx_value_counts) = groupby_gpuid_and_shuffle_values( + std::tie(rx_unique_keys, rx_value_counts) = groupby_gpu_id_and_shuffle_values( comm, unique_keys.begin(), unique_keys.end(), diff --git a/cpp/include/cugraph/utilities/cython.hpp b/cpp/include/cugraph/utilities/cython.hpp index 100a9d7db5e..7cc6afb8aee 100644 --- a/cpp/include/cugraph/utilities/cython.hpp +++ b/cpp/include/cugraph/utilities/cython.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
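The functions renamed in these hunks (groupby_gpuid_* to groupby_gpu_id_*) all follow the same pattern: bucket values by a caller-supplied value-to-GPU mapping, then exchange the buckets all-to-all over raft::comms. A single-process sketch of that behavior, with plain std::vector standing in for the device buffers and the comms layer (every name here is illustrative, not part of the patch):

#include <vector>

// Simulate groupby_gpu_id_and_shuffle_values on one process: bucket each
// value by its target "GPU" and hand every bucket to the corresponding rank.
// In the real code the bucketing runs on the device (groupby_and_count) and
// the exchange is an all-to-all over raft::comms.
template <typename T, typename ToGpuId>
std::vector<std::vector<T>> simulate_shuffle(std::vector<std::vector<T>> const& tx_values,
                                             ToGpuId to_gpu_id,
                                             int num_gpus)
{
  std::vector<std::vector<T>> rx_values(num_gpus);
  for (int src = 0; src < num_gpus; ++src) {
    for (T const& v : tx_values[src]) {
      rx_values[to_gpu_id(v)].push_back(v);  // the value lands on the GPU that owns it
    }
  }
  return rx_values;
}

For example, with to_gpu_id = [](int v) { return v % 3; } and three ranks, each rank ends up holding exactly the values congruent to its rank, regardless of which rank generated them.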
@@ -588,7 +588,7 @@ template std::unique_ptr> call_shuffle( raft::handle_t const& handle, vertex_t* - edgelist_major_vertices, // [IN / OUT]: groupby_gpuid_and_shuffle_values() sorts in-place + edgelist_major_vertices, // [IN / OUT]: groupby_gpu_id_and_shuffle_values() sorts in-place vertex_t* edgelist_minor_vertices, // [IN / OUT] weight_t* edgelist_weights, // [IN / OUT] edge_t num_edgelist_edges); diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh index f10f9db95e1..cfed6c33dd3 100644 --- a/cpp/include/cugraph/utilities/shuffle_comm.cuh +++ b/cpp/include/cugraph/utilities/shuffle_comm.cuh @@ -125,21 +125,322 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm, return std::make_tuple(tx_counts, tx_offsets, tx_dst_ranks, rx_counts, rx_offsets, rx_src_ranks); } +template +struct key_group_id_less_t { + KeyToGroupIdOp key_to_group_id_op{}; + int pivot{}; + __device__ bool operator()(key_type k) const { return key_to_group_id_op(k) < pivot; } +}; + +template +struct value_group_id_less_t { + ValueToGroupIdOp value_to_group_id_op{}; + int pivot{}; + __device__ bool operator()(value_type v) const { return value_to_group_id_op(v) < pivot; } +}; + +template +struct kv_pair_group_id_less_t { + KeyToGroupIdOp key_to_group_id_op{}; + int pivot{}; + __device__ bool operator()(thrust::tuple t) const + { + return key_to_group_id_op(thrust::get<0>(t)) < pivot; + } +}; + +template +struct value_group_id_greater_equal_t { + ValueToGroupIdOp value_to_group_id_op{}; + int pivot{}; + __device__ bool operator()(value_type v) const { return value_to_group_id_op(v) >= pivot; } +}; + +template +struct kv_pair_group_id_greater_equal_t { + KeyToGroupIdOp key_to_group_id_op{}; + int pivot{}; + __device__ bool operator()(thrust::tuple t) const + { + return key_to_group_id_op(thrust::get<0>(t)) >= pivot; + } +}; + +// Use roughly half temporary buffer than thrust::partition (if first & second partition sizes are +// comparable). This also uses multiple smaller allocations than one single allocation (thrust::sort +// does this) of the same aggregate size if the input iterators are the zip iterators (this is more +// favorable to the pool allocator). 
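The comment above describes the scheme implemented by the mem_frugal_partition overload that follows: count how many elements belong to the first group, stage only the second group in a temporary buffer, compact the first group in place, and append the staged elements. A minimal host-side analogue using the standard library (names are illustrative; the 16M-element chunking of copy_if, used on the device to bound thrust's internal temporaries, is omitted here):

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <vector>

// Partition `values` so that elements whose group id is < pivot come first,
// using a temporary buffer sized for the second group only (roughly half of
// what a copy-out-and-copy-back partition of the whole range would need).
// Returns the size of the first group.
template <typename T, typename ToGroupId>
std::size_t mem_frugal_partition_host(std::vector<T>& values, ToGroupId to_group_id, int pivot)
{
  auto in_first = [&](T const& v) { return to_group_id(v) < pivot; };

  auto first_size =
    static_cast<std::size_t>(std::count_if(values.begin(), values.end(), in_first));

  // stage only the second group (group id >= pivot)
  std::vector<T> second;
  second.reserve(values.size() - first_size);
  std::copy_if(values.begin(), values.end(), std::back_inserter(second),
               [&](T const& v) { return !in_first(v); });

  // compact the first group in place, then append the staged second group
  auto new_end =
    std::remove_if(values.begin(), values.end(), [&](T const& v) { return !in_first(v); });
  std::copy(second.begin(), second.end(), new_end);

  return first_size;
}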
+template +ValueIterator mem_frugal_partition( + ValueIterator value_first, + ValueIterator value_last, + ValueToGroupIdOp value_to_group_id_op, + int pivot, // group id less than pivot goes to the first partition + rmm::cuda_stream_view stream_view) +{ + auto num_elements = static_cast(thrust::distance(value_first, value_last)); + auto first_size = static_cast(thrust::count_if( + rmm::exec_policy(stream_view), + value_first, + value_last, + value_group_id_less_t::value_type, + ValueToGroupIdOp>{value_to_group_id_op, pivot})); + auto second_size = num_elements - first_size; + + auto tmp_buffer = + allocate_dataframe_buffer::value_type>( + second_size, stream_view); + + // to limit memory footprint (16 * 1024 * 1024 is a tuning parameter) + // thrust::copy_if (1.15.0) also uses temporary buffer + auto constexpr max_elements_per_iteration = size_t{16} * 1024 * 1024; + auto num_chunks = (num_elements + max_elements_per_iteration - 1) / max_elements_per_iteration; + auto output_chunk_first = get_dataframe_buffer_begin(tmp_buffer); + for (size_t i = 0; i < num_chunks; ++i) { + output_chunk_first = thrust::copy_if( + rmm::exec_policy(stream_view), + value_first + max_elements_per_iteration * i, + value_first + std::min(max_elements_per_iteration * (i + 1), num_elements), + output_chunk_first, + value_group_id_greater_equal_t::value_type, + ValueToGroupIdOp>{value_to_group_id_op, pivot}); + } + + thrust::remove_if( + rmm::exec_policy(stream_view), + value_first, + value_last, + value_group_id_greater_equal_t::value_type, + ValueToGroupIdOp>{value_to_group_id_op, pivot}); + thrust::copy(rmm::exec_policy(stream_view), + get_dataframe_buffer_cbegin(tmp_buffer), + get_dataframe_buffer_cend(tmp_buffer), + value_first + first_size); + + return value_first + first_size; +} + +// Use roughly half temporary buffer than thrust::partition (if first & second partition sizes are +// comparable). This also uses multiple smaller allocations than one single allocation (thrust::sort +// does this) of the same aggregate size if the input iterators are the zip iterators (this is more +// favorable to the pool allocator). 
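The overload that follows applies the same partition to a key range and a value range at once, zipping them so keys and values move together. A simplified host sketch of just that pairing (illustrative names; it omits the reduced-footprint staging shown above and simply partitions the zipped pairs):

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Reorder parallel key/value arrays so that keys with group id < pivot (and
// their values) come first; returns the size of the first group. Zipping into
// pairs mirrors what the device code does with thrust::make_zip_iterator.
template <typename K, typename V, typename KeyToGroupId>
std::size_t partition_kv_host(std::vector<K>& keys,
                              std::vector<V>& values,
                              KeyToGroupId key_to_group_id,
                              int pivot)
{
  std::vector<std::pair<K, V>> zipped(keys.size());
  for (std::size_t i = 0; i < keys.size(); ++i) {
    zipped[i] = {keys[i], values[i]};
  }

  auto mid = std::stable_partition(zipped.begin(), zipped.end(), [&](auto const& kv) {
    return key_to_group_id(kv.first) < pivot;
  });

  for (std::size_t i = 0; i < zipped.size(); ++i) {
    keys[i]   = zipped[i].first;
    values[i] = zipped[i].second;
  }
  return static_cast<std::size_t>(std::distance(zipped.begin(), mid));
}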
+template +std::tuple mem_frugal_partition( + KeyIterator key_first, + KeyIterator key_last, + ValueIterator value_first, + KeyToGroupIdOp key_to_group_id_op, + int pivot, // group Id less than pivot goes to the first partition + rmm::cuda_stream_view stream_view) +{ + auto num_elements = static_cast(thrust::distance(key_first, key_last)); + auto first_size = static_cast(thrust::count_if( + rmm::exec_policy(stream_view), + key_first, + key_last, + key_group_id_less_t::value_type, KeyToGroupIdOp>{ + key_to_group_id_op, pivot})); + auto second_size = num_elements - first_size; + + auto tmp_key_buffer = + allocate_dataframe_buffer::value_type>( + second_size, stream_view); + auto tmp_value_buffer = + allocate_dataframe_buffer::value_type>( + second_size, stream_view); + + // to limit memory footprint (16 * 1024 * 1024 is a tuning parameter) + // thrust::copy_if (1.15.0) also uses temporary buffer + auto max_elements_per_iteration = size_t{16} * 1024 * 1024; + auto num_chunks = (num_elements + max_elements_per_iteration - 1) / max_elements_per_iteration; + auto kv_pair_first = thrust::make_zip_iterator(thrust::make_tuple(key_first, value_first)); + auto output_chunk_first = thrust::make_zip_iterator(thrust::make_tuple( + get_dataframe_buffer_begin(tmp_key_buffer), get_dataframe_buffer_begin(tmp_value_buffer))); + for (size_t i = 0; i < num_chunks; ++i) { + output_chunk_first = thrust::copy_if( + rmm::exec_policy(stream_view), + kv_pair_first + max_elements_per_iteration * i, + kv_pair_first + std::min(max_elements_per_iteration * (i + 1), num_elements), + output_chunk_first, + kv_pair_group_id_greater_equal_t::value_type, + typename thrust::iterator_traits::value_type, + KeyToGroupIdOp>{key_to_group_id_op, pivot}); + } + + thrust::remove_if( + rmm::exec_policy(stream_view), + kv_pair_first, + kv_pair_first + num_elements, + kv_pair_group_id_greater_equal_t::value_type, + typename thrust::iterator_traits::value_type, + KeyToGroupIdOp>{key_to_group_id_op, pivot}); + thrust::copy(rmm::exec_policy(stream_view), + get_dataframe_buffer_cbegin(tmp_key_buffer), + get_dataframe_buffer_cend(tmp_key_buffer), + key_first + first_size); + thrust::copy(rmm::exec_policy(stream_view), + get_dataframe_buffer_cbegin(tmp_value_buffer), + get_dataframe_buffer_cend(tmp_value_buffer), + value_first + first_size); + + return std::make_tuple(key_first + first_size, value_first + first_size); +} + +template +void mem_frugal_groupby( + ValueIterator value_first, + ValueIterator value_last, + ValueToGroupIdOp value_to_group_id_op, + int num_groups, + size_t mem_frugal_threshold, // take the memory frugal approach (instead of thrust::sort) if # + // elements to groupby is no smaller than this value + rmm::cuda_stream_view stream_view) +{ + std::vector group_firsts{}; + std::vector group_lasts{}; + std::vector value_firsts{}; + std::vector value_lasts{}; + if (num_groups > 1) { + group_firsts.push_back(int{0}); + group_lasts.push_back(num_groups); + value_firsts.push_back(value_first); + value_lasts.push_back(value_last); + } + + auto offset_first = size_t{0}; + auto offset_last = group_firsts.size(); + while (offset_first < offset_last) { + for (size_t i = offset_first; i < offset_last; ++i) { + auto pivot = (group_firsts[i] + group_lasts[i]) / 2; + if (static_cast(thrust::distance(value_firsts[i], value_lasts[i])) < + mem_frugal_threshold) { + if (group_lasts[i] - group_firsts[i] == 2) { + thrust::partition( + rmm::exec_policy(stream_view), + value_firsts[i], + value_lasts[i], + value_group_id_less_t::value_type, + 
ValueToGroupIdOp>{value_to_group_id_op, pivot}); + } else { + thrust::sort(rmm::exec_policy(stream_view), + value_firsts[i], + value_lasts[i], + [value_to_group_id_op] __device__(auto lhs, auto rhs) { + return value_to_group_id_op(lhs) < value_to_group_id_op(rhs); + }); + } + } else { + auto second_first = mem_frugal_partition( + value_firsts[i], value_lasts[i], value_to_group_id_op, pivot, stream_view); + if (pivot - group_firsts[i] > 1) { + group_firsts.push_back(group_firsts[i]); + group_lasts.push_back(pivot); + value_firsts.push_back(value_firsts[i]); + value_lasts.push_back(second_first); + } + if (group_lasts[i] - pivot > 1) { + group_firsts.push_back(pivot); + group_lasts.push_back(group_lasts[i]); + value_firsts.push_back(second_first); + value_lasts.push_back(value_lasts[i]); + } + } + } + offset_first = offset_last; + offset_last = group_firsts.size(); + } +} + +template +void mem_frugal_groupby( + KeyIterator key_first, + KeyIterator key_last, + ValueIterator value_first, + KeyToGroupIdOp key_to_group_id_op, + int num_groups, + size_t mem_frugal_threshold, // take the memory frugal approach (instead of thrust::sort) if # + // elements to groupby is no smaller than this value + rmm::cuda_stream_view stream_view) +{ + std::vector group_firsts{}; + std::vector group_lasts{}; + std::vector key_firsts{}; + std::vector key_lasts{}; + std::vector value_firsts{}; + if (num_groups > 1) { + group_firsts.push_back(int{0}); + group_lasts.push_back(num_groups); + key_firsts.push_back(key_first); + key_lasts.push_back(key_last); + value_firsts.push_back(value_first); + } + + auto offset_first = size_t{0}; + auto offset_last = group_firsts.size(); + while (offset_first < offset_last) { + for (size_t i = offset_first; i < offset_last; ++i) { + auto pivot = (group_firsts[i] + group_lasts[i]) / 2; + if (static_cast(thrust::distance(key_firsts[i], key_lasts[i])) < + mem_frugal_threshold) { + if (group_lasts[i] - group_firsts[i] == 2) { + auto kv_pair_first = + thrust::make_zip_iterator(thrust::make_tuple(key_firsts[i], value_firsts[i])); + thrust::partition( + rmm::exec_policy(stream_view), + kv_pair_first, + kv_pair_first + thrust::distance(key_firsts[i], key_lasts[i]), + kv_pair_group_id_less_t::value_type, + typename thrust::iterator_traits::value_type, + KeyToGroupIdOp>{key_to_group_id_op, pivot}); + } else { + thrust::sort_by_key(rmm::exec_policy(stream_view), + key_firsts[i], + key_lasts[i], + value_firsts[i], + [key_to_group_id_op] __device__(auto lhs, auto rhs) { + return key_to_group_id_op(lhs) < key_to_group_id_op(rhs); + }); + } + } else { + auto second_first = mem_frugal_partition( + key_firsts[i], key_lasts[i], value_firsts[i], key_to_group_id_op, pivot, stream_view); + if (pivot - group_firsts[i] > 1) { + group_firsts.push_back(group_firsts[i]); + group_lasts.push_back(pivot); + key_firsts.push_back(key_firsts[i]); + key_lasts.push_back(std::get<0>(second_first)); + value_firsts.push_back(value_firsts[i]); + } + if (group_lasts[i] - pivot > 1) { + group_firsts.push_back(pivot); + group_lasts.push_back(group_lasts[i]); + key_firsts.push_back(std::get<0>(second_first)); + key_lasts.push_back(key_lasts[i]); + value_firsts.push_back(std::get<1>(second_first)); + } + } + } + offset_first = offset_last; + offset_last = group_firsts.size(); + } +} + } // namespace detail -template +template rmm::device_uvector groupby_and_count(ValueIterator tx_value_first /* [INOUT */, ValueIterator tx_value_last /* [INOUT */, - ValueToGPUIdOp value_to_group_id_op, + ValueToGroupIdOp value_to_group_id_op, 
int num_groups, + size_t mem_frugal_threshold, rmm::cuda_stream_view stream_view) { - thrust::sort(rmm::exec_policy(stream_view), - tx_value_first, - tx_value_last, - [value_to_group_id_op] __device__(auto lhs, auto rhs) { - return value_to_group_id_op(lhs) < value_to_group_id_op(rhs); - }); + detail::mem_frugal_groupby(tx_value_first, + tx_value_last, + value_to_group_id_op, + num_groups, + mem_frugal_threshold, + stream_view); auto group_id_first = thrust::make_transform_iterator( tx_value_first, @@ -158,21 +459,22 @@ rmm::device_uvector groupby_and_count(ValueIterator tx_value_first /* [I return d_tx_value_counts; } -template +template rmm::device_uvector groupby_and_count(VertexIterator tx_key_first /* [INOUT */, VertexIterator tx_key_last /* [INOUT */, ValueIterator tx_value_first /* [INOUT */, - KeyToGPUIdOp key_to_group_id_op, + KeyToGroupIdOp key_to_group_id_op, int num_groups, + size_t mem_frugal_threshold, rmm::cuda_stream_view stream_view) { - thrust::sort_by_key(rmm::exec_policy(stream_view), - tx_key_first, - tx_key_last, - tx_value_first, - [key_to_group_id_op] __device__(auto lhs, auto rhs) { - return key_to_group_id_op(lhs) < key_to_group_id_op(rhs); - }); + detail::mem_frugal_groupby(tx_key_first, + tx_key_last, + tx_value_first, + key_to_group_id_op, + num_groups, + mem_frugal_threshold, + stream_view); auto group_id_first = thrust::make_transform_iterator( tx_key_first, [key_to_group_id_op] __device__(auto key) { return key_to_group_id_op(key); }); @@ -211,7 +513,7 @@ auto shuffle_values(raft::comms::comms_t const& comm, detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view); auto rx_value_buffer = - allocate_dataframe_buffer::value_type>( + allocate_dataframe_buffer::value_type>( rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view); // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released @@ -240,16 +542,20 @@ auto shuffle_values(raft::comms::comms_t const& comm, } template -auto groupby_gpuid_and_shuffle_values(raft::comms::comms_t const& comm, - ValueIterator tx_value_first /* [INOUT */, - ValueIterator tx_value_last /* [INOUT */, - ValueToGPUIdOp value_to_gpu_id_op, - rmm::cuda_stream_view stream_view) +auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm, + ValueIterator tx_value_first /* [INOUT */, + ValueIterator tx_value_last /* [INOUT */, + ValueToGPUIdOp value_to_gpu_id_op, + rmm::cuda_stream_view stream_view) { auto const comm_size = comm.get_size(); - auto d_tx_value_counts = groupby_and_count( - tx_value_first, tx_value_last, value_to_gpu_id_op, comm.get_size(), stream_view); + auto d_tx_value_counts = groupby_and_count(tx_value_first, + tx_value_last, + value_to_gpu_id_op, + comm.get_size(), + std::numeric_limits::max(), + stream_view); std::vector tx_counts{}; std::vector tx_offsets{}; @@ -261,7 +567,7 @@ auto groupby_gpuid_and_shuffle_values(raft::comms::comms_t const& comm, detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view); auto rx_value_buffer = - allocate_dataframe_buffer::value_type>( + allocate_dataframe_buffer::value_type>( rx_offsets.size() > 0 ? 
rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view); // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released @@ -289,17 +595,22 @@ auto groupby_gpuid_and_shuffle_values(raft::comms::comms_t const& comm, } template -auto groupby_gpuid_and_shuffle_kv_pairs(raft::comms::comms_t const& comm, - VertexIterator tx_key_first /* [INOUT */, - VertexIterator tx_key_last /* [INOUT */, - ValueIterator tx_value_first /* [INOUT */, - KeyToGPUIdOp key_to_gpu_id_op, - rmm::cuda_stream_view stream_view) +auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm, + VertexIterator tx_key_first /* [INOUT */, + VertexIterator tx_key_last /* [INOUT */, + ValueIterator tx_value_first /* [INOUT */, + KeyToGPUIdOp key_to_gpu_id_op, + rmm::cuda_stream_view stream_view) { auto const comm_size = comm.get_size(); - auto d_tx_value_counts = groupby_and_count( - tx_key_first, tx_key_last, tx_value_first, key_to_gpu_id_op, comm.get_size(), stream_view); + auto d_tx_value_counts = groupby_and_count(tx_key_first, + tx_key_last, + tx_value_first, + key_to_gpu_id_op, + comm.get_size(), + std::numeric_limits::max(), + stream_view); std::vector tx_counts{}; std::vector tx_offsets{}; @@ -310,10 +621,10 @@ auto groupby_gpuid_and_shuffle_kv_pairs(raft::comms::comms_t const& comm, std::tie(tx_counts, tx_offsets, tx_dst_ranks, rx_counts, rx_offsets, rx_src_ranks) = detail::compute_tx_rx_counts_offsets_ranks(comm, d_tx_value_counts, stream_view); - rmm::device_uvector::value_type> rx_keys( + rmm::device_uvector::value_type> rx_keys( rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view); auto rx_value_buffer = - allocate_dataframe_buffer::value_type>( + allocate_dataframe_buffer::value_type>( rx_keys.size(), stream_view); // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released diff --git a/cpp/src/community/louvain.cuh b/cpp/src/community/louvain.cuh index 025c520abf5..094f3bc6546 100644 --- a/cpp/src/community/louvain.cuh +++ b/cpp/src/community/louvain.cuh @@ -308,7 +308,7 @@ class Louvain { thrust::make_tuple(cluster_keys_v_.begin(), cluster_weights_v_.begin())); std::forward_as_tuple(std::tie(rx_keys_v, rx_weights_v), std::ignore) = - groupby_gpuid_and_shuffle_values( + groupby_gpu_id_and_shuffle_values( handle_.get_comms(), pair_first, pair_first + current_graph_view_.get_number_of_local_vertices(), diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index 21e9571fbb2..a1f663a301c 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
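mem_frugal_groupby, added to shuffle_comm.cuh above, avoids a full sort of a large input: it repeatedly splits the group-id range at its midpoint with mem_frugal_partition and only falls back to sorting (or a single partition when exactly two groups remain) once a sub-range drops below mem_frugal_threshold. A compact host-side rendering of that control flow, written recursively for brevity where the device code keeps explicit work lists (all names illustrative):

#include <algorithm>
#include <cstddef>
#include <iterator>

// Group elements by group id in [group_first, group_last): split the group-id
// range in half, partition, and recurse; sort only once a sub-range is small.
template <typename Iterator, typename ToGroupId>
void mem_frugal_groupby_host(Iterator first,
                             Iterator last,
                             ToGroupId to_group_id,
                             int group_first,
                             int group_last,
                             std::size_t mem_frugal_threshold)
{
  if (group_last - group_first <= 1) { return; }
  if (static_cast<std::size_t>(std::distance(first, last)) < mem_frugal_threshold) {
    // small enough: sort by group id (the device code uses a single
    // thrust::partition instead when only two groups remain)
    std::sort(first, last, [&](auto const& lhs, auto const& rhs) {
      return to_group_id(lhs) < to_group_id(rhs);
    });
    return;
  }
  int pivot = (group_first + group_last) / 2;
  auto mid  = std::stable_partition(
    first, last, [&](auto const& v) { return to_group_id(v) < pivot; });  // stands in for
                                                                          // mem_frugal_partition
  mem_frugal_groupby_host(first, mid, to_group_id, group_first, pivot, mem_frugal_threshold);
  mem_frugal_groupby_host(mid, last, to_group_id, pivot, group_last, mem_frugal_threshold);
}

Passing std::numeric_limits<size_t>::max() as the threshold, as the groupby_gpu_id_and_shuffle_* wrappers above do, makes the routine behave like the old sort-based group-by.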
@@ -371,17 +371,17 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // with fewer than one root per GPU if (std::reduce(first_candidate_degrees.begin(), first_candidate_degrees.end()) > degree_sum_threshold * comm_size) { - std::vector> degree_gpuid_pairs(comm_size); + std::vector> degree_gpu_id_pairs(comm_size); for (int i = 0; i < comm_size; ++i) { - degree_gpuid_pairs[i] = std::make_tuple(first_candidate_degrees[i], i); + degree_gpu_id_pairs[i] = std::make_tuple(first_candidate_degrees[i], i); } - std::sort(degree_gpuid_pairs.begin(), degree_gpuid_pairs.end(), [](auto lhs, auto rhs) { + std::sort(degree_gpu_id_pairs.begin(), degree_gpu_id_pairs.end(), [](auto lhs, auto rhs) { return std::get<0>(lhs) > std::get<0>(rhs); }); edge_t sum{0}; - for (size_t i = 0; i < degree_gpuid_pairs.size(); ++i) { - sum += std::get<0>(degree_gpuid_pairs[i]); - init_max_new_root_counts[std::get<1>(degree_gpuid_pairs[i])] = 1; + for (size_t i = 0; i < degree_gpu_id_pairs.size(); ++i) { + sum += std::get<0>(degree_gpu_id_pairs[i]); + init_max_new_root_counts[std::get<1>(degree_gpu_id_pairs[i])] = 1; if (sum > degree_sum_threshold * comm_size) { break; } } } @@ -390,18 +390,18 @@ void weakly_connected_components_impl(raft::handle_t const& handle, else if (level_graph_view.get_number_of_vertices() <= static_cast(handle.get_comms().get_size() * ceil(1.0 / max_new_roots_ratio))) { - std::vector gpuids{}; - gpuids.reserve( + std::vector gpu_ids{}; + gpu_ids.reserve( std::reduce(new_root_candidate_counts.begin(), new_root_candidate_counts.end())); for (size_t i = 0; i < new_root_candidate_counts.size(); ++i) { - gpuids.insert(gpuids.end(), new_root_candidate_counts[i], static_cast(i)); + gpu_ids.insert(gpu_ids.end(), new_root_candidate_counts[i], static_cast(i)); } std::random_device rd{}; - std::shuffle(gpuids.begin(), gpuids.end(), std::mt19937(rd())); - gpuids.resize( - std::max(static_cast(gpuids.size() * max_new_roots_ratio), vertex_t{1})); - for (size_t i = 0; i < gpuids.size(); ++i) { - ++init_max_new_root_counts[gpuids[i]]; + std::shuffle(gpu_ids.begin(), gpu_ids.end(), std::mt19937(rd())); + gpu_ids.resize( + std::max(static_cast(gpu_ids.size() * max_new_roots_ratio), vertex_t{1})); + for (size_t i = 0; i < gpu_ids.size(); ++i) { + ++init_max_new_root_counts[gpu_ids[i]]; } } else { std::fill(init_max_new_root_counts.begin(), @@ -678,7 +678,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_size = col_comm.get_size(); - std::tie(edge_buffer, std::ignore) = cugraph::groupby_gpuid_and_shuffle_values( + std::tie(edge_buffer, std::ignore) = cugraph::groupby_gpu_id_and_shuffle_values( comm, get_dataframe_buffer_begin(edge_buffer), get_dataframe_buffer_end(edge_buffer), diff --git a/cpp/src/detail/shuffle_wrappers.cu b/cpp/src/detail/shuffle_wrappers.cu index a9fa67c769f..6e9434882ba 100644 --- a/cpp/src/detail/shuffle_wrappers.cu +++ b/cpp/src/detail/shuffle_wrappers.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -41,6 +41,23 @@ shuffle_edgelist_by_gpu_id(raft::handle_t const& handle, auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_size = col_comm.get_size(); + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + auto element_size = sizeof(vertex_t) * 2 + (d_edgelist_weights ? sizeof(weight_t) : size_t{0}); + auto constexpr mem_frugal_ratio = + 0.1; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach (thrust::sort is used to + // group-by by default, and thrust::sort requires temporary buffer comparable to the input + // data size) + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); + + // invoke groupby_and_count and shuffle values to pass mem_frugal_threshold instead of directly + // calling groupby_gpu_id_and_shuffle_values there is no benefit in reducing peak memory as we + // need to allocate a receive buffer anyways) but this reduces the maximum memory allocation size + // by half or more (thrust::sort used inside the groupby_and_count allocates the entire temporary + // buffer in a single chunk, and the pool allocator often cannot handle a large single allocation + // (due to fragmentation) even when the remaining free memory in aggregate is significantly larger + // than the requested size). rmm::device_uvector d_rx_edgelist_majors(0, handle.get_stream()); rmm::device_uvector d_rx_edgelist_minors(0, handle.get_stream()); std::optional> d_rx_edgelist_weights{std::nullopt}; @@ -48,35 +65,53 @@ shuffle_edgelist_by_gpu_id(raft::handle_t const& handle, auto edge_first = thrust::make_zip_iterator(thrust::make_tuple( d_edgelist_majors.begin(), d_edgelist_minors.begin(), (*d_edgelist_weights).begin())); + auto d_tx_value_counts = cugraph::groupby_and_count( + edge_first, + edge_first + d_edgelist_majors.size(), + [key_func = + cugraph::detail::compute_gpu_id_from_edge_t{ + comm_size, row_comm_size, col_comm_size}] __device__(auto val) { + return key_func(thrust::get<0>(val), thrust::get<1>(val)); + }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + + std::vector h_tx_value_counts(d_tx_value_counts.size()); + raft::update_host(h_tx_value_counts.data(), + d_tx_value_counts.data(), + d_tx_value_counts.size(), + handle.get_stream()); + handle.sync_stream(); + std::forward_as_tuple( - std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors, d_rx_edgelist_weights), - std::ignore) = - cugraph::groupby_gpuid_and_shuffle_values( - comm, // handle.get_comms(), - edge_first, - edge_first + d_edgelist_majors.size(), - [key_func = - cugraph::detail::compute_gpu_id_from_edge_t{ - comm_size, row_comm_size, col_comm_size}] __device__(auto val) { - return key_func(thrust::get<0>(val), thrust::get<1>(val)); - }, - handle.get_stream()); + std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors, d_rx_edgelist_weights), std::ignore) = + shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream()); } else { auto edge_first = thrust::make_zip_iterator( thrust::make_tuple(d_edgelist_majors.begin(), d_edgelist_minors.begin())); - std::forward_as_tuple(std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors), - std::ignore) = - cugraph::groupby_gpuid_and_shuffle_values( - comm, // handle.get_comms(), - edge_first, - edge_first + d_edgelist_majors.size(), - [key_func = - cugraph::detail::compute_gpu_id_from_edge_t{ - comm_size, row_comm_size, col_comm_size}] __device__(auto val) { - return 
key_func(thrust::get<0>(val), thrust::get<1>(val)); - }, - handle.get_stream()); + auto d_tx_value_counts = cugraph::groupby_and_count( + edge_first, + edge_first + d_edgelist_majors.size(), + [key_func = + cugraph::detail::compute_gpu_id_from_edge_t{ + comm_size, row_comm_size, col_comm_size}] __device__(auto val) { + return key_func(thrust::get<0>(val), thrust::get<1>(val)); + }, + comm_size, + mem_frugal_threshold, + handle.get_stream()); + + std::vector h_tx_value_counts(d_tx_value_counts.size()); + raft::update_host(h_tx_value_counts.data(), + d_tx_value_counts.data(), + d_tx_value_counts.size(), + handle.get_stream()); + handle.sync_stream(); + + std::forward_as_tuple(std::tie(d_rx_edgelist_majors, d_rx_edgelist_minors), std::ignore) = + shuffle_values(comm, edge_first, h_tx_value_counts, handle.get_stream()); } return std::make_tuple(std::move(d_rx_edgelist_majors), @@ -124,7 +159,7 @@ rmm::device_uvector shuffle_vertices_by_gpu_id(raft::handle_t const& h auto const comm_size = comm.get_size(); rmm::device_uvector d_rx_vertices(0, handle.get_stream()); - std::tie(d_rx_vertices, std::ignore) = cugraph::groupby_gpuid_and_shuffle_values( + std::tie(d_rx_vertices, std::ignore) = cugraph::groupby_gpu_id_and_shuffle_values( comm, // handle.get_comms(), d_vertices.begin(), d_vertices.end(), @@ -147,7 +182,7 @@ rmm::device_uvector groupby_and_count_edgelist_by_local_partition_id( rmm::device_uvector& d_edgelist_majors, rmm::device_uvector& d_edgelist_minors, std::optional>& d_edgelist_weights, - bool groupby_and_count_local_partition) + bool groupby_and_count_local_partition_by_minor) { auto& comm = handle.get_comms(); auto const comm_size = comm.get_size(); @@ -159,10 +194,20 @@ rmm::device_uvector groupby_and_count_edgelist_by_local_partition_id( auto const col_comm_size = col_comm.get_size(); auto const col_comm_rank = col_comm.get_rank(); + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + auto element_size = sizeof(vertex_t) * 2 + (d_edgelist_weights ? 
sizeof(weight_t) : size_t{0}); + auto constexpr mem_frugal_ratio = + 0.1; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach (thrust::sort is used to + // group-by by default, and thrust::sort requires temporary buffer comparable to the input + // data size) + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); + auto pair_first = thrust::make_zip_iterator( thrust::make_tuple(d_edgelist_majors.begin(), d_edgelist_minors.begin())); - if (groupby_and_count_local_partition) { + if (groupby_and_count_local_partition_by_minor) { auto local_partition_id_gpu_id_pair_op = [comm_size, row_comm_size, @@ -183,11 +228,13 @@ rmm::device_uvector groupby_and_count_edgelist_by_local_partition_id( d_edgelist_weights->begin(), local_partition_id_gpu_id_pair_op, comm_size, + mem_frugal_threshold, handle.get_stream()) : cugraph::groupby_and_count(pair_first, pair_first + d_edgelist_majors.size(), local_partition_id_gpu_id_pair_op, comm_size, + mem_frugal_threshold, handle.get_stream()); } else { auto local_partition_id_op = @@ -203,11 +250,13 @@ rmm::device_uvector groupby_and_count_edgelist_by_local_partition_id( d_edgelist_weights->begin(), local_partition_id_op, col_comm_size, + mem_frugal_threshold, handle.get_stream()) : cugraph::groupby_and_count(pair_first, pair_first + d_edgelist_majors.size(), local_partition_id_op, col_comm_size, + mem_frugal_threshold, handle.get_stream()); } } diff --git a/cpp/src/structure/coarsen_graph_impl.cuh b/cpp/src/structure/coarsen_graph_impl.cuh index b0f6c7eca05..6234acf5559 100644 --- a/cpp/src/structure/coarsen_graph_impl.cuh +++ b/cpp/src/structure/coarsen_graph_impl.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -267,7 +267,7 @@ coarsen_graph( // 1-3. append data to local adjacency matrix partitions - // FIXME: we can skip this if groupby_gpuid_and_shuffle_values is updated to return sorted edge + // FIXME: we can skip this if groupby_gpu_id_and_shuffle_values is updated to return sorted edge // list based on the final matrix partition (maybe add // groupby_adj_matrix_partition_and_shuffle_values). @@ -421,9 +421,7 @@ coarsen_graph( meta.number_of_edges, graph_properties_t{graph_view.is_symmetric(), false}, meta.partition, - meta.segment_offsets, - store_transposed ? meta.num_local_unique_edge_minors : meta.num_local_unique_edge_majors, - store_transposed ? meta.num_local_unique_edge_majors : meta.num_local_unique_edge_minors}), + meta.segment_offsets}), std::move(renumber_map_labels)); } diff --git a/cpp/src/structure/create_graph_from_edgelist_impl.cuh b/cpp/src/structure/create_graph_from_edgelist_impl.cuh index f05f5f957c6..ea12a3562ba 100644 --- a/cpp/src/structure/create_graph_from_edgelist_impl.cuh +++ b/cpp/src/structure/create_graph_from_edgelist_impl.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
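The mem_frugal_threshold computed in the two shuffle_wrappers.cu hunks above is a simple heuristic: the number of edge-list elements whose sort-time temporary buffer would exceed mem_frugal_ratio (10%) of device memory. A worked example with assumed sizes (a 32 GB GPU, 64-bit vertex IDs, 8-byte weights; the concrete numbers are illustrative, not from the patch):

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
  // assumed device: 32 GiB of global memory
  std::size_t total_global_mem = std::size_t{32} * 1024 * 1024 * 1024;

  // one edge-list element: two 64-bit vertex IDs plus an 8-byte weight
  std::size_t element_size = sizeof(std::int64_t) * 2 + sizeof(double);  // 24 bytes

  double mem_frugal_ratio = 0.1;  // same ratio as in the patch

  auto mem_frugal_threshold = static_cast<std::size_t>(
    static_cast<double>(total_global_mem / element_size) * mem_frugal_ratio);

  // roughly 143 million edges for these sizes; larger inputs take the
  // memory-frugal group-by path instead of thrust::sort
  std::printf("mem_frugal_threshold = %zu elements\n", mem_frugal_threshold);
  return 0;
}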
@@ -283,14 +283,11 @@ create_graph_from_edgelist_impl(raft::handle_t const& handle, cugraph::graph_t( handle, edgelists, - cugraph::graph_meta_t{ - meta.number_of_vertices, - meta.number_of_edges, - graph_properties, - meta.partition, - meta.segment_offsets, - store_transposed ? meta.num_local_unique_edge_minors : meta.num_local_unique_edge_majors, - store_transposed ? meta.num_local_unique_edge_majors : meta.num_local_unique_edge_minors}), + cugraph::graph_meta_t{meta.number_of_vertices, + meta.number_of_edges, + graph_properties, + meta.partition, + meta.segment_offsets}), std::optional>{std::move(renumber_map_labels)}); } diff --git a/cpp/src/structure/graph_impl.cuh b/cpp/src/structure/graph_impl.cuh index 72748fc2e61..eff76df8a79 100644 --- a/cpp/src/structure/graph_impl.cuh +++ b/cpp/src/structure/graph_impl.cuh @@ -80,6 +80,31 @@ struct has_nzd_t { } }; +// can't use lambda due to nvcc limitations (The enclosing parent function ("graph_t") for an +// extended __device__ lambda must allow its address to be taken) +template +struct atomic_or_bitmap_t { + uint32_t* bitmaps{nullptr}; + vertex_t minor_first{}; + + __device__ void operator()(vertex_t minor) const + { + auto minor_offset = minor - minor_first; + auto mask = uint32_t{1} << (minor_offset % (sizeof(uint32_t) * 8)); + atomicOr(bitmaps + (minor_offset / (sizeof(uint32_t) * 8)), mask); + } +}; + +// can't use lambda due to nvcc limitations (The enclosing parent function ("graph_t") for an +// extended __device__ lambda must allow its address to be taken) +template +struct popc_t { + __device__ vertex_t operator()(uint32_t bitmap) const + { + return static_cast(__popc(bitmap)); + } +}; + // can't use lambda due to nvcc limitations (The enclosing parent function ("graph_t") for an // extended __device__ lambda must allow its address to be taken) template @@ -573,48 +598,6 @@ graph_t majors(number_of_local_edges, handle.get_stream()); - rmm::device_uvector minors(number_of_local_edges, handle.get_stream()); - size_t cur_size{0}; - for (size_t i = 0; i < edgelists.size(); ++i) { - auto p_majors = store_transposed ? edgelists[i].p_dst_vertices : edgelists[i].p_src_vertices; - auto p_minors = store_transposed ? edgelists[i].p_src_vertices : edgelists[i].p_dst_vertices; - thrust::copy(handle.get_thrust_policy(), - p_majors, - p_majors + edgelists[i].number_of_edges, - majors.begin() + cur_size); - thrust::copy(handle.get_thrust_policy(), - p_minors, - p_minors + edgelists[i].number_of_edges, - minors.begin() + cur_size); - cur_size += edgelists[i].number_of_edges; - } - thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); - thrust::sort(handle.get_thrust_policy(), minors.begin(), minors.end()); - auto num_local_unique_edge_majors = static_cast(thrust::distance( - majors.begin(), thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end()))); - auto num_local_unique_edge_minors = static_cast(thrust::distance( - minors.begin(), thrust::unique(handle.get_thrust_policy(), minors.begin(), minors.end()))); - // FIXME: temporarily disable this check as these are currently not used - // (row_col_properties_kv_pair_fill_ratio_threshold is set to 0.0, so (key, value) pairs for - // row/column properties will be never enabled) and we're not currently exposing this to the - // python layer. Should be re-enabled later once we enable the (key, value) pair feature and - // hopefully simplify the python graph creation pipeline as well (so no need to pass this - // information to the python layer). 
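The atomic_or_bitmap_t and popc_t functors added above let graph_t count unique edge minors with a bitmap instead of the removed sort-and-unique pass: every edge sets the bit for its minor vertex, and the number of set bits is the number of distinct minors. A host-side analogue of the counting step (std::popcount needs C++20; the function name is illustrative):

#include <bit>
#include <cstddef>
#include <cstdint>
#include <vector>

// Count distinct values in [minor_first, minor_last) among the observed minors:
// set one bit per value, then sum the popcounts of the bitmap words.
std::int64_t count_unique_minors(std::vector<std::int64_t> const& minors,
                                 std::int64_t minor_first,
                                 std::int64_t minor_last)
{
  auto num_bits  = static_cast<std::size_t>(minor_last - minor_first);
  auto num_words = (num_bits + 31) / 32;
  std::vector<std::uint32_t> bitmap(num_words, 0);

  for (auto minor : minors) {
    auto offset = static_cast<std::size_t>(minor - minor_first);
    bitmap[offset / 32] |= std::uint32_t{1} << (offset % 32);  // atomicOr in the device code
  }

  std::int64_t count{0};
  for (auto word : bitmap) {
    count += std::popcount(word);  // popc_t / __popc in the device code
  }
  return count;
}

The bitmap costs one bit per vertex in the minor range, which is typically far smaller than the per-edge copies the previous sort-based approach had to allocate.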
-#if 0 - if constexpr (store_transposed) { - CUGRAPH_EXPECTS(num_local_unique_edge_majors == meta.num_local_unique_edge_cols, - "Invalid input argument: num_local_unique_edge_cols is erroneous."); - CUGRAPH_EXPECTS(num_local_unique_edge_minors == meta.num_local_unique_edge_rows, - "Invalid input argument: num_local_unique_edge_rows is erroneous."); - } else { - CUGRAPH_EXPECTS(num_local_unique_edge_majors == meta.num_local_unique_edge_rows, - "Invalid input argument: num_local_unique_edge_rows is erroneous."); - CUGRAPH_EXPECTS(num_local_unique_edge_minors == meta.num_local_unique_edge_cols, - "Invalid input argument: num_local_unique_edge_cols is erroneous."); - } -#endif } // aggregate segment_offsets @@ -704,10 +687,34 @@ graph_t(adj_matrix_partition_offsets_[i].size() - 1)), + has_nzd_t{adj_matrix_partition_offsets_[i].data(), vertex_t{0}}); + } + + auto [minor_first, minor_last] = partition_.get_matrix_partition_minor_range(); + rmm::device_uvector minor_bitmaps( + ((minor_last - minor_first) + sizeof(uint32_t) * 8 - 1) / (sizeof(uint32_t) * 8), + handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), minor_bitmaps.begin(), minor_bitmaps.end(), uint32_t{0}); + for (size_t i = 0; i < adj_matrix_partition_indices_.size(); ++i) { + thrust::for_each(handle.get_thrust_policy(), + adj_matrix_partition_indices_[i].begin(), + adj_matrix_partition_indices_[i].end(), + atomic_or_bitmap_t{minor_bitmaps.data(), minor_first}); + } + + auto count_first = thrust::make_transform_iterator(minor_bitmaps.begin(), popc_t{}); + auto num_local_unique_edge_minors = thrust::reduce( + handle.get_thrust_policy(), count_first, count_first + minor_bitmaps.size(), vertex_t{0}); + + minor_bitmaps.resize(0, handle.get_stream()); + minor_bitmaps.shrink_to_fit(handle.get_stream()); vertex_t aggregate_major_size{0}; for (size_t i = 0; i < partition_.get_number_of_matrix_partitions(); ++i) { diff --git a/cpp/src/structure/relabel_impl.cuh b/cpp/src/structure/relabel_impl.cuh index d709152f71c..4ace52e351a 100644 --- a/cpp/src/structure/relabel_impl.cuh +++ b/cpp/src/structure/relabel_impl.cuh @@ -95,7 +95,7 @@ void relabel(raft::handle_t const& handle, thrust::make_tuple(label_pair_old_labels.begin(), label_pair_new_labels.begin())); std::forward_as_tuple(std::tie(rx_label_pair_old_labels, rx_label_pair_new_labels), std::ignore) = - groupby_gpuid_and_shuffle_values( + groupby_gpu_id_and_shuffle_values( handle.get_comms(), pair_first, pair_first + num_label_pairs, @@ -136,7 +136,7 @@ void relabel(raft::handle_t const& handle, { rmm::device_uvector rx_unique_old_labels(0, handle.get_stream()); std::vector rx_value_counts{}; - std::tie(rx_unique_old_labels, rx_value_counts) = groupby_gpuid_and_shuffle_values( + std::tie(rx_unique_old_labels, rx_value_counts) = groupby_gpu_id_and_shuffle_values( handle.get_comms(), unique_old_labels.begin(), unique_old_labels.end(), diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 959d11b783f..aeb7682f440 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -77,23 +77,21 @@ struct search_and_set_degree_t { } }; -// returns renumber map, segment_offsets, and # unique edge majors & minors +// returns renumber map and segment_offsets template -std::tuple, std::vector, vertex_t, vertex_t> -compute_renumber_map(raft::handle_t const& handle, - std::optional>&& local_vertices, - std::vector const& edgelist_majors, - std::vector const& edgelist_minors, - std::vector 
const& edgelist_edge_counts) +std::tuple, std::vector> compute_renumber_map( + raft::handle_t const& handle, + std::optional>&& local_vertices, + std::vector const& edgelist_majors, + std::vector const& edgelist_minors, + std::vector const& edgelist_edge_counts) { rmm::device_uvector sorted_local_vertices(0, handle.get_stream()); - vertex_t num_local_unique_edge_majors{0}; - vertex_t num_local_unique_edge_minors{0}; edge_t num_local_edges = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); // 1. if local_vertices.has_value() is false, find unique vertices from edge majors (to construct - // local_vertices), unique edge majors will be counted in step 4. + // local_vertices) rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); if (!local_vertices) { @@ -127,42 +125,40 @@ compute_renumber_map(raft::handle_t const& handle, sorted_unique_majors.shrink_to_fit(handle.get_stream()); } - // 2. count unique edge minors. - // if local_vertices.has_value() is false, keep unique vertices from edge minors as well (to - // construct local_vertices) + // 2. if local_vertices.has_value() is false, find unique vertices from edge minors (to construct + // local_vertices) - rmm::device_uvector sorted_unique_minors(num_local_edges, handle.get_stream()); - size_t minor_offset{0}; - for (size_t i = 0; i < edgelist_minors.size(); ++i) { - thrust::copy(handle.get_thrust_policy(), - edgelist_minors[i], - edgelist_minors[i] + edgelist_edge_counts[i], - sorted_unique_minors.begin() + minor_offset); - thrust::sort(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]); - minor_offset += static_cast(thrust::distance( - sorted_unique_minors.begin() + minor_offset, - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]))); - } - sorted_unique_minors.resize(minor_offset, handle.get_stream()); - if (edgelist_minors.size() > 1) { - thrust::sort( - handle.get_thrust_policy(), sorted_unique_minors.begin(), sorted_unique_minors.end()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); + rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); + if (!local_vertices) { + sorted_unique_minors.resize(num_local_edges, handle.get_stream()); + size_t minor_offset{0}; + for (size_t i = 0; i < edgelist_minors.size(); ++i) { + thrust::copy(handle.get_thrust_policy(), + edgelist_minors[i], + edgelist_minors[i] + edgelist_edge_counts[i], + sorted_unique_minors.begin() + minor_offset); + thrust::sort(handle.get_thrust_policy(), + sorted_unique_minors.begin() + minor_offset, + sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]); + minor_offset += static_cast(thrust::distance( + sorted_unique_minors.begin() + minor_offset, + thrust::unique(handle.get_thrust_policy(), + sorted_unique_minors.begin() + minor_offset, + sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]))); + } + sorted_unique_minors.resize(minor_offset, handle.get_stream()); + if (edgelist_minors.size() > 1) { + thrust::sort( + handle.get_thrust_policy(), sorted_unique_minors.begin(), sorted_unique_minors.end()); + sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), + thrust::unique(handle.get_thrust_policy(), + 
sorted_unique_minors.begin(), + sorted_unique_minors.end())), + handle.get_stream()); + } + sorted_unique_minors.shrink_to_fit(handle.get_stream()); } - num_local_unique_edge_minors = static_cast(sorted_unique_minors.size()); - - if (local_vertices) { sorted_unique_minors.resize(0, handle.get_stream()); } - sorted_unique_minors.shrink_to_fit(handle.get_stream()); - // 3. update sorted_local_vertices. // if local_vertices.has_value() is false, reconstruct local_vertices first @@ -207,8 +203,7 @@ compute_renumber_map(raft::handle_t const& handle, } } - // 4. compute global degrees for the sorted local vertices, and count unique edge majors on the - // way + // 4. compute global degrees for the sorted local vertices rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); std::optional> stream_pool_indices{ @@ -280,8 +275,6 @@ compute_renumber_map(raft::handle_t const& handle, tmp_keys.begin(), tmp_values.begin()); - num_local_unique_edge_majors += num_unique_majors; - tmp_majors.resize(0, loop_stream); tmp_majors.shrink_to_fit(loop_stream); @@ -342,8 +335,6 @@ compute_renumber_map(raft::handle_t const& handle, tmp_keys.begin(), tmp_values.begin()); - num_local_unique_edge_majors += num_unique_majors; - tmp_majors.resize(0, handle.get_stream()); tmp_majors.shrink_to_fit(handle.get_stream()); @@ -426,10 +417,7 @@ compute_renumber_map(raft::handle_t const& handle, handle.get_stream()); handle.sync_stream(); - return std::make_tuple(std::move(sorted_local_vertices), - h_segment_offsets, - num_local_unique_edge_majors, - num_local_unique_edge_minors); + return std::make_tuple(std::move(sorted_local_vertices), h_segment_offsets); } template @@ -682,10 +670,7 @@ renumber_edgelist( // 1. compute renumber map - auto [renumber_map_labels, - vertex_partition_segment_offsets, - num_unique_edge_majors, - num_unique_edge_minors] = + auto [renumber_map_labels, vertex_partition_segment_offsets] = detail::compute_renumber_map(handle, std::move(local_vertices), edgelist_const_majors, @@ -765,7 +750,8 @@ renumber_edgelist( } } - if ((partition.get_matrix_partition_minor_size() >= number_of_edges / comm_size) && + if ((static_cast(partition.get_matrix_partition_minor_size() / load_factor) >= + static_cast(number_of_edges / comm_size)) && edgelist_intra_partition_segment_offsets) { // memory footprint dominated by the O(V/sqrt(P)) // part than the O(E/P) part vertex_t max_segment_size{0}; @@ -863,12 +849,8 @@ renumber_edgelist( return std::make_tuple( std::move(renumber_map_labels), - renumber_meta_t{number_of_vertices, - number_of_edges, - partition, - vertex_partition_segment_offsets, - num_unique_edge_majors, - num_unique_edge_minors}); + renumber_meta_t{ + number_of_vertices, number_of_edges, partition, vertex_partition_segment_offsets}); } template @@ -894,7 +876,7 @@ renumber_edgelist(raft::handle_t const& handle, rmm::device_uvector renumber_map_labels(0, handle.get_stream()); std::vector segment_offsets{}; - std::tie(renumber_map_labels, segment_offsets, std::ignore, std::ignore) = + std::tie(renumber_map_labels, segment_offsets) = detail::compute_renumber_map( handle, std::move(vertices), diff --git a/cpp/src/utilities/cython.cu b/cpp/src/utilities/cython.cu index 35a6be4edc3..1527ae90afd 100644 --- a/cpp/src/utilities/cython.cu +++ b/cpp/src/utilities/cython.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1182,7 +1182,7 @@ template std::unique_ptr> call_shuffle( raft::handle_t const& handle, vertex_t* - edgelist_major_vertices, // [IN / OUT]: groupby_gpuid_and_shuffle_values() sorts in-place + edgelist_major_vertices, // [IN / OUT]: groupby_gpu_id_and_shuffle_values() sorts in-place vertex_t* edgelist_minor_vertices, // [IN / OUT] weight_t* edgelist_weights, // [IN / OUT] edge_t num_edgelist_edges) @@ -1204,7 +1204,7 @@ std::unique_ptr> call_shuffle( std::forward_as_tuple( std::tie(ptr_ret->get_major(), ptr_ret->get_minor(), ptr_ret->get_weights()), std::ignore) = - cugraph::groupby_gpuid_and_shuffle_values( + cugraph::groupby_gpu_id_and_shuffle_values( comm, // handle.get_comms(), zip_edge, zip_edge + num_edgelist_edges, @@ -1220,7 +1220,7 @@ std::unique_ptr> call_shuffle( std::forward_as_tuple(std::tie(ptr_ret->get_major(), ptr_ret->get_minor()), std::ignore) = - cugraph::groupby_gpuid_and_shuffle_values( + cugraph::groupby_gpu_id_and_shuffle_values( comm, // handle.get_comms(), zip_edge, zip_edge + num_edgelist_edges, @@ -1248,11 +1248,13 @@ std::unique_ptr> call_shuffle( ptr_ret->get_weights().data(), local_partition_id_op, col_comm_size, + false, handle.get_stream()) : cugraph::groupby_and_count(pair_first, pair_first + ptr_ret->get_major().size(), local_partition_id_op, col_comm_size, + false, handle.get_stream()); std::vector h_edge_counts(edge_counts.size()); diff --git a/cpp/tests/utilities/test_graphs.hpp b/cpp/tests/utilities/test_graphs.hpp index 9fa4cee9f7a..dc0f13fc9f0 100644 --- a/cpp/tests/utilities/test_graphs.hpp +++ b/cpp/tests/utilities/test_graphs.hpp @@ -29,6 +29,53 @@ namespace test { namespace detail { +template +std::optional> try_allocate(raft::handle_t const& handle, size_t size) +{ + try { + return std::make_optional>(size, handle.get_stream()); + } catch (std::exception const& e) { + return std::nullopt; + } +} + +// use host memory as temporary buffer if memroy allocation on device fails +template +rmm::device_uvector concatenate(raft::handle_t const& handle, + std::vector>&& inputs) +{ + size_t tot_count{0}; + for (size_t i = 0; i < inputs.size(); ++i) { + tot_count += inputs[i].size(); + } + + auto output = try_allocate(handle, tot_count); + if (output) { + size_t offset{0}; + for (size_t i = 0; i < inputs.size(); ++i) { + raft::copy( + (*output).data() + offset, inputs[i].data(), inputs[i].size(), handle.get_stream()); + offset += inputs[i].size(); + } + inputs.clear(); + inputs.shrink_to_fit(); + } else { + std::vector h_buffer(tot_count); + size_t offset{0}; + for (size_t i = 0; i < inputs.size(); ++i) { + raft::update_host( + h_buffer.data() + offset, inputs[i].data(), inputs[i].size(), handle.get_stream()); + offset += inputs[i].size(); + } + inputs.clear(); + inputs.shrink_to_fit(); + output = rmm::device_uvector(tot_count, handle.get_stream()); + raft::update_device((*output).data(), h_buffer.data(), h_buffer.size(), handle.get_stream()); + } + + return std::move(*output); +} + class TranslateGraph_Usecase { public: TranslateGraph_Usecase() = delete; @@ -146,23 +193,33 @@ class Rmat_Usecase : public detail::TranslateGraph_Usecase { CUGRAPH_EXPECTS(((size_t{1} << scale_) * edge_factor_) <= static_cast(std::numeric_limits::max()), "Invalid template parameter: (scale_, edge_factor_) too large for edge_t"); + // generate in multi-partitions to limit peak memory usage (thrust::sort & + // shuffle_edgelist_by_gpu_id 
requires a temporary buffer with the size of the original data) + // With the current implementation, the temporary memory requirement is roughly 50% of the + // original data with num_partitions_per_gpu = 2. If we use cuMemAddressReserve + // (https://developer.nvidia.com/blog/introducing-low-level-gpu-virtual-memory-management), we + // can reduce the temporary memory requirement to (1 / num_partitions) * (original data size) + size_t constexpr num_partitions_per_gpu = 2; + + // 1. calculate # partitions, # edges to generate in each partition, and partition vertex ranges - std::vector partition_ids(1); - size_t num_partitions; + std::vector partition_ids{}; + size_t num_partitions{}; if (multi_gpu_usecase_) { auto& comm = handle.get_comms(); - num_partitions = comm.get_size(); + num_partitions = comm.get_size() * num_partitions_per_gpu; auto const comm_rank = comm.get_rank(); - partition_ids.resize(multi_gpu ? size_t{1} : static_cast(num_partitions)); + partition_ids.resize(multi_gpu ? num_partitions_per_gpu : num_partitions); std::iota(partition_ids.begin(), partition_ids.end(), - multi_gpu ? static_cast(comm_rank) : size_t{0}); + multi_gpu ? static_cast(comm_rank) * num_partitions_per_gpu : size_t{0}); } else { - num_partitions = 1; - partition_ids[0] = size_t{0}; + num_partitions = num_partitions_per_gpu; + partition_ids.resize(num_partitions); + std::iota(partition_ids.begin(), partition_ids.end(), size_t{0}); } vertex_t number_of_vertices = static_cast(size_t{1} << scale_); @@ -191,17 +248,20 @@ class Rmat_Usecase : public detail::TranslateGraph_Usecase { } } - rmm::device_uvector src_v(0, handle.get_stream()); - rmm::device_uvector dst_v(0, handle.get_stream()); - auto weights_v = test_weighted - ? std::make_optional>(0, handle.get_stream()) - : std::nullopt; + // 2. generate edges + + std::vector> src_partitions{}; + std::vector> dst_partitions{}; + auto weight_partitions = test_weighted + ? std::make_optional>>() + : std::nullopt; + src_partitions.reserve(partition_ids.size()); + dst_partitions.reserve(partition_ids.size()); + if (weight_partitions) { (*weight_partitions).reserve(partition_ids.size()); } for (size_t i = 0; i < partition_ids.size(); ++i) { auto id = partition_ids[i]; - rmm::device_uvector tmp_src_v(0, handle.get_stream()); - rmm::device_uvector tmp_dst_v(0, handle.get_stream()); - std::tie(i == 0 ? src_v : tmp_src_v, i == 0 ? dst_v : tmp_dst_v) = + auto [tmp_src_v, tmp_dst_v] = cugraph::generate_rmat_edgelist(handle, scale_, partition_edge_counts[i], @@ -212,79 +272,85 @@ class Rmat_Usecase : public detail::TranslateGraph_Usecase { undirected_ ? true : false); std::optional> tmp_weights_v{std::nullopt}; - if (weights_v) { - if (i == 0) { - weights_v->resize(src_v.size(), handle.get_stream()); - } else { - tmp_weights_v = std::make_optional>(tmp_src_v.size(), - handle.get_stream()); - } + if (weight_partitions) { + tmp_weights_v = + std::make_optional>(tmp_src_v.size(), handle.get_stream()); cugraph::detail::uniform_random_fill(handle.get_stream(), - i == 0 ? weights_v->data() : tmp_weights_v->data(), - i == 0 ? 
weights_v->size() : tmp_weights_v->size(), + tmp_weights_v->data(), + tmp_weights_v->size(), weight_t{0.0}, weight_t{1.0}, seed_ + num_partitions + id); } - if (i > 0) { - auto start_offset = src_v.size(); - src_v.resize(start_offset + tmp_src_v.size(), handle.get_stream()); - dst_v.resize(start_offset + tmp_dst_v.size(), handle.get_stream()); - raft::copy( - src_v.begin() + start_offset, tmp_src_v.begin(), tmp_src_v.size(), handle.get_stream()); - raft::copy( - dst_v.begin() + start_offset, tmp_dst_v.begin(), tmp_dst_v.size(), handle.get_stream()); - - if (weights_v) { - weights_v->resize(start_offset + tmp_weights_v->size(), handle.get_stream()); - raft::copy(weights_v->begin() + start_offset, - tmp_weights_v->begin(), - tmp_weights_v->size(), - handle.get_stream()); - } + translate(handle, tmp_src_v, tmp_dst_v); + + if (undirected_) { + std::tie(tmp_src_v, tmp_dst_v, tmp_weights_v) = + cugraph::symmetrize_edgelist_from_triangular( + handle, std::move(tmp_src_v), std::move(tmp_dst_v), std::move(tmp_weights_v)); + } + + if (multi_gpu) { + std::tie(store_transposed ? tmp_dst_v : tmp_src_v, + store_transposed ? tmp_src_v : tmp_dst_v, + tmp_weights_v) = + cugraph::detail::shuffle_edgelist_by_gpu_id( + handle, + store_transposed ? std::move(tmp_dst_v) : std::move(tmp_src_v), + store_transposed ? std::move(tmp_src_v) : std::move(tmp_dst_v), + std::move(tmp_weights_v)); } - } - translate(handle, src_v, dst_v); + src_partitions.push_back(std::move(tmp_src_v)); + dst_partitions.push_back(std::move(tmp_dst_v)); + if (weight_partitions) { (*weight_partitions).push_back(std::move(*tmp_weights_v)); } + } - if (undirected_) - std::tie(src_v, dst_v, weights_v) = - cugraph::symmetrize_edgelist_from_triangular( - handle, std::move(src_v), std::move(dst_v), std::move(weights_v)); + size_t tot_edge_counts{0}; + for (size_t i = 0; i < src_partitions.size(); ++i) { + tot_edge_counts += src_partitions[i].size(); + } - if (multi_gpu) { - std::tie(store_transposed ? dst_v : src_v, store_transposed ? src_v : dst_v, weights_v) = - cugraph::detail::shuffle_edgelist_by_gpu_id( - handle, - store_transposed ? std::move(dst_v) : std::move(src_v), - store_transposed ? std::move(src_v) : std::move(dst_v), - std::move(weights_v)); + // detail::concatenate uses a host buffer to store input vectors if initial device memory + // allocation for the return vector fails. This does not improve peak memory usage and is not + // helpful with the rmm_mode = cuda. However, if rmm_mode = pool, memory allocation can fail + // even when the aggregate free memory size far exceeds the requested size. This heuristic is + // helpful in this case. + + auto src_v = detail::concatenate(handle, std::move(src_partitions)); + auto dst_v = detail::concatenate(handle, std::move(dst_partitions)); + std::optional> weight_v{std::nullopt}; + if (weight_partitions) { + weight_v = detail::concatenate(handle, std::move(*weight_partitions)); } - rmm::device_uvector vertices_v(0, handle.get_stream()); - for (size_t i = 0; i < partition_ids.size(); ++i) { - auto id = partition_ids[i]; + // 3. 
generate vertices - auto start_offset = vertices_v.size(); - vertices_v.resize(start_offset + (partition_vertex_lasts[i] - partition_vertex_firsts[i]), - handle.get_stream()); + size_t tot_vertex_counts{0}; + for (size_t i = 0; i < partition_vertex_firsts.size(); ++i) { + tot_vertex_counts += partition_vertex_lasts[i] - partition_vertex_firsts[i]; + } + rmm::device_uvector vertex_v(tot_vertex_counts, handle.get_stream()); + size_t v_offset{0}; + for (size_t i = 0; i < partition_vertex_firsts.size(); ++i) { cugraph::detail::sequence_fill(handle.get_stream(), - vertices_v.begin() + start_offset, - vertices_v.size() - start_offset, + vertex_v.begin() + v_offset, + partition_vertex_lasts[i] - partition_vertex_firsts[i], partition_vertex_firsts[i]); + v_offset += partition_vertex_lasts[i] - partition_vertex_firsts[i]; } if constexpr (multi_gpu) { - vertices_v = cugraph::detail::shuffle_vertices_by_gpu_id(handle, std::move(vertices_v)); + vertex_v = cugraph::detail::shuffle_vertices_by_gpu_id(handle, std::move(vertex_v)); } return std::make_tuple( std::move(src_v), std::move(dst_v), - std::move(weights_v), - std::move(vertices_v), + std::move(weight_v), + std::move(vertex_v), static_cast(detail::TranslateGraph_Usecase::base_vertex_id_) + number_of_vertices, undirected_); }
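The concatenate helper added to test_graphs.hpp sums the partition sizes, tries a single device allocation, and falls back to staging the data through host memory when the pool allocator cannot serve the request (e.g. due to fragmentation), as noted in the comment preceding its use. A host-only rendering of the happy path, with std::vector standing in for rmm::device_uvector (names are illustrative):

#include <algorithm>
#include <cstddef>
#include <vector>

// Concatenate the per-partition buffers into one allocation and release the
// inputs afterwards; the real helper does the same with rmm::device_uvector
// and switches to a host staging buffer if the single device allocation throws.
template <typename T>
std::vector<T> concatenate_host(std::vector<std::vector<T>>&& inputs)
{
  std::size_t tot_count{0};
  for (auto const& in : inputs) {
    tot_count += in.size();
  }

  std::vector<T> output(tot_count);
  std::size_t offset{0};
  for (auto const& in : inputs) {
    std::copy(in.begin(), in.end(), output.begin() + offset);
    offset += in.size();
  }
  inputs.clear();
  inputs.shrink_to_fit();  // drop the source partitions once everything is copied
  return output;
}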