diff --git a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh
index 0aac0c0d053..521b8cf63f5 100644
--- a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh
+++ b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh
@@ -206,6 +206,9 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
         (graph_view.get_vertex_partition_first(comm_src_rank) -
          graph_view.get_vertex_partition_first(row_comm_rank * col_comm_size)));
   } else {
+    CUDA_TRY(cudaStreamSynchronize(
+      handle.get_stream()));  // to ensure data to be sent are ready (FIXME: this can be removed
+                              // if we use ncclSend in raft::comms)
     auto constexpr tuple_size = thrust_tuple_size_or_one<
       typename std::iterator_traits<VertexValueInputIterator>::value_type>::value;
     std::vector<raft::comms::request_t> requests(2 * tuple_size);
@@ -332,6 +335,10 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
                  vertex_value_input_first,
                  src_value_first);
 
+    CUDA_TRY(cudaStreamSynchronize(
+      handle.get_stream()));  // to ensure data to be sent are ready (FIXME: this can be removed
+                              // if we use ncclSend in raft::comms)
+
     std::vector<raft::comms::request_t> value_requests(2 * (1 + tuple_size));
     device_isend(comm,
                  vertex_first,
diff --git a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
index 0676b5037cc..b0d576fd903 100644
--- a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
+++ b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
@@ -572,6 +572,9 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
                  minor_buffer_first + offset + size,
                  vertex_value_output_first);
   } else {
+    CUDA_TRY(cudaStreamSynchronize(
+      handle.get_stream()));  // to ensure data to be sent are ready (FIXME: this can be removed
+                              // if we use ncclSend in raft::comms)
     auto constexpr tuple_size = thrust_tuple_size_or_one<
       typename std::iterator_traits<VertexValueOutputIterator>::value_type>::value;
     std::vector<raft::comms::request_t> requests(2 * tuple_size);
diff --git a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh
index 1a5f7ecece6..a76cf1ee7f3 100644
--- a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh
+++ b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh
@@ -620,11 +620,16 @@ void update_frontier_v_push_if_out_nbr(
                 buffer_requests.data() + (i + 1) * (1 + tuple_size),
                 std::numeric_limits<raft::comms::request_t>::max());
     } else {
-      comm.isend(detail::iter_to_raw_ptr(buffer_key_first + tx_offsets[i]),
-                 static_cast<size_t>(tx_counts[i]),
-                 comm_dst_rank,
-                 int{0} /* tag */,
-                 buffer_requests.data() + i * (1 + tuple_size));
+      CUDA_TRY(cudaStreamSynchronize(
+        handle.get_stream()));  // to ensure data to be sent are ready (FIXME: this can be removed
+                                // if we use ncclSend in raft::comms)
+
+      device_isend(comm,
+                   detail::iter_to_raw_ptr(buffer_key_first + tx_offsets[i]),
+                   static_cast<size_t>(tx_counts[i]),
+                   comm_dst_rank,
+                   int{0} /* tag */,
+                   buffer_requests.data() + i * (1 + tuple_size));
       device_isend(
         comm,
         buffer_payload_first + tx_offsets[i],
@@ -654,11 +659,13 @@
                 buffer_requests.data() + (tx_counts.size() + i + 1) * (1 + tuple_size),
                 std::numeric_limits<raft::comms::request_t>::max());
     } else {
-      comm.irecv(detail::iter_to_raw_ptr(buffer_key_first + num_buffer_elements + rx_offsets[i]),
-                 static_cast<size_t>(rx_counts[i]),
-                 comm_src_rank,
-                 int{0} /* tag */,
-                 buffer_requests.data() + ((tx_counts.size() + i) * (1 + tuple_size)));
+      device_irecv(
+        comm,
+        detail::iter_to_raw_ptr(buffer_key_first + num_buffer_elements + rx_offsets[i]),
+        static_cast<size_t>(rx_counts[i]),
+        comm_src_rank,
+        int{0} /* tag */,
+        buffer_requests.data() + ((tx_counts.size() + i) * (1 + tuple_size)));
       device_irecv(
         comm,
         buffer_payload_first + num_buffer_elements + rx_offsets[i],
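For context, a minimal standalone sketch of the ordering problem the added `cudaStreamSynchronize` calls guard against. This is not cugraph code: `produce` and `fake_isend` are hypothetical stand-ins for the stream-side producer and the host-initiated non-blocking send (e.g. the raft::comms isend path), used only to illustrate why the synchronize must sit between them.

```cpp
#include <cuda_runtime.h>

// Stand-in for the work that fills the send buffer; it runs asynchronously
// on `stream`, so it may still be in flight when control returns to the host.
__global__ void produce(int* buf, int n)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) buf[i] = i;  // the data that will be sent
}

// Stand-in for a host-initiated non-blocking send (hypothetical; e.g. a
// UCX/MPI-style isend).  It reads `buf` on the host timeline, with no
// knowledge of any CUDA stream, so the buffer must already be final here.
void fake_isend(const int* buf, int n) { /* post send of buf[0..n) */ }

int main()
{
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  int const n = 1 << 20;
  int* d_buf{nullptr};
  cudaMalloc(&d_buf, n * sizeof(int));

  produce<<<(n + 255) / 256, 256, 0, stream>>>(d_buf, n);

  // Without this, fake_isend may transmit stale bytes: the kernel above is
  // only *enqueued*, not finished, when the launch call returns.  A
  // stream-ordered send such as ncclSend would be enqueued on the same
  // stream and make this synchronize unnecessary (the diff's FIXME).
  cudaStreamSynchronize(stream);

  fake_isend(d_buf, n);  // safe: buffer contents are final

  cudaFree(d_buf);
  cudaStreamDestroy(stream);
  return 0;
}
```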