in copy_v_transform_reduce_in|out_nbr, implement missing communication along the minor direction
seunghwak committed Oct 3, 2020
1 parent 7463576 commit 6e1b152
Showing 1 changed file with 153 additions and 73 deletions.
226 changes: 153 additions & 73 deletions cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
@@ -359,58 +359,73 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
: static_cast<size_t>(row_comm_size);
}

for (size_t i = 0; i < loop_count; ++i) {
matrix_partition_device_t<GraphViewType> matrix_partition(
graph_view, (GraphViewType::is_multi_gpu && !graph_view.is_hypergraph_partitioned()) ? 0 : i);

auto tmp_buffer_size = vertex_t{0};
auto minor_tmp_buffer_size =
(GraphViewType::is_multi_gpu && (in != GraphViewType::is_adj_matrix_transposed))
? GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_rows()
: graph_view.get_number_of_local_adj_matrix_partition_cols()
: vertex_t{0};
auto minor_tmp_buffer = allocate_comm_buffer<T>(minor_tmp_buffer_size, handle.get_stream());
auto minor_buffer_first = get_comm_buffer_begin<T>(minor_tmp_buffer);
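// minor_tmp_buffer spans the local minor vertices (matrix rows if transposed, columns
// otherwise); it is only allocated in the multi-GPU minor-direction case, accumulates
// per-edge contributions across all loop iterations below, and is reduced along the
// minor direction afterwards.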

if (in != GraphViewType::is_adj_matrix_transposed) {
auto minor_init = init;
if (GraphViewType::is_multi_gpu) {
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_size = row_comm.get_size();
auto const row_comm_rank = row_comm.get_rank();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
minor_init = graph_view.is_hypergraph_partitioned() ? (row_comm_rank == 0) ? init : T{}
: (col_comm_rank == 0) ? init : T{};
}

tmp_buffer_size =
in ? GraphViewType::is_adj_matrix_transposed
? graph_view.is_hypergraph_partitioned()
? matrix_partition.get_major_size()
: graph_view.get_vertex_partition_size(col_comm_rank * row_comm_size + i)
: matrix_partition.get_minor_size()
: GraphViewType::is_adj_matrix_transposed
? matrix_partition.get_minor_size()
: graph_view.is_hypergraph_partitioned()
? matrix_partition.get_major_size()
: graph_view.get_vertex_partition_size(col_comm_rank * row_comm_size + i);
if (GraphViewType::is_multi_gpu) {
thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
minor_buffer_first,
minor_buffer_first + minor_tmp_buffer_size,
minor_init);
} else {
thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
minor_init);
}
auto tmp_buffer = allocate_comm_buffer<T>(tmp_buffer_size, handle.get_stream());
auto buffer_first = get_comm_buffer_begin<T>(tmp_buffer);
} else {
assert(minor_tmp_buffer_size == 0);
}
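// pre-fill the accumulation target with minor_init; only the first GPU along the
// reduction direction seeds init, so init is counted exactly once after the SUM
// reduction (single-GPU runs write straight to the output iterator instead).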

auto local_init = init;
for (size_t i = 0; i < loop_count; ++i) {
matrix_partition_device_t<GraphViewType> matrix_partition(
graph_view, (GraphViewType::is_multi_gpu && !graph_view.is_hypergraph_partitioned()) ? 0 : i);

auto major_tmp_buffer_size = vertex_t{0};
if (GraphViewType::is_multi_gpu) {
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
if (in == GraphViewType::is_adj_matrix_transposed) {
local_init = graph_view.is_hypergraph_partitioned() ? (col_comm_rank == 0) ? init : T{}
: (row_comm_rank == 0) ? init : T{};
} else {
local_init = graph_view.is_hypergraph_partitioned() ? (row_comm_rank == 0) ? init : T{}
: (col_comm_rank == 0) ? init : T{};
}

major_tmp_buffer_size =
(in == GraphViewType::is_adj_matrix_transposed)
? graph_view.is_hypergraph_partitioned()
? matrix_partition.get_major_size()
: graph_view.get_vertex_partition_size(col_comm_rank * row_comm_size + i)
: vertex_t{0};
}
auto major_tmp_buffer = allocate_comm_buffer<T>(major_tmp_buffer_size, handle.get_stream());
auto major_buffer_first = get_comm_buffer_begin<T>(major_tmp_buffer);
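// the major buffer is re-allocated per matrix partition and is only needed in
// multi-GPU runs producing results along the major direction; it is later reduced to
// the GPU owning the corresponding vertex partition.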

if (in != GraphViewType::is_adj_matrix_transposed) {
auto major_init = T{};
if (in == GraphViewType::is_adj_matrix_transposed) {
if (GraphViewType::is_multi_gpu) {
thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
buffer_first,
buffer_first + tmp_buffer_size,
local_init);
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
major_init = graph_view.is_hypergraph_partitioned() ? (col_comm_rank == 0) ? init : T{}
: (row_comm_rank == 0) ? init : T{};
} else {
thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
local_init);
major_init = init;
}
}
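// as with minor_init, only one GPU along the reduction direction seeds major_init with
// init so that init contributes exactly once to the reduced result.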

@@ -457,9 +472,9 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
graph_view.get_vertex_partition_last(comm_root_rank),
adj_matrix_row_value_input_first + row_value_input_offset,
adj_matrix_col_value_input_first + col_value_input_offset,
buffer_first,
(in == GraphViewType::is_adj_matrix_transposed) ? major_buffer_first : minor_buffer_first,
e_op,
local_init);
major_init);
} else {
detail::for_all_major_for_all_nbr_low_degree<in == GraphViewType::is_adj_matrix_transposed>
<<<update_grid.num_blocks, update_grid.block_size, 0, handle.get_stream()>>>(
@@ -470,43 +485,108 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
adj_matrix_col_value_input_first,
vertex_value_output_first,
e_op,
local_init);
major_init);
}
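// in the multi-GPU path the update steps above accumulate into major_buffer_first or
// minor_buffer_first depending on the accumulation direction; the communication below
// combines these partial results (the single-GPU path has already written its final
// values to vertex_value_output_first).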

if (GraphViewType::is_multi_gpu) {
if (in == GraphViewType::is_adj_matrix_transposed) {
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

if (graph_view.is_hypergraph_partitioned()) {
device_reduce(
col_comm,
buffer_first,
vertex_value_output_first,
static_cast<size_t>(graph_view.get_vertex_partition_size(i * row_comm_size + i)),
raft::comms::op_t::SUM,
i,
handle.get_stream());
} else {
for (int j = 0; j < row_comm_size; ++j) {
auto comm_root_rank = col_comm_rank * row_comm_size + j;
device_reduce(
row_comm,
buffer_first + (graph_view.get_vertex_partition_first(comm_root_rank) -
graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size)),
vertex_value_output_first,
static_cast<size_t>(graph_view.get_vertex_partition_size(comm_root_rank)),
raft::comms::op_t::SUM,
j,
handle.get_stream());
}
}
if (GraphViewType::is_multi_gpu && (in == GraphViewType::is_adj_matrix_transposed)) {
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

if (graph_view.is_hypergraph_partitioned()) {
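// reduce this major range over the column communicator; root i receives the final
// values directly into vertex_value_output_first.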
device_reduce(
col_comm,
major_buffer_first,
vertex_value_output_first,
static_cast<size_t>(graph_view.get_vertex_partition_size(i * row_comm_size + i)),
raft::comms::op_t::SUM,
i,
handle.get_stream());
} else {
CUGRAPH_FAIL("unimplemented.");
device_reduce(row_comm,
major_buffer_first,
vertex_value_output_first,
static_cast<size_t>(
graph_view.get_vertex_partition_size(col_comm_rank * row_comm_size + i)),
raft::comms::op_t::SUM,
i,
handle.get_stream());
}
}
}

if (GraphViewType::is_multi_gpu && (in != GraphViewType::is_adj_matrix_transposed)) {
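// communication along the minor direction (the piece this commit adds): reduce each
// minor segment to the GPU responsible for it, then redistribute the reduced values so
// that every GPU ends up with its own vertex partition in vertex_value_output_first.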
auto& comm = handle.get_comms();
auto const comm_rank = comm.get_rank();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

if (graph_view.is_hypergraph_partitioned()) {
CUGRAPH_FAIL("unimplemented.");
} else {
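// one reduction per column-communicator rank: after this loop, rank i holds the
// reduced values for vertex partition (row_comm_rank * col_comm_size + i) in its
// minor buffer.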
for (int i = 0; i < col_comm_size; ++i) {
auto offset = (graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size + i) -
graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size));
auto size = static_cast<size_t>(
graph_view.get_vertex_partition_size(row_comm_rank * col_comm_size + i));
device_reduce(col_comm,
minor_buffer_first + offset,
minor_buffer_first + offset,
size,
raft::comms::op_t::SUM,
i,
handle.get_stream());
}

// FIXME: this P2P is unnecessary if we apply the partitioning scheme used with hypergraph
// partitioning
auto comm_src_rank = (comm_rank % col_comm_size) * row_comm_size + comm_rank / col_comm_size;
auto comm_dst_rank = row_comm_rank * col_comm_size + col_comm_rank;
// FIXME: it seems like raft::isend and raft::irecv do not properly handle the destination (or
// source) == self case. Need to double check and fix this if this is indeed the case (or RAFT
// may use ncclSend/ncclRecv instead of UCX for device data).
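// self-exchange: copy device memory locally instead of going through isend/irecv.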
if (comm_src_rank == comm_rank) {
assert(comm_dst_rank == comm_rank);
auto offset =
graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size + col_comm_rank) -
graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size);
auto size = static_cast<size_t>(
graph_view.get_vertex_partition_size(row_comm_rank * col_comm_size + col_comm_rank));
thrust::copy(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
minor_buffer_first + offset,
minor_buffer_first + offset + size,
vertex_value_output_first);
} else {
auto constexpr tuple_size = thrust_tuple_size_or_one<
typename std::iterator_traits<VertexValueOutputIterator>::value_type>::value;
std::vector<raft::comms::request_t> requests(2 * tuple_size);
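// thrust::tuple values are transferred component-wise: the first tuple_size request
// slots track the sends, the next tuple_size slots track the receives.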
device_isend<decltype(minor_buffer_first), VertexValueOutputIterator>(
comm,
minor_buffer_first +
(graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size + col_comm_rank) -
graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size)),
static_cast<size_t>(
graph_view.get_vertex_partition_size(row_comm_rank * col_comm_size + col_comm_rank)),
comm_dst_rank,
int{0} /* base_tag */,
requests.data());
device_irecv<decltype(minor_buffer_first), VertexValueOutputIterator>(
comm,
vertex_value_output_first,
static_cast<size_t>(graph_view.get_vertex_partition_size(comm_rank)),
comm_src_rank,
int{0} /* base_tag */,
requests.data() + tuple_size);
// FIXME: this waitall can fail if VertexValueOutputIterator is a discard iterator or a zip
// iterator having one or more discard iterator
comm.waitall(requests.size(), requests.data());
}
}
}
