From 93ffb1fea3d86e91f079b0c09c13a4b352cd61d8 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Tue, 10 Dec 2024 19:56:31 +0100 Subject: [PATCH 1/4] Cache set of active transfers in receive_arbiter --- include/receive_arbiter.h | 3 +++ include/utils.h | 6 ------ src/receive_arbiter.cc | 24 +++++++++++++++--------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/include/receive_arbiter.h b/include/receive_arbiter.h index 1f354ac2..1e4b7610 100644 --- a/include/receive_arbiter.h +++ b/include/receive_arbiter.h @@ -143,6 +143,9 @@ class receive_arbiter { /// the same transfer id that did not temporally overlap with the original ones. std::unordered_map m_transfers; + /// Cache for all transfer ids in m_transfers that are not unassigned_transfers. Bounds complexity of iterating to poll all transfer events. + std::vector m_active_transfers; + /// Initiates a new `region_request` for which the caller can construct events to await either the entire region or sub-regions. receive_arbiter_detail::stable_region_request& initiate_region_request( const transfer_id& trid, const region<3>& request, void* allocation, const box<3>& allocated_box, size_t elem_size); diff --git a/include/utils.h b/include/utils.h index 5b6c91a8..4794966b 100644 --- a/include/utils.h +++ b/include/utils.h @@ -21,12 +21,6 @@ namespace celerity::detail::utils { -/// Like std::move, but move-constructs the result so it does not reference the argument after returning. -template -T take(T& from) { - return std::move(from); -} - template bool isa(const P* p) { return dynamic_cast(p) != nullptr; diff --git a/src/receive_arbiter.cc b/src/receive_arbiter.cc index 3b817c23..c0f59825 100644 --- a/src/receive_arbiter.cc +++ b/src/receive_arbiter.cc @@ -117,12 +117,17 @@ receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_ multi_region_transfer* mrt = nullptr; if(const auto entry = m_transfers.find(trid); entry != m_transfers.end()) { matchbox::match( - entry->second, // - [&](unassigned_transfer& ut) { mrt = &entry->second.emplace(elem_size, utils::take(ut.pilots)); }, + entry->second, + [&](unassigned_transfer& ut) { + auto pilots = std::move(ut.pilots); + mrt = &entry->second.emplace(elem_size, std::move(pilots)); + m_active_transfers.push_back(trid); + }, [&](multi_region_transfer& existing_mrt) { mrt = &existing_mrt; }, [&](gather_transfer& gt) { utils::panic("calling receive_arbiter::begin_receive on an active gather transfer"); }); } else { mrt = &m_transfers[trid].emplace(elem_size); + m_active_transfers.push_back(trid); } // Add a new region_request to the `mrt` (transfers have transfer_id granularity, but there might be multiple receives from independent range mappers @@ -194,19 +199,20 @@ async_event receive_arbiter::gather_receive(const transfer_id& trid, void* const // Otherwise, we insert the transfer as pending and wait for the first pilots to arrive. m_transfers.emplace(trid, gather_transfer{gr}); } + m_active_transfers.push_back(trid); return make_async_event(gr); } void receive_arbiter::poll_communicator() { // Try completing all pending payload sends / receives by polling their communicator events - for(auto entry = m_transfers.begin(); entry != m_transfers.end();) { - if(std::visit([](auto& transfer) { return transfer.do_complete(); }, entry->second)) { - entry = m_transfers.erase(entry); - } else { - ++entry; - } - } + std::erase_if(m_active_transfers, [&](const transfer_id& trid) { + const auto entry = m_transfers.find(trid); + assert(entry != m_transfers.end()); + const bool is_complete = std::visit([](auto& transfer) { return transfer.do_complete(); }, entry->second); + if(is_complete) { m_transfers.erase(entry); } + return is_complete; + }); for(const auto& pilot : m_comm->poll_inbound_pilots()) { if(const auto entry = m_transfers.find(pilot.message.transfer_id); entry != m_transfers.end()) { From 7b49f17f24997646de7af6988729281e7c997d20 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Tue, 10 Dec 2024 20:21:27 +0100 Subject: [PATCH 2/4] Refactor receive_arbiter control flow for legibility --- src/receive_arbiter.cc | 54 +++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/src/receive_arbiter.cc b/src/receive_arbiter.cc index c0f59825..46b662a3 100644 --- a/src/receive_arbiter.cc +++ b/src/receive_arbiter.cc @@ -64,30 +64,27 @@ class gather_receive_event final : public async_event_impl { }; bool region_request::do_complete() { - const auto complete_fragment = [&](const incoming_region_fragment& fragment) { + std::erase_if(incoming_fragments, [&](const incoming_region_fragment& fragment) { if(!fragment.communication.is_complete()) return false; incomplete_region = region_difference(incomplete_region, fragment.box); return true; - }; - incoming_fragments.erase(std::remove_if(incoming_fragments.begin(), incoming_fragments.end(), complete_fragment), incoming_fragments.end()); + }); assert(!incomplete_region.empty() || incoming_fragments.empty()); return incomplete_region.empty(); } bool multi_region_transfer::do_complete() { - const auto complete_request = [](stable_region_request& rr) { return rr->do_complete(); }; - active_requests.erase(std::remove_if(active_requests.begin(), active_requests.end(), complete_request), active_requests.end()); + std::erase_if(active_requests, [](stable_region_request& rr) { return rr->do_complete(); }); return active_requests.empty() && unassigned_pilots.empty(); } bool gather_request::do_complete() { - const auto complete_chunk = [&](const incoming_gather_chunk& chunk) { + std::erase_if(incoming_chunks, [&](const incoming_gather_chunk& chunk) { if(!chunk.communication.is_complete()) return false; assert(num_incomplete_chunks > 0); num_incomplete_chunks -= 1; return true; - }; - incoming_chunks.erase(std::remove_if(incoming_chunks.begin(), incoming_chunks.end(), complete_chunk), incoming_chunks.end()); + }); return num_incomplete_chunks == 0; } @@ -110,33 +107,33 @@ receive_arbiter::receive_arbiter(communicator& comm) : m_comm(&comm), m_num_node receive_arbiter::~receive_arbiter() { assert(std::uncaught_exceptions() > 0 || m_transfers.empty()); } receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_request( - const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) { + const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) // +{ assert(allocated_box.covers(bounding_box(request))); // Ensure there is a multi_region_transfer present - if there is none, create it by consuming unassigned pilots - multi_region_transfer* mrt = nullptr; - if(const auto entry = m_transfers.find(trid); entry != m_transfers.end()) { - matchbox::match( - entry->second, - [&](unassigned_transfer& ut) { - auto pilots = std::move(ut.pilots); - mrt = &entry->second.emplace(elem_size, std::move(pilots)); - m_active_transfers.push_back(trid); - }, - [&](multi_region_transfer& existing_mrt) { mrt = &existing_mrt; }, - [&](gather_transfer& gt) { utils::panic("calling receive_arbiter::begin_receive on an active gather transfer"); }); - } else { - mrt = &m_transfers[trid].emplace(elem_size); - m_active_transfers.push_back(trid); - } + auto& transfer = m_transfers[trid]; // allow default-insert as unassigned_transfer + auto& mrt = matchbox::match( + transfer, + [&](unassigned_transfer& ut) -> multi_region_transfer& { + auto pilots = std::move(ut.pilots); + m_active_transfers.push_back(trid); + return transfer.emplace(elem_size, std::move(pilots)); + }, + [&](multi_region_transfer& existing_mrt) -> multi_region_transfer& { // + return existing_mrt; + }, + [&](gather_transfer& gt) -> multi_region_transfer& { // + utils::panic("calling receive_arbiter::begin_receive on an active gather transfer"); + }); // Add a new region_request to the `mrt` (transfers have transfer_id granularity, but there might be multiple receives from independent range mappers - assert(std::all_of(mrt->active_requests.begin(), mrt->active_requests.end(), + assert(std::all_of(mrt.active_requests.begin(), mrt.active_requests.end(), [&](const stable_region_request& rr) { return region_intersection(rr->incomplete_region, request).empty(); })); - auto& rr = mrt->active_requests.emplace_back(std::make_shared(request, allocation, allocated_box)); + auto& rr = mrt.active_requests.emplace_back(std::make_shared(request, allocation, allocated_box)); // If the new region_request matches any of the still-unassigned pilots associated with `mrt`, immediately initiate the appropriate payload-receives - const auto assign_pilot = [&](const inbound_pilot& pilot) { + std::erase_if(mrt.unassigned_pilots, [&](const inbound_pilot& pilot) { assert((region_intersection(rr->incomplete_region, pilot.message.box) != pilot.message.box) == region_intersection(rr->incomplete_region, pilot.message.box).empty()); if(region_intersection(rr->incomplete_region, pilot.message.box) == pilot.message.box) { @@ -144,8 +141,7 @@ receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_ return true; } return false; - }; - mrt->unassigned_pilots.erase(std::remove_if(mrt->unassigned_pilots.begin(), mrt->unassigned_pilots.end(), assign_pilot), mrt->unassigned_pilots.end()); + }); return rr; } From 3c867e330fb54ebbdf0aa5b963d2598362064ced Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Wed, 11 Dec 2024 09:45:28 +0100 Subject: [PATCH 3/4] Add receive_arbiter fast-path for polling incomplete regions requests --- include/receive_arbiter.h | 12 ++++++++---- src/receive_arbiter.cc | 25 ++++++++++++++++--------- test/receive_arbiter_tests.cc | 8 +++++--- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/include/receive_arbiter.h b/include/receive_arbiter.h index 1e4b7610..166e4f39 100644 --- a/include/receive_arbiter.h +++ b/include/receive_arbiter.h @@ -20,6 +20,8 @@ namespace celerity::detail::receive_arbiter_detail { struct incoming_region_fragment { detail::box<3> box; async_event communication; ///< async communicator event for receiving this fragment + + bool is_complete() const { return communication.is_complete(); } }; /// State for a single incomplete `receive` operation or a `begin_split_receive` / `await_split_receive_subregion` tree. @@ -28,9 +30,11 @@ struct region_request { box<3> allocated_box; region<3> incomplete_region; std::vector incoming_fragments; + bool may_await_subregion; - region_request(region<3> requested_region, void* const allocation, const box<3>& allocated_bounding_box) - : allocation(allocation), allocated_box(allocated_bounding_box), incomplete_region(std::move(requested_region)) {} + region_request(region<3> requested_region, void* const allocation, const box<3>& allocated_bounding_box, const bool may_await_subregion) + : allocation(allocation), allocated_box(allocated_bounding_box), incomplete_region(std::move(requested_region)), + may_await_subregion(may_await_subregion) {} bool do_complete(); }; @@ -146,9 +150,9 @@ class receive_arbiter { /// Cache for all transfer ids in m_transfers that are not unassigned_transfers. Bounds complexity of iterating to poll all transfer events. std::vector m_active_transfers; - /// Initiates a new `region_request` for which the caller can construct events to await either the entire region or sub-regions. + /// Initiates a new `region_request` for which the caller can construct events to await either the entire region or sub-regions (may_await_subregion = true). receive_arbiter_detail::stable_region_request& initiate_region_request( - const transfer_id& trid, const region<3>& request, void* allocation, const box<3>& allocated_box, size_t elem_size); + const transfer_id& trid, const region<3>& request, void* allocation, const box<3>& allocated_box, size_t elem_size, bool may_await_subregion); /// Updates the state of an active `region_request` from receiving an inbound pilot. void handle_region_request_pilot(receive_arbiter_detail::region_request& rr, const inbound_pilot& pilot, size_t elem_size); diff --git a/src/receive_arbiter.cc b/src/receive_arbiter.cc index 46b662a3..61ba2891 100644 --- a/src/receive_arbiter.cc +++ b/src/receive_arbiter.cc @@ -64,6 +64,9 @@ class gather_receive_event final : public async_event_impl { }; bool region_request::do_complete() { + // Fast path: Avoid polling the entire fragment set when we know that neither the request as a whole nor any subregion-request will complete at this time + if(!may_await_subregion && !incoming_fragments.empty() && !incoming_fragments.front().communication.is_complete()) return false; + std::erase_if(incoming_fragments, [&](const incoming_region_fragment& fragment) { if(!fragment.communication.is_complete()) return false; incomplete_region = region_difference(incomplete_region, fragment.box); @@ -106,8 +109,8 @@ receive_arbiter::receive_arbiter(communicator& comm) : m_comm(&comm), m_num_node receive_arbiter::~receive_arbiter() { assert(std::uncaught_exceptions() > 0 || m_transfers.empty()); } -receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_request( - const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) // +receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_request(const transfer_id& trid, const region<3>& request, + void* const allocation, const box<3>& allocated_box, const size_t elem_size, const bool may_await_subregion) // { assert(allocated_box.covers(bounding_box(request))); @@ -130,7 +133,7 @@ receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_ // Add a new region_request to the `mrt` (transfers have transfer_id granularity, but there might be multiple receives from independent range mappers assert(std::all_of(mrt.active_requests.begin(), mrt.active_requests.end(), [&](const stable_region_request& rr) { return region_intersection(rr->incomplete_region, request).empty(); })); - auto& rr = mrt.active_requests.emplace_back(std::make_shared(request, allocation, allocated_box)); + auto& rr = mrt.active_requests.emplace_back(std::make_shared(request, allocation, allocated_box, may_await_subregion)); // If the new region_request matches any of the still-unassigned pilots associated with `mrt`, immediately initiate the appropriate payload-receives std::erase_if(mrt.unassigned_pilots, [&](const inbound_pilot& pilot) { @@ -148,7 +151,7 @@ receive_arbiter_detail::stable_region_request& receive_arbiter::initiate_region_ void receive_arbiter::begin_split_receive( const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) { - initiate_region_request(trid, request, allocation, allocated_box, elem_size); + initiate_region_request(trid, request, allocation, allocated_box, elem_size, true /* may_await_subregion */); } async_event receive_arbiter::await_split_receive_subregion(const transfer_id& trid, const region<3>& subregion) { @@ -169,16 +172,20 @@ async_event receive_arbiter::await_split_receive_subregion(const transfer_id& tr #endif // If the transfer (by transfer_id) as a whole has not completed yet but the subregion is, this "await" also completes immediately. - const auto req_it = std::find_if(mrt.active_requests.begin(), mrt.active_requests.end(), + const auto rr_it = std::find_if(mrt.active_requests.begin(), mrt.active_requests.end(), [&](const stable_region_request& rr) { return !region_intersection(rr->incomplete_region, subregion).empty(); }); - if(req_it == mrt.active_requests.end()) { return make_complete_event(); } + if(rr_it == mrt.active_requests.end()) { return make_complete_event(); } - return make_async_event(*req_it, subregion); + auto& rr = *rr_it; + assert(rr->may_await_subregion && "attempting await_split_receive_subregion() on region that was not initiated with begin_split_receive()"); + return make_async_event(rr, subregion); } async_event receive_arbiter::receive( - const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) { - return make_async_event(initiate_region_request(trid, request, allocation, allocated_box, elem_size)); + const transfer_id& trid, const region<3>& request, void* const allocation, const box<3>& allocated_box, const size_t elem_size) // +{ + auto& rr = initiate_region_request(trid, request, allocation, allocated_box, elem_size, false /* may_await_subregion */); + return make_async_event(rr); } async_event receive_arbiter::gather_receive(const transfer_id& trid, void* const allocation, const size_t node_chunk_size) { diff --git a/test/receive_arbiter_tests.cc b/test/receive_arbiter_tests.cc index a36da571..8fa40f54 100644 --- a/test/receive_arbiter_tests.cc +++ b/test/receive_arbiter_tests.cc @@ -219,9 +219,11 @@ TEST_CASE("receive_arbiter aggregates receives from multiple incoming fragments" REQUIRE(receive.has_value()); CHECK(receive->is_complete()); - // it is legal to `await` a transfer that has already been completed and is not tracked by the receive_arbiter anymore - CHECK(ra.await_split_receive_subregion(trid, requested_regions[0]).is_complete()); - CHECK(ra.await_split_receive_subregion(trid, incoming_fragments[0]).is_complete()); + if(receive_method == "split_await") { + // it is legal to `await` a transfer that has already been completed and is not tracked by the receive_arbiter anymore + CHECK(ra.await_split_receive_subregion(trid, requested_regions[0]).is_complete()); + CHECK(ra.await_split_receive_subregion(trid, incoming_fragments[0]).is_complete()); + } std::vector expected_allocation(alloc_box.get_range().size()); for(size_t which = 0; which < incoming_fragments.size(); ++which) { From 65604a0359b85531f323f699b40a82e94cf36e50 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Thu, 12 Dec 2024 21:53:31 +0100 Subject: [PATCH 4/4] Avoid incomplete_region update during recv arbiter poll --- include/receive_arbiter.h | 3 ++- src/receive_arbiter.cc | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/include/receive_arbiter.h b/include/receive_arbiter.h index 166e4f39..df6adfa1 100644 --- a/include/receive_arbiter.h +++ b/include/receive_arbiter.h @@ -29,11 +29,12 @@ struct region_request { void* allocation; box<3> allocated_box; region<3> incomplete_region; + size_t incomplete_area; std::vector incoming_fragments; bool may_await_subregion; region_request(region<3> requested_region, void* const allocation, const box<3>& allocated_bounding_box, const bool may_await_subregion) - : allocation(allocation), allocated_box(allocated_bounding_box), incomplete_region(std::move(requested_region)), + : allocation(allocation), allocated_box(allocated_bounding_box), incomplete_region(std::move(requested_region)), incomplete_area(incomplete_region.get_area()), may_await_subregion(may_await_subregion) {} bool do_complete(); }; diff --git a/src/receive_arbiter.cc b/src/receive_arbiter.cc index 61ba2891..09e8dd39 100644 --- a/src/receive_arbiter.cc +++ b/src/receive_arbiter.cc @@ -69,11 +69,12 @@ bool region_request::do_complete() { std::erase_if(incoming_fragments, [&](const incoming_region_fragment& fragment) { if(!fragment.communication.is_complete()) return false; - incomplete_region = region_difference(incomplete_region, fragment.box); + if (may_await_subregion) { incomplete_region = region_difference(incomplete_region, fragment.box); } + incomplete_area -= fragment.box.get_area(); return true; }); - assert(!incomplete_region.empty() || incoming_fragments.empty()); - return incomplete_region.empty(); + assert(incomplete_area > 0 || incoming_fragments.empty()); + return incomplete_area == 0; } bool multi_region_transfer::do_complete() {