Skip to content

Commit

Permalink
Some MTMG code cleanup and small optimizations (#3894)
Browse files Browse the repository at this point in the history
Added some missing documentation.

A couple of optimizations:
 * Modified the `append` logic to keep the mutex lock only long enough to compute what needs to be copied and where.
 * Modified the handle created by the resource manager for each GPU to have a stream pool to enable different threads to operate on different streams.

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Seunghwa Kang (https://github.com/seunghwak)

URL: #3894
  • Loading branch information
ChuckHastings authored Oct 24, 2023
1 parent d9a8dc7 commit 9b28458
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 63 deletions.
116 changes: 79 additions & 37 deletions cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <cugraph/detail/shuffle_wrappers.hpp>
#include <cugraph/mtmg/handle.hpp>

// FIXME: Could use std::span once compiler supports C++20
Expand Down Expand Up @@ -58,6 +59,15 @@ class per_device_edgelist_t {
per_device_edgelist_t& operator=(per_device_edgelist_t const&) = delete;
per_device_edgelist_t& operator=(per_device_edgelist_t&&) = delete;

/**
* @brief Construct a new per_device_edgelist_t object
*
* @param handle MTMG resource handle - used to identify GPU resources
* @param device_buffer_size Number of edges to store in each device buffer
* @param use_weight Whether or not the edgelist will have weights
* @param use_edge_id Whether or not the edgelist will have edge ids
* @param use_edge_type Whether or not the edgelist will have edge types
*/
per_device_edgelist_t(cugraph::mtmg::handle_t const& handle,
size_t device_buffer_size,
bool use_weight,
Expand All @@ -82,6 +92,11 @@ class per_device_edgelist_t {
create_new_buffers(handle);
}

/**
* @brief Move construct a new per_device_edgelist_t object
*
* @param other Object to move into this instance
*/
per_device_edgelist_t(per_device_edgelist_t&& other)
: device_buffer_size_{other.device_buffer_size_},
current_pos_{other.current_pos_},
Expand Down Expand Up @@ -110,45 +125,72 @@ class per_device_edgelist_t {
std::optional<raft::host_span<edge_t const>> edge_id,
std::optional<raft::host_span<edge_type_t const>> edge_type)
{
// FIXME: This lock guard could be on a smaller region, but it
// would require more careful coding. The raft::update_device
// calls could be done without the lock if we made a local
// of the values of *.back() and did an increment of current_pos_
// while we hold the lock.
std::lock_guard<std::mutex> lock(lock_);

size_t count = src.size();
size_t pos = 0;

while (count > 0) {
size_t copy_count = std::min(count, (src_.back().size() - current_pos_));

raft::update_device(
src_.back().begin() + current_pos_, src.begin() + pos, copy_count, handle.get_stream());
raft::update_device(
dst_.back().begin() + current_pos_, dst.begin() + pos, copy_count, handle.get_stream());
if (wgt)
raft::update_device(
wgt_->back().begin() + current_pos_, wgt->begin() + pos, copy_count, handle.get_stream());
if (edge_id)
raft::update_device(edge_id_->back().begin() + current_pos_,
edge_id->begin() + pos,
copy_count,
handle.get_stream());
if (edge_type)
raft::update_device(edge_type_->back().begin() + current_pos_,
edge_type->begin() + pos,
copy_count,
handle.get_stream());

count -= copy_count;
pos += copy_count;
current_pos_ += copy_count;

if (current_pos_ == src_.back().size()) { create_new_buffers(handle); }
std::vector<std::tuple<size_t, size_t, size_t, size_t>> copy_positions;

{
std::lock_guard<std::mutex> lock(lock_);

size_t count = src.size();
size_t pos = 0;

while (count > 0) {
size_t copy_count = std::min(count, (src_.back().size() - current_pos_));

copy_positions.push_back(std::make_tuple(src_.size() - 1, current_pos_, pos, copy_count));

count -= copy_count;
pos += copy_count;
current_pos_ += copy_count;

if (current_pos_ == src_.back().size()) { create_new_buffers(handle); }
}
}

handle.raft_handle().sync_stream();
std::for_each(copy_positions.begin(),
copy_positions.end(),
[&handle,
&this_src = src_,
&src,
&this_dst = dst_,
&dst,
&this_wgt = wgt_,
&wgt,
&this_edge_id = edge_id_,
&edge_id,
&this_edge_type = edge_type_,
&edge_type](auto tuple) {
auto [buffer_idx, buffer_pos, input_pos, copy_count] = tuple;

raft::update_device(this_src[buffer_idx].begin() + buffer_pos,
src.begin() + input_pos,
copy_count,
handle.get_stream());

raft::update_device(this_dst[buffer_idx].begin() + buffer_pos,
dst.begin() + input_pos,
copy_count,
handle.get_stream());

if (this_wgt)
raft::update_device((*this_wgt)[buffer_idx].begin() + buffer_pos,
wgt->begin() + input_pos,
copy_count,
handle.get_stream());

if (this_edge_id)
raft::update_device((*this_edge_id)[buffer_idx].begin() + buffer_pos,
edge_id->begin() + input_pos,
copy_count,
handle.get_stream());

if (this_edge_type)
raft::update_device((*this_edge_type)[buffer_idx].begin() + buffer_pos,
edge_type->begin() + input_pos,
copy_count,
handle.get_stream());
});

handle.sync_stream();
}

/**
Expand Down
35 changes: 34 additions & 1 deletion cpp/include/cugraph/mtmg/handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <raft/core/handle.hpp>

#include <rmm/exec_policy.hpp>

namespace cugraph {
namespace mtmg {

Expand Down Expand Up @@ -60,10 +62,41 @@ class handle_t {
rmm::cuda_stream_view get_stream() const
{
return raft_handle_.is_stream_pool_initialized()
? raft_handle_.get_stream_from_stream_pool(device_id_)
? raft_handle_.get_stream_from_stream_pool(thread_rank_)
: raft_handle_.get_stream();
}

/**
* @brief Synchronize a caller-specified CUDA stream
*
* @param stream Which stream to synchronize
*/
void sync_stream(rmm::cuda_stream_view stream) const { raft_handle_.sync_stream(stream); }

/**
* @brief Synchronize the CUDA stream associated with this handle (the one returned by get_stream())
*/
void sync_stream() const { sync_stream(get_stream()); }

/**
* @brief Get a thrust execution policy that runs work on the given stream
*
* @param stream Which stream the thrust call should execute on
*
* @return exec policy bound to the specified stream
*/
rmm::exec_policy get_thrust_policy(rmm::cuda_stream_view stream) const
{
return rmm::exec_policy(stream);
}

/**
* @brief Get a thrust execution policy for the stream associated with this handle
*
* @return exec policy using this handle's stream (from get_stream())
*/
rmm::exec_policy get_thrust_policy() const { return get_thrust_policy(get_stream()); }

/**
* @brief Get thread rank
*
Expand Down
12 changes: 6 additions & 6 deletions cpp/include/cugraph/mtmg/instance_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ class instance_manager_t {
* the request. Threads will be assigned to GPUs in a round-robin fashion to
* spread requesting threads around the GPU resources.
*
* This function will be CPU thread-safe.
* This function is CPU thread-safe.
*
* @return a handle for this thread.
*/
handle_t get_handle()
{
int local_id = thread_counter_++;
int local_id = thread_counter_++;
int gpu_id = local_id % raft_handle_.size();
int thread_id = local_id / raft_handle_.size();

RAFT_CUDA_TRY(cudaSetDevice(device_ids_[local_id % raft_handle_.size()].value()));
return handle_t(*raft_handle_[local_id % raft_handle_.size()],
local_id / raft_handle_.size(),
static_cast<size_t>(local_id % raft_handle_.size()));
RAFT_CUDA_TRY(cudaSetDevice(device_ids_[gpu_id].value()));
return handle_t(*raft_handle_[gpu_id], thread_id, static_cast<size_t>(gpu_id));
}

/**
Expand Down
15 changes: 9 additions & 6 deletions cpp/include/cugraph/mtmg/resource_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#include <raft/comms/std_comms.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <execution>
Expand Down Expand Up @@ -121,11 +121,14 @@ class resource_manager_t {
* @param instance_manager_id a ncclUniqueId that is shared by all processes participating
* in this instance. All processes must use the same ID in this call, it is up
* to the calling code to share this ID properly before the call.
* @param n_streams The number of streams to create in a stream pool for
* each GPU. Defaults to 16.
*
* @return unique pointer to instance manager
*/
std::unique_ptr<instance_manager_t> create_instance_manager(
std::vector<int> ranks_to_include, ncclUniqueId instance_manager_id) const
std::unique_ptr<instance_manager_t> create_instance_manager(std::vector<int> ranks_to_include,
ncclUniqueId instance_manager_id,
size_t n_streams = 16) const
{
std::for_each(
ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) {
Expand Down Expand Up @@ -153,11 +156,11 @@ class resource_manager_t {
auto pos = local_rank_map_.find(rank);
RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));

raft::handle_t tmp_handle;

nccl_comms.push_back(std::make_unique<ncclComm_t>());
handles.push_back(
std::make_unique<raft::handle_t>(tmp_handle, per_device_rmm_resources_.find(rank)->second));
std::make_unique<raft::handle_t>(rmm::cuda_stream_per_thread,
std::make_shared<rmm::cuda_stream_pool>(n_streams),
per_device_rmm_resources_.find(rank)->second));
device_ids.push_back(pos->second);
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/include/cugraph/mtmg/vertex_result_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <cugraph/mtmg/handle.hpp>
#include <cugraph/mtmg/renumber_map.hpp>

#include <optional>

namespace cugraph {
namespace mtmg {

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/mtmg/vertex_result.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ rmm::device_uvector<result_t> vertex_result_view_t<result_t>::gather(
return vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v);
});

thrust::gather(handle.raft_handle().get_thrust_policy(),
thrust::gather(handle.get_thrust_policy(),
iter,
iter + local_vertices.size(),
wrapped.begin(),
Expand All @@ -118,7 +118,7 @@ rmm::device_uvector<result_t> vertex_result_view_t<result_t>::gather(
//
// Finally, reorder result
//
thrust::scatter(handle.raft_handle().get_thrust_policy(),
thrust::scatter(handle.get_thrust_policy(),
tmp_result.begin(),
tmp_result.end(),
vertex_pos.begin(),
Expand Down
16 changes: 5 additions & 11 deletions cpp/tests/mtmg/threaded_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ class Tests_Multithreaded
size_t device_buffer_size{64 * 1024 * 1024};
size_t thread_buffer_size{4 * 1024 * 1024};

const int num_threads_per_gpu{4};
int num_gpus = gpu_list.size();
int num_threads = num_gpus * 4;
int num_threads = num_gpus * num_threads_per_gpu;

cugraph::mtmg::resource_manager_t resource_manager;

Expand All @@ -106,8 +107,10 @@ class Tests_Multithreaded
ncclUniqueId instance_manager_id;
ncclGetUniqueId(&instance_manager_id);

// Currently the only use of multiple streams is for the CPU threads
// associated with a particular GPU, which is a constant set above
auto instance_manager = resource_manager.create_instance_manager(
resource_manager.registered_ranks(), instance_manager_id);
resource_manager.registered_ranks(), instance_manager_id, num_threads_per_gpu);

cugraph::mtmg::edgelist_t<vertex_t, weight_t, edge_t, edge_type_t> edgelist;
cugraph::mtmg::graph_t<vertex_t, edge_t, true, multi_gpu> graph;
Expand Down Expand Up @@ -172,15 +175,6 @@ class Tests_Multithreaded
per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size);

for (size_t j = i; j < h_src_v.size(); j += num_threads) {
#if 0
if (h_weights_v) {
thread_edgelist.append(
thread_handle, h_src_v[j], h_dst_v[j], (*h_weights_v)[j], std::nullopt, std::nullopt);
} else {
thread_edgelist.append(
thread_handle, h_src_v[j], h_dst_v[j], std::nullopt, std::nullopt, std::nullopt);
}
#endif
per_thread_edgelist.append(
thread_handle,
h_src_v[j],
Expand Down

0 comments on commit 9b28458

Please sign in to comment.