From ccd5afd39cddf6eeb93d45eb682a066469cdef9b Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Sun, 16 Jun 2019 22:04:46 -0700 Subject: [PATCH 01/20] #410: epoch: change unused InsertEpoch to DependentEpoch --- src/vt/epoch/epoch.h | 2 +- tests/unit/epoch/test_epoch.nompi.cc | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/vt/epoch/epoch.h b/src/vt/epoch/epoch.h index f3a84d9e94..7745b1f95f 100644 --- a/src/vt/epoch/epoch.h +++ b/src/vt/epoch/epoch.h @@ -115,7 +115,7 @@ static constexpr BitCountType const epoch_category_num_bits = 2; */ enum struct eEpochCategory : int8_t { NoCategoryEpoch = 0x0, - InsertEpoch = 0x1, + DependentEpoch = 0x1, DijkstraScholtenEpoch = 0x2 }; diff --git a/tests/unit/epoch/test_epoch.nompi.cc b/tests/unit/epoch/test_epoch.nompi.cc index e7f7a2226f..3c1f2ca5be 100644 --- a/tests/unit/epoch/test_epoch.nompi.cc +++ b/tests/unit/epoch/test_epoch.nompi.cc @@ -96,7 +96,7 @@ TEST_P(TestEpochParam, basic_test_epoch_category_1) { auto const start_seq = GetParam(); auto epoch = epoch::EpochManip::generateEpoch( false, uninitialized_destination, - epoch::eEpochCategory::InsertEpoch + epoch::eEpochCategory::DependentEpoch ); epoch::EpochManip::setSeq(epoch, start_seq); auto const is_rooted = epoch::EpochManip::isRooted(epoch); @@ -105,14 +105,14 @@ TEST_P(TestEpochParam, basic_test_epoch_category_1) { EXPECT_TRUE(!is_rooted); EXPECT_EQ(get_seq, start_seq); - EXPECT_EQ(cat, epoch::eEpochCategory::InsertEpoch); + EXPECT_EQ(cat, epoch::eEpochCategory::DependentEpoch); } TEST_P(TestEpochParam, basic_test_epoch_all_1) { auto const& n = 48; auto const start_seq = GetParam(); auto epoch = epoch::EpochManip::generateEpoch( - true, n, epoch::eEpochCategory::InsertEpoch + true, n, epoch::eEpochCategory::DependentEpoch ); epoch::EpochManip::setSeq(epoch, start_seq); auto const is_rooted = epoch::EpochManip::isRooted(epoch); @@ -123,7 +123,7 @@ TEST_P(TestEpochParam, basic_test_epoch_all_1) { EXPECT_TRUE(is_rooted); EXPECT_EQ(get_seq, start_seq); EXPECT_EQ(ep_node, n); - EXPECT_EQ(cat, epoch::eEpochCategory::InsertEpoch); + EXPECT_EQ(cat, epoch::eEpochCategory::DependentEpoch); } INSTANTIATE_TEST_SUITE_P( From b3934694ed70950c5fcd750564510f7a3d93b195 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 19 Jun 2019 16:34:18 -0700 Subject: [PATCH 02/20] #410: epoch: add function to bit-combine epoch category bits --- src/vt/epoch/epoch_manip.cc | 8 ++++++++ src/vt/epoch/epoch_manip.h | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/src/vt/epoch/epoch_manip.cc b/src/vt/epoch/epoch_manip.cc index 779286c6f1..0ddc328d10 100644 --- a/src/vt/epoch/epoch_manip.cc +++ b/src/vt/epoch/epoch_manip.cc @@ -190,6 +190,14 @@ void EpochManip::setCategory(EpochType& epoch, eEpochCategory const cat) { >(*epoch,cat); } +/*static*/ inline eEpochCategory EpochManip::makeCat( + eEpochCategory c1, eEpochCategory c2 +) { + using T = typename std::underlying_type::type; + auto ret = static_cast(c1) | static_cast(c2); + return static_cast(ret); +} + /*static*/ void EpochManip::setNode(EpochType& epoch, NodeType const node) { vtAssert(isRooted(epoch), "Must be rooted to manipulate the node"); diff --git a/src/vt/epoch/epoch_manip.h b/src/vt/epoch/epoch_manip.h index c27649b87d..e6488a26a7 100644 --- a/src/vt/epoch/epoch_manip.h +++ b/src/vt/epoch/epoch_manip.h @@ -152,6 +152,14 @@ struct EpochManip : runtime::component::Component { */ static void setSeq(EpochType& epoch, EpochType::ImplType const seq); + /** + * \brief Combine eEpochCategory elements + * + * \param[in] c1 category 1 + * \param[in] c2 category 2 + */ + static eEpochCategory makeCat(eEpochCategory c1, eEpochCategory c2); + /* * General (stateless) methods for creating a epoch with certain properties * based on a current sequence number From 9a1efe8bc36215564d843cc8891981e656a799e2 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 28 Sep 2023 16:45:52 -0700 Subject: [PATCH 03/20] #410: termination: add isDep check --- src/vt/termination/termination.h | 13 +++++++++++-- src/vt/termination/termination.impl.h | 22 +++++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/vt/termination/termination.h b/src/vt/termination/termination.h index cc535f105a..0301ac43ba 100644 --- a/src/vt/termination/termination.h +++ b/src/vt/termination/termination.h @@ -190,7 +190,7 @@ struct TerminationDetector : * * \return whether it is rooted */ - bool isRooted(EpochType epoch); + static bool isRooted(EpochType epoch); /** * \brief Check if the algorithm behind an epoch is Dijkstra-Scholten parental @@ -200,7 +200,16 @@ struct TerminationDetector : * * \return whether is it DS */ - bool isDS(EpochType epoch); + static bool isDS(EpochType epoch); + + /** + * \brief Check if the an epoch is a dependent epoch + * + * \param[in] epoch the epoch to check + * + * \return whether is it dependent + */ + static bool isDep(EpochType epoch); /** * \internal \brief Get or create the DS terminator for an epoch diff --git a/src/vt/termination/termination.impl.h b/src/vt/termination/termination.impl.h index 219ed4c44c..d2f246330c 100644 --- a/src/vt/termination/termination.impl.h +++ b/src/vt/termination/termination.impl.h @@ -67,22 +67,34 @@ inline void TerminationDetector::consume( return produceConsume(in_epoch, num_units, false, node); } -inline bool TerminationDetector::isRooted(EpochType epoch) { +/*static*/ inline bool TerminationDetector::isRooted(EpochType epoch) { bool const is_sentinel = epoch == any_epoch_sentinel or epoch == no_epoch; return is_sentinel ? false : epoch::EpochManip::isRooted(epoch); } -inline bool TerminationDetector::isDS(EpochType epoch) { +/*static*/ inline bool TerminationDetector::isDS(EpochType epoch) { + using T = typename std::underlying_type::type; if (isRooted(epoch)) { - auto const ds_epoch = epoch::eEpochCategory::DijkstraScholtenEpoch; - auto const epoch_category = epoch::EpochManip::category(epoch); - auto const is_ds = epoch_category == ds_epoch; + auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch; + auto const cat = epoch::EpochManip::category(epoch); + bool const is_ds = static_cast(cat) & static_cast(ds_bit); return is_ds; } else { return false; } } +/*static*/ inline bool TerminationDetector::isDep(EpochType epoch) { + using T = typename std::underlying_type::type; + if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { + return false; + } + auto const dep_bit = epoch::eEpochCategory::DependentEpoch; + auto const cat = epoch::EpochManip::category(epoch); + bool const is_dep = static_cast(cat) & static_cast(dep_bit); + return is_dep; +} + inline void TerminationDetector::produceConsumeState( TermStateType& state, TermCounterType const num_units, bool produce ) { From 72fbfec689236d95fdf7ae7e64858dae63848304 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 19 Jun 2019 20:55:52 -0700 Subject: [PATCH 04/20] #410: term: implement dependent epochs --- src/vt/epoch/epoch_manip.cc | 25 +++- src/vt/epoch/epoch_manip.h | 18 +++ src/vt/messaging/active.cc | 29 ++++- src/vt/messaging/active.h | 12 +- src/vt/termination/termination.cc | 158 +++++++++++++++++++++++--- src/vt/termination/termination.h | 91 +++++++++++++-- src/vt/termination/termination.impl.h | 19 +--- 7 files changed, 299 insertions(+), 53 deletions(-) diff --git a/src/vt/epoch/epoch_manip.cc b/src/vt/epoch/epoch_manip.cc index 0ddc328d10..1f138d984e 100644 --- a/src/vt/epoch/epoch_manip.cc +++ b/src/vt/epoch/epoch_manip.cc @@ -148,6 +148,29 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) { return BitPackerType::boolGetField(*epoch); } +/*static*/ bool EpochManip::isDS(EpochType epoch) { + using T = typename std::underlying_type::type; + if (isRooted(epoch)) { + auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch; + auto const cat = category(epoch); + bool const is_ds = static_cast(cat) & static_cast(ds_bit); + return is_ds; + } else { + return false; + } +} + +/*static*/ bool EpochManip::isDep(EpochType epoch) { + using T = typename std::underlying_type::type; + if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { + return false; + } + auto const dep_bit = epoch::eEpochCategory::DependentEpoch; + auto const cat = epoch::EpochManip::category(epoch); + bool const is_dep = static_cast(cat) & static_cast(dep_bit); + return is_dep; +} + /*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) { return BitPackerType::getField< eEpochRoot::rEpochCategory, epoch_category_num_bits, eEpochCategory @@ -190,7 +213,7 @@ void EpochManip::setCategory(EpochType& epoch, eEpochCategory const cat) { >(*epoch,cat); } -/*static*/ inline eEpochCategory EpochManip::makeCat( +/*static*/ eEpochCategory EpochManip::makeCat( eEpochCategory c1, eEpochCategory c2 ) { using T = typename std::underlying_type::type; diff --git a/src/vt/epoch/epoch_manip.h b/src/vt/epoch/epoch_manip.h index e6488a26a7..d7092ace85 100644 --- a/src/vt/epoch/epoch_manip.h +++ b/src/vt/epoch/epoch_manip.h @@ -89,6 +89,24 @@ struct EpochManip : runtime::component::Component { */ static bool isRooted(EpochType const& epoch); + /** + * \brief Gets whether an epoch is DS or onot + * + * \param[in] epoch the epoch + * + * \return whether it is DS + */ + static bool isDS(EpochType epoch); + + /** + * \brief Gets whether an epoch is dependent or onot + * + * \param[in] epoch the epoch + * + * \return whether it is dependent + */ + static bool isDep(EpochType epoch); + /** * \brief Gets the \c eEpochCategory of a given epoch * diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index a0eca38400..cdfcd4e8d2 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -934,12 +934,13 @@ void ActiveMessenger::prepareActiveMsgToRun( using MsgType = ShortMessage; auto msg = base.to().get(); - auto const is_term = envelopeIsTerm(msg->env); - auto const is_bcast = envelopeIsBcast(msg->env); - auto const dest = envelopeGetDest(msg->env); - auto const handler = envelopeGetHandler(msg->env); - auto const epoch = envelopeIsEpochType(msg->env) ? + auto const is_term = envelopeIsTerm(msg->env); + auto const is_bcast = envelopeIsBcast(msg->env); + auto const dest = envelopeGetDest(msg->env); + auto const handler = envelopeGetHandler(msg->env); + auto const epoch = envelopeIsEpochType(msg->env) ? envelopeGetEpoch(msg->env) : term::any_epoch_sentinel; + auto const from_node = is_bcast ? dest : in_from_node; if (!is_term || vt_check_enabled(print_term_msgs)) { @@ -963,6 +964,13 @@ void ActiveMessenger::prepareActiveMsgToRun( if (is_obj) { objgroup::dispatchObjGroup(base, handler, from_node, cont); } else { + if (epoch != term::any_epoch_sentinel and epoch::EpochManip::isDep(epoch)) { + if (not theTerm()->epochReleased(epoch)) { + pending_epoch_msgs_[epoch].emplace_back(base, from_node); + return; + } + } + runnable::makeRunnable(base, not is_term, handler, from_node) .withContinuation(cont) .withTDEpochFromMsg(is_term) @@ -981,6 +989,17 @@ void ActiveMessenger::prepareActiveMsgToRun( } } +void ActiveMessenger::releaseEpochMsgs(EpochType epoch) { + auto iter = pending_epoch_msgs_.find(epoch); + if (iter != pending_epoch_msgs_.end()) { + auto msgs = std::move(iter->second); + pending_epoch_msgs_.erase(iter); + for (auto&& m : msgs) { + prepareActiveMsgToRun(m.buffered_msg, m.from_node, true, m.cont); + } + } +} + bool ActiveMessenger::tryProcessIncomingActiveMsg() { CountType num_probe_bytes; MPI_Status stat; diff --git a/src/vt/messaging/active.h b/src/vt/messaging/active.h index 9901166fa3..ad1c8f577d 100644 --- a/src/vt/messaging/active.h +++ b/src/vt/messaging/active.h @@ -268,7 +268,7 @@ struct BufferedActiveMsg { BufferedActiveMsg( MessageType const& in_buffered_msg, NodeType const& in_from_node, - ActionType in_cont + ActionType in_cont = nullptr ) : buffered_msg(in_buffered_msg), from_node(in_from_node), cont(in_cont) { } @@ -327,6 +327,8 @@ struct ActiveMessenger : runtime::component::PollableComponent using SendFnType = std::function; using UserSendFnType = std::function; using ContainerPendingType = std::unordered_map; + using MsgContType = std::list; + using EpochWaitType = std::unordered_map; using ReadyHanTagType = std::tuple; using HandlerManagerType = HandlerManager; using PendingSendType = PendingSend; @@ -1650,6 +1652,13 @@ struct ActiveMessenger : runtime::component::PollableComponent # endif } + /* + * \brief Deliver messages that are now released with a dependent epoch + * + * \param[in] epoch the epoch to release + */ + void releaseEpochMsgs(EpochType epoch); + private: /** * \internal \brief Allocate a new, unused tag. @@ -1764,6 +1773,7 @@ struct ActiveMessenger : runtime::component::PollableComponent elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_ = {}; elm::ElementLBData bare_handler_lb_data_; MPI_Comm comm_ = MPI_COMM_NULL; + EpochWaitType pending_epoch_msgs_ = {}; }; }} // end namespace vt::messaging diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index 1798bad28f..1d11595d30 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -931,10 +931,12 @@ void TerminationDetector::finishedEpoch(EpochType const& epoch) { } EpochType TerminationDetector::makeEpochRootedWave( - ParentEpochCapture successor, std::string const& label + ParentEpochCapture successor, std::string const& label, bool is_dep ) { - auto const no_cat = epoch::eEpochCategory::NoCategoryEpoch; - auto const epoch = theEpoch()->getNextRootedEpoch(no_cat); + auto const cat = is_dep ? + epoch::eEpochCategory::DependentEpoch : + epoch::eEpochCategory::NoCategoryEpoch; + auto const epoch = theEpoch()->getNextRootedEpoch(cat); initializeRootedWaveEpoch(epoch, successor, label); return epoch; @@ -970,10 +972,16 @@ void TerminationDetector::initializeRootedWaveEpoch( } EpochType TerminationDetector::makeEpochRootedDS( - ParentEpochCapture successor, std::string const& label + ParentEpochCapture successor, std::string const& label, bool is_dep ) { - auto const ds_cat = epoch::eEpochCategory::DijkstraScholtenEpoch; - auto const epoch = theEpoch()->getNextRootedEpoch(ds_cat); + auto cat = epoch::eEpochCategory::DijkstraScholtenEpoch; + if (is_dep) { + cat = theEpoch()->makeCat( + epoch::eEpochCategory::DependentEpoch, + epoch::eEpochCategory::DijkstraScholtenEpoch + ); + } + auto const epoch = theEpoch()->getNextRootedEpoch(cat); initializeRootedDSEpoch(epoch, successor, label); return epoch; } @@ -1002,13 +1010,14 @@ void TerminationDetector::initializeRootedDSEpoch( } EpochType TerminationDetector::makeEpochRooted( - UseDS use_ds, ParentEpochCapture successor + UseDS use_ds, ParentEpochCapture successor, bool is_dep ) { - return makeEpochRooted("", use_ds, successor); + return makeEpochRooted("", use_ds, successor, is_dep); } EpochType TerminationDetector::makeEpochRooted( - std::string const& label, UseDS use_ds, ParentEpochCapture successor + std::string const& label, UseDS use_ds, ParentEpochCapture successor, + bool is_dep ) { /* * This method should only be called by the root node for the rooted epoch @@ -1029,9 +1038,9 @@ EpochType TerminationDetector::makeEpochRooted( vtAssertExpr(not (force_use_ds and force_use_wave)); if ((use_ds or force_use_ds) and not force_use_wave) { - return makeEpochRootedDS(successor, label); + return makeEpochRootedDS(successor, label, is_dep); } else { - return makeEpochRootedWave(successor, label); + return makeEpochRootedWave(successor, label, is_dep); } } @@ -1047,20 +1056,23 @@ void TerminationDetector::initializeRootedEpoch( } EpochType TerminationDetector::makeEpochCollective( - ParentEpochCapture successor + ParentEpochCapture successor, bool is_dep ) { vt_debug_print( normal, term, "makeEpochCollective: no label\n" ); - return makeEpochCollective("", successor); + return makeEpochCollective("", successor, is_dep); } EpochType TerminationDetector::makeEpochCollective( - std::string const& label, ParentEpochCapture successor + std::string const& label, ParentEpochCapture successor, bool is_dep ) { - auto const epoch = theEpoch()->getNextCollectiveEpoch(); + auto const cat = is_dep ? + epoch::eEpochCategory::DependentEpoch : + epoch::eEpochCategory::NoCategoryEpoch; + auto const epoch = theEpoch()->getNextCollectiveEpoch(cat); initializeCollectiveEpoch(epoch, label, successor); return epoch; } @@ -1105,11 +1117,121 @@ void TerminationDetector::initializeCollectiveEpoch( EpochType TerminationDetector::makeEpoch( std::string const& label, bool is_coll, UseDS use_ds, - ParentEpochCapture successor + ParentEpochCapture successor, bool is_dep ) { return is_coll ? - makeEpochCollective(label, successor) : - makeEpochRooted(label, use_ds, successor); + makeEpochCollective(label, successor, is_dep) : + makeEpochRooted(label, use_ds, successor, is_dep); +} + +void TerminationDetector::releaseEpoch(EpochType epoch) { + bool const is_dep = isDep(epoch); + + if (is_dep) { + // Put the epoch in the released set, which is not conclusive due to + // dependencies, which effects the status. An epoch is *released* iff the + // epoch is in the released set and all succesrros are *released* (or there + // are no successors). The epoch any_epoch_sentinel does not count as a + // succcessor. + epoch_released_.insert(epoch); + + bool const is_released = epochReleased(epoch); + if (is_released) { + runReleaseEpochActions(epoch); + } else { + // Enqueue continuations to potentially release this epoch since the + // successor graph is not inverted (one-way knowledge) + auto const& successors = getEpochDep(epoch)->getSuccessors(); + vtAssert(successors.size() > 0, "Must have unreleased successors in this case"); + for (auto&& suc : successors) { + if (not epochReleased(suc)) { + onReleaseEpoch(suc, [epoch]{ theTerm()->releaseEpoch(epoch); }); + } + } + } + } else { + // The user might have made a mistake if they are trying to release an epoch + // that is released-by-default (not dependent) + vtWarn("Trying to release non-dependent epoch"); + } +} + +void TerminationDetector::runReleaseEpochActions(EpochType epoch) { + auto iter = epoch_release_action_.find(epoch); + if (iter != epoch_release_action_.end()) { + auto actions = std::move(iter->second); + epoch_release_action_.erase(iter); + for (auto&& fn : actions) { + fn(); + } + } + theMsg()->releaseEpochMsgs(epoch); +} + +void TerminationDetector::onReleaseEpoch(EpochType epoch, ActionType action) { + // Run an action if an epoch has been released + bool const is_dep = isDep(epoch); + if (not is_dep or (is_dep and epochReleased(epoch))) { + action(); + } else { + epoch_release_action_[epoch].push_back(action); + } +} + +bool TerminationDetector::epochSuccessorsReleased(EpochType epoch) { + // Test of all parents of a given epoch are released + bool released = true; + auto const& successors = getEpochDep(epoch)->getSuccessors(); + if (successors.size() != 0) { + for (auto&& suc : successors) { + released &= epochReleased(suc); + } + } + return released; +} + +bool TerminationDetector::epochReleased(EpochType epoch) { + // Because of case (2), ignore dep <- no-dep because this should not be called + // unless dep is released + bool const is_dep = isDep(epoch); + if (not is_dep) { + return true; + } + + // Terminated epochs are always released + bool const is_term = theEpoch()->getTerminatedWindow(epoch)->isTerminated( + epoch + ); + if (is_term) { + return true; + } + + // All successors must be released for an epoch to be released even if its in + // the release set. Epochs are put in the release set early as to reduce + // tracking of epoch "release chains" + bool const is_successors_released = epochSuccessorsReleased(epoch); + if (not is_successors_released) { + return false; + } + + // Check the release set + auto iter = epoch_released_.find(epoch); + return iter != epoch_released_.end(); +} + +void TerminationDetector::cleanupReleasedEpoch(EpochType epoch) { + bool const is_dep = isDep(epoch); + if (is_dep) { + bool const is_term = theEpoch()->getTerminatedWindow(epoch)->isTerminated( + epoch + ); + if (is_term) { + auto iter = epoch_released_.find(epoch); + if (iter != epoch_released_.end()) { + epoch_released_.erase(iter); + } + } + } } void TerminationDetector::activateEpoch(EpochType const& epoch) { diff --git a/src/vt/termination/termination.h b/src/vt/termination/termination.h index 0301ac43ba..1e94a6d09c 100644 --- a/src/vt/termination/termination.h +++ b/src/vt/termination/termination.h @@ -243,23 +243,27 @@ struct TerminationDetector : * * \param[in] use_ds whether to use the Dijkstra-Scholten algorithm * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochRooted( UseDS use_ds = UseDS{true}, - ParentEpochCapture parent = ParentEpochCapture{} + ParentEpochCapture parent = ParentEpochCapture{}, + bool is_dep = false ); /** * \brief Create a new collective epoch * * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochCollective( - ParentEpochCapture parent = ParentEpochCapture{} + ParentEpochCapture parent = ParentEpochCapture{}, + bool is_dep = false ); /** @@ -268,13 +272,15 @@ struct TerminationDetector : * \param[in] label epoch label for debugging purposes * \param[in] use_ds whether to use the Dijkstra-Scholten algorithm * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochRooted( std::string const& label, UseDS use_ds = UseDS{true}, - ParentEpochCapture parent = ParentEpochCapture{} + ParentEpochCapture parent = ParentEpochCapture{}, + bool is_dep = false ); /** @@ -282,12 +288,14 @@ struct TerminationDetector : * * \param[in] label epoch label for debugging purposes * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochCollective( std::string const& label, - ParentEpochCapture parent = ParentEpochCapture{} + ParentEpochCapture parent = ParentEpochCapture{}, + bool is_dep = false ); /** @@ -297,6 +305,7 @@ struct TerminationDetector : * \param[in] is_coll whether to create a collective or rooted epoch * \param[in] use_ds whether to use the Dijkstra-Scholten algorithm * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ @@ -304,7 +313,8 @@ struct TerminationDetector : std::string const& label, bool is_coll, UseDS use_ds = UseDS{false}, - ParentEpochCapture parent = ParentEpochCapture{} + ParentEpochCapture parent = ParentEpochCapture{}, + bool is_dep = false ); /** @@ -327,6 +337,7 @@ struct TerminationDetector : * \param[in] label epoch label for debugging purposes * \param[in] use_ds whether to use the Dijkstra-Scholten algorithm * \param[in] parent parent epoch that waits for this new epoch + * \param[in] is_dep whether it's a dependent epoch */ void initializeRootedEpoch( EpochType const epoch, @@ -358,6 +369,54 @@ struct TerminationDetector : */ void finishNoActivateEpoch(EpochType const& epoch); + /** + * \brief Release a dependent epoch + * + * \param[in] epoch the epoch to release + */ + void releaseEpoch(EpochType epoch); + + /** + * \brief Action to run on release of a dependent epoch + * + * \param[in] epoch the epoch + * \param[in] action the action + */ + void onReleaseEpoch(EpochType epoch, ActionType action); + + /** + * \brief Test if an epoch is dependent and if it is, if that epoch has been released + * + * \param[in] epoch the epoch + * + * \return if it is released + */ + bool epochReleased(EpochType epoch); + +private: + /** + * \brief When an epoch is terminated, we know it's released so we can cleanup + * + * \param[in] epoch the epoch to cleanup + */ + void cleanupReleasedEpoch(EpochType epoch); + + /** + * \brief Run all actions when an epoch is released + * + * \param[in] epoch the epoch to run actions for + */ + void runReleaseEpochActions(EpochType epoch); + + /** + * \brief Check if all epoch successors are released + * + * \param[in] epoch the epoch to check + * + * \return whether they are all released + */ + bool epochSuccessorsReleased(EpochType epoch); + public: /* * Directly call into a specific type of rooted epoch, can not be overridden @@ -368,11 +427,13 @@ struct TerminationDetector : * * \param[in] parent parent epoch that waits for this new epoch * \param[in] label epoch label for debugging purposes + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochRootedWave( - ParentEpochCapture parent, std::string const& label = "" + ParentEpochCapture parent, std::string const& label = "", + bool is_dep = false ); /** @@ -380,11 +441,13 @@ struct TerminationDetector : * * \param[in] parent parent epoch that waits for this new epoch * \param[in] label epoch label for debugging purposes + * \param[in] is_dep whether it's a dependent epoch * * \return the new epoch */ EpochType makeEpochRootedDS( - ParentEpochCapture parent, std::string const& label = "" + ParentEpochCapture parent, std::string const& label = "", + bool is_dep = false ); /** @@ -823,16 +886,22 @@ struct TerminationDetector : // hang detector termination state TermStateType hang_; private: + using ActionListType = std::list; + // epoch termination state - EpochContainerType epoch_state_ = {}; + EpochContainerType epoch_state_ = {}; // ready epoch list (misnomer: finishedEpoch was invoked) - std::unordered_set epoch_ready_ = {}; + std::unordered_set epoch_ready_ = {}; // list of remote epochs pending status report of finished - std::unordered_set epoch_wait_status_ = {}; + std::unordered_set epoch_wait_status_ = {}; // has printed epoch graph during abort - bool has_printed_epoch_graph = false; + bool has_printed_epoch_graph = false; NodeType this_node_ = uninitialized_destination; EpochStackType epoch_stack_; + // released epoch list for dependent epochs + std::unordered_set epoch_released_ = {}; + // release epoch action list for dependent epochs + std::unordered_map epoch_release_action_ = {}; }; }} // end namespace vt::term diff --git a/src/vt/termination/termination.impl.h b/src/vt/termination/termination.impl.h index d2f246330c..ff6f67d024 100644 --- a/src/vt/termination/termination.impl.h +++ b/src/vt/termination/termination.impl.h @@ -73,26 +73,11 @@ inline void TerminationDetector::consume( } /*static*/ inline bool TerminationDetector::isDS(EpochType epoch) { - using T = typename std::underlying_type::type; - if (isRooted(epoch)) { - auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch; - auto const cat = epoch::EpochManip::category(epoch); - bool const is_ds = static_cast(cat) & static_cast(ds_bit); - return is_ds; - } else { - return false; - } + return epoch::EpochManip::isDS(epoch); } /*static*/ inline bool TerminationDetector::isDep(EpochType epoch) { - using T = typename std::underlying_type::type; - if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { - return false; - } - auto const dep_bit = epoch::eEpochCategory::DependentEpoch; - auto const cat = epoch::EpochManip::category(epoch); - bool const is_dep = static_cast(cat) & static_cast(dep_bit); - return is_dep; + return epoch::EpochManip::isDep(epoch); } inline void TerminationDetector::produceConsumeState( From 5cd8599c750154cfb52efd0d5deba17f594c4baa Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 20 Jun 2019 13:35:50 -0700 Subject: [PATCH 05/20] #410: test: add release dependent epoch test --- .../termination/test_term_dep_epoch_active.cc | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 tests/unit/termination/test_term_dep_epoch_active.cc diff --git a/tests/unit/termination/test_term_dep_epoch_active.cc b/tests/unit/termination/test_term_dep_epoch_active.cc new file mode 100644 index 0000000000..c0641d5949 --- /dev/null +++ b/tests/unit/termination/test_term_dep_epoch_active.cc @@ -0,0 +1,129 @@ +/* +//@HEADER +// ************************************************************************ +// +// test_term_dep_epoch_active.cc +// vt (Virtual Transport) +// Copyright (C) 2018 NTESS, LLC +// +// Under the terms of Contract DE-NA-0003525 with NTESS, LLC, +// the U.S. Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the Corporation nor the names of the +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ************************************************************************ +//@HEADER +*/ + +#include + +#include "test_parallel_harness.h" +#include "data_message.h" + +#include "vt/transport.h" +#include "vt/messaging/collection_chain_set.h" + +namespace vt { namespace tests { namespace unit { + +using namespace vt; +using namespace vt::tests::unit; + +struct TestTermDepEpochActive : TestParallelHarness { }; + +struct TestMsg : vt::Message { }; + +struct TestDep { + static void depHandler(TestMsg* msg) { + //auto const& node = theContext()->getNode(); + num_dep++; + //fmt::print("{}: depHandler: num_dep={}\n", node, num_dep); + EXPECT_EQ(num_non_dep, 1); + } + + static void nonDepHandler(TestMsg* msg) { + //auto const& node = theContext()->getNode(); + num_non_dep++; + //fmt::print("{}: nonDepHandler: num_non_dep={}\n", node, num_non_dep); + EXPECT_EQ(num_dep, 0); + } + + static int num_dep; + static int num_non_dep; +}; + +/*static*/ int TestDep::num_dep = 0; +/*static*/ int TestDep::num_non_dep = 0; + +TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { + auto const& this_node = theContext()->getNode(); + auto const& num_nodes = theContext()->getNumNodes(); + bool const bcast = true; + int const k = 10; + + TestDep::num_dep = 0; + TestDep::num_non_dep = 0; + vt::theCollective()->barrier(); + + auto epoch = vt::theTerm()->makeEpochCollectiveDep(); + vt::theMsg()->pushEpoch(epoch); + if (bcast) { + for (int i = 0; i < k; i++) { + auto msg = vt::makeSharedMessage(); + vt::theMsg()->broadcastMsg(msg); + } + } else { + } + vt::theMsg()->popEpoch(epoch); + vt::theTerm()->finishedEpoch(epoch); + + auto chain = std::make_unique>(); + chain->addIndex(this_node); + + chain->nextStep([=](NodeType node) { + auto const next = this_node + 1 < num_nodes ? this_node + 1 : 0; + auto msg = vt::makeSharedMessage(); + return vt::theMsg()->sendMsg(next,msg); + }); + + chain->nextStep([=](NodeType node) { + auto msg = vt::makeMessage(); + return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ + EXPECT_EQ(TestDep::num_dep, 0); + theTerm()->releaseEpoch(epoch); + }); + }); + + vt::theTerm()->addAction([=]{ + EXPECT_EQ(TestDep::num_non_dep, 1); + EXPECT_EQ(TestDep::num_dep, (num_nodes - 1)*k); + }); +} + +}}} // end namespace vt::tests::unit From 10147f45c37b5d28df6c9dafe2a0d69b8f9b6eda Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 12 Oct 2023 13:56:58 -0700 Subject: [PATCH 06/20] #410: reduce: fix warning --- src/vt/collective/reduce/reduce_manager.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/vt/collective/reduce/reduce_manager.cc b/src/vt/collective/reduce/reduce_manager.cc index 9faa9783b4..bd2bba08fa 100644 --- a/src/vt/collective/reduce/reduce_manager.cc +++ b/src/vt/collective/reduce/reduce_manager.cc @@ -46,12 +46,12 @@ namespace vt { namespace collective { namespace reduce { +static std::unique_ptr makeReduceScope(detail::ReduceScope const& scope) { + return std::make_unique(scope); +} + ReduceManager::ReduceManager() - : reducers_( // default cons reducer for non-group - [](detail::ReduceScope const& scope) { - return std::make_unique(scope); - } - ) + : reducers_(makeReduceScope) { // insert the default reducer scope reducers_.make( From 1db25fa00d1a43bac09d8c6ee442ec84ac305c41 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 12 Oct 2023 13:58:26 -0700 Subject: [PATCH 07/20] #410: epoch: add test, move pending epochs to scheduler --- src/vt/context/runnable_context/td.h | 7 ++ src/vt/epoch/epoch_manip.cc | 33 +++--- src/vt/epoch/epoch_manip.h | 12 ++- src/vt/messaging/active.cc | 20 +--- src/vt/messaging/active.h | 10 -- src/vt/runnable/runnable.cc | 8 ++ src/vt/runnable/runnable.h | 7 ++ src/vt/runtime/runtime.cc | 2 +- src/vt/runtime/runtime_banner.cc | 2 +- src/vt/scheduler/scheduler.cc | 11 ++ src/vt/scheduler/scheduler.h | 31 +++++- src/vt/scheduler/scheduler.impl.h | 101 +++++++++++++++--- src/vt/termination/dijkstra-scholten/comm.cc | 2 +- src/vt/termination/termination.cc | 6 +- src/vt/vrt/collection/manager.cc | 2 +- src/vt/vrt/collection/manager.impl.h | 2 +- .../termination/test_term_dep_epoch_active.cc | 31 ++---- 17 files changed, 188 insertions(+), 99 deletions(-) diff --git a/src/vt/context/runnable_context/td.h b/src/vt/context/runnable_context/td.h index f3ee0f49f3..3ab038e256 100644 --- a/src/vt/context/runnable_context/td.h +++ b/src/vt/context/runnable_context/td.h @@ -104,6 +104,13 @@ struct TD { */ void resume(); + /** + * \brief Get epoch for this context + * + * \return the epoch + */ + EpochType getEpoch() const { return ep_; } + private: EpochType ep_ = no_epoch; /**< The epoch for the task */ #if vt_check_enabled(fcontext) diff --git a/src/vt/epoch/epoch_manip.cc b/src/vt/epoch/epoch_manip.cc index 1f138d984e..19d717a433 100644 --- a/src/vt/epoch/epoch_manip.cc +++ b/src/vt/epoch/epoch_manip.cc @@ -55,12 +55,6 @@ namespace vt { namespace epoch { static EpochType const arch_epoch_coll = makeEpochZero(); -EpochManip::EpochManip() - : terminated_collective_epochs_( - std::make_unique(arch_epoch_coll) - ) -{ } - /*static*/ EpochType EpochManip::generateEpoch( bool const& is_rooted, NodeType const& root_node, eEpochCategory const& category @@ -123,22 +117,19 @@ EpochType EpochManip::getArchetype(EpochType epoch) const { EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) { auto const is_rooted = isRooted(epoch); - if (is_rooted and epoch != term::any_epoch_sentinel) { - auto const& arch_epoch = getArchetype(epoch); - auto iter = terminated_epochs_.find(arch_epoch); - if (iter == terminated_epochs_.end()) { - terminated_epochs_.emplace( - std::piecewise_construct, - std::forward_as_tuple(arch_epoch), - std::forward_as_tuple(std::make_unique(arch_epoch)) - ); - iter = terminated_epochs_.find(arch_epoch); - } - return iter->second.get(); - } else { - vtAssertExpr(terminated_collective_epochs_ != nullptr); - return terminated_collective_epochs_.get(); + auto& container = is_rooted and epoch != term::any_epoch_sentinel ? + terminated_epochs_ : terminated_collective_epochs_; + auto const& arch_epoch = getArchetype(epoch); + auto iter = container.find(arch_epoch); + if (iter == container.end()) { + container.emplace( + std::piecewise_construct, + std::forward_as_tuple(arch_epoch), + std::forward_as_tuple(std::make_unique(arch_epoch)) + ); + iter = container.find(arch_epoch); } + return iter->second.get(); } /*static*/ bool EpochManip::isRooted(EpochType const& epoch) { diff --git a/src/vt/epoch/epoch_manip.h b/src/vt/epoch/epoch_manip.h index d7092ace85..342f803da1 100644 --- a/src/vt/epoch/epoch_manip.h +++ b/src/vt/epoch/epoch_manip.h @@ -72,7 +72,7 @@ namespace vt { namespace epoch { struct EpochManip : runtime::component::Component { using CapturedContextType = term::ParentEpochCapture; - EpochManip(); + EpochManip() = default; std::string name() override { return "EpochManip"; } @@ -276,10 +276,12 @@ struct EpochManip : runtime::component::Component { } private: - // epoch window container for specific archetyped epochs - std::unordered_map> terminated_epochs_; - // epoch window for basic collective epochs - std::unique_ptr terminated_collective_epochs_ = nullptr; + /// epoch window container for specific archetyped epochs + std::unordered_map> terminated_epochs_; + /// epoch window for basic collective epochs + std::unordered_map< + EpochType, std::unique_ptr + > terminated_collective_epochs_; }; }} /* end namespace vt::epoch */ diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index cdfcd4e8d2..15effc9677 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -859,7 +859,7 @@ void ActiveMessenger::finishPendingDataMsgAsyncRecv(InProgressDataIRecv* irecv) theTerm()->consume(term::any_epoch_sentinel,1,sender); theTerm()->hangDetectRecv(); }; - theSched()->enqueue(irecv->priority, run); + theSched()->enqueueLambda(irecv->priority, run); } } @@ -964,13 +964,6 @@ void ActiveMessenger::prepareActiveMsgToRun( if (is_obj) { objgroup::dispatchObjGroup(base, handler, from_node, cont); } else { - if (epoch != term::any_epoch_sentinel and epoch::EpochManip::isDep(epoch)) { - if (not theTerm()->epochReleased(epoch)) { - pending_epoch_msgs_[epoch].emplace_back(base, from_node); - return; - } - } - runnable::makeRunnable(base, not is_term, handler, from_node) .withContinuation(cont) .withTDEpochFromMsg(is_term) @@ -989,17 +982,6 @@ void ActiveMessenger::prepareActiveMsgToRun( } } -void ActiveMessenger::releaseEpochMsgs(EpochType epoch) { - auto iter = pending_epoch_msgs_.find(epoch); - if (iter != pending_epoch_msgs_.end()) { - auto msgs = std::move(iter->second); - pending_epoch_msgs_.erase(iter); - for (auto&& m : msgs) { - prepareActiveMsgToRun(m.buffered_msg, m.from_node, true, m.cont); - } - } -} - bool ActiveMessenger::tryProcessIncomingActiveMsg() { CountType num_probe_bytes; MPI_Status stat; diff --git a/src/vt/messaging/active.h b/src/vt/messaging/active.h index ad1c8f577d..a85dcf4bd9 100644 --- a/src/vt/messaging/active.h +++ b/src/vt/messaging/active.h @@ -327,8 +327,6 @@ struct ActiveMessenger : runtime::component::PollableComponent using SendFnType = std::function; using UserSendFnType = std::function; using ContainerPendingType = std::unordered_map; - using MsgContType = std::list; - using EpochWaitType = std::unordered_map; using ReadyHanTagType = std::tuple; using HandlerManagerType = HandlerManager; using PendingSendType = PendingSend; @@ -1652,13 +1650,6 @@ struct ActiveMessenger : runtime::component::PollableComponent # endif } - /* - * \brief Deliver messages that are now released with a dependent epoch - * - * \param[in] epoch the epoch to release - */ - void releaseEpochMsgs(EpochType epoch); - private: /** * \internal \brief Allocate a new, unused tag. @@ -1773,7 +1764,6 @@ struct ActiveMessenger : runtime::component::PollableComponent elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_ = {}; elm::ElementLBData bare_handler_lb_data_; MPI_Comm comm_ = MPI_COMM_NULL; - EpochWaitType pending_epoch_msgs_ = {}; }; }} // end namespace vt::messaging diff --git a/src/vt/runnable/runnable.cc b/src/vt/runnable/runnable.cc index b260b46962..853da3f9ac 100644 --- a/src/vt/runnable/runnable.cc +++ b/src/vt/runnable/runnable.cc @@ -246,6 +246,14 @@ void RunnableNew::resume(TimeType time) { #endif } +EpochType RunnableNew::getEpoch() const { + if (contexts_.has_td) { + return contexts_.td.getEpoch(); + } else { + return no_epoch; + } +} + void RunnableNew::send(elm::ElementIDStruct elm, MsgSizeType bytes) { if (contexts_.has_lb) contexts_.lb.send(elm, bytes); } diff --git a/src/vt/runnable/runnable.h b/src/vt/runnable/runnable.h index 3151a4fe43..af1c8b793d 100644 --- a/src/vt/runnable/runnable.h +++ b/src/vt/runnable/runnable.h @@ -353,6 +353,13 @@ struct RunnableNew { */ static void operator delete(void* ptr); + /** + * \brief Get the epoch for a runnable + * + * \return the epoch + */ + EpochType getEpoch() const; + private: detail::Contexts contexts_; /**< The contexts */ MsgSharedPtr msg_ = nullptr; /**< The associated message */ diff --git a/src/vt/runtime/runtime.cc b/src/vt/runtime/runtime.cc index 7d73260004..5bfebb4d5e 100644 --- a/src/vt/runtime/runtime.cc +++ b/src/vt/runtime/runtime.cc @@ -444,7 +444,7 @@ bool Runtime::initialize(bool const force_now) { printStartupBanner(); // Enqueue a check for later in case arguments are modified before work // actually executes - theSched->enqueue([this]{ + theSched->enqueueLambda([this]{ this->checkForArgumentErrors(); }); diff --git a/src/vt/runtime/runtime_banner.cc b/src/vt/runtime/runtime_banner.cc index ee02c32387..e969710fe2 100644 --- a/src/vt/runtime/runtime_banner.cc +++ b/src/vt/runtime/runtime_banner.cc @@ -939,7 +939,7 @@ void Runtime::printStartupBanner() { // Enqueue a check for later in case arguments are modified before work // actually executes - theSched->enqueue([this]{ + theSched->enqueueLambda([this]{ this->checkForArgumentErrors(); }); } diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index 7c15cc3482..a35df19fe8 100644 --- a/src/vt/scheduler/scheduler.cc +++ b/src/vt/scheduler/scheduler.cc @@ -384,6 +384,17 @@ void Scheduler::resume(ThreadIDType tid) { suspended_.resumeRunnable(tid); } +void Scheduler::releaseEpoch(EpochType ep) { + auto iter = pending_work_.find(ep); + if (iter != pending_work_.end()) { + auto& container = iter->second; + while (container.size() > 0) { + work_queue_.emplace(container.pop()); + } + pending_work_.erase(iter); + } +} + #if vt_check_enabled(fcontext) ThreadManager* Scheduler::getThreadManager() { return thread_manager_.get(); diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 85fbb2a58f..72584b4c92 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -234,6 +234,24 @@ struct Scheduler : runtime::component::Component { template void enqueue(RunT r); + /** + * \brief Enqueue a callable to execute later with the default priority + * \c default_priority + * + * \param[in] r action to execute + */ + template + void enqueueLambda(Callable&& c); + + /** + * \brief Enqueue a callable to execute later with a priority + * + * \param[in] priority the priority of the action + * \param[in] r action to execute + */ + template + void enqueueLambda(PriorityType priority, Callable&& c); + /** * \brief Enqueue an runnable with a priority to execute later * @@ -352,6 +370,13 @@ struct Scheduler : runtime::component::Component { ThreadManager* getThreadManager(); #endif + /** + * \brief Release an epoch to run + * + * \param[in] ep the epoch to release + */ + void releaseEpoch(EpochType ep); + template void serialize(SerializerT& s) { s | work_queue_ @@ -381,7 +406,8 @@ struct Scheduler : runtime::component::Component { | vtLiveTime | schedLoopTime | idleTime - | idleTimeMinusTerm; + | idleTimeMinusTerm + | pending_work_; } private: @@ -419,6 +445,9 @@ struct Scheduler : runtime::component::Component { Queue work_queue_; # endif + /// Unreleased work pending an epoch release + std::unordered_map> pending_work_; + #if vt_check_enabled(fcontext) std::unique_ptr thread_manager_ = nullptr; #endif diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index c7834bbd4e..60dff063dd 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -92,7 +92,15 @@ void Scheduler::enqueue(bool is_term, RunT r) { if (is_term) { num_term_msgs_++; } - work_queue_.emplace(UnitType(is_term, r)); + auto ep = r->getEpoch(); + if ( + ep != no_epoch and ep != term::any_epoch_sentinel and + not theTerm()->epochReleased(ep)) + { + pending_work_[ep].push(UnitType(is_term, r)); + } else { + work_queue_.emplace(UnitType(is_term, r)); + } } template @@ -103,12 +111,25 @@ void Scheduler::enqueue(MsgT* msg, RunT r) { num_term_msgs_++; } -# if vt_check_enabled(priorities) - auto priority = envelopeGetPriority(msg->env); - work_queue_.emplace(UnitType(is_term, std::move(r), priority)); -# else - work_queue_.emplace(UnitType(is_term, std::move(r))); -# endif + auto ep = r->getEpoch(); + if ( + ep != no_epoch and ep != term::any_epoch_sentinel and + not theTerm()->epochReleased(ep)) + { +# if vt_check_enabled(priorities) + auto priority = envelopeGetPriority(msg->env); + pending_work_[ep].push(UnitType(is_term, std::move(r), priority)); +# else + pending_work_[ep].push(UnitType(is_term, std::move(r))); +# endif + } else { +# if vt_check_enabled(priorities) + auto priority = envelopeGetPriority(msg->env); + work_queue_.emplace(UnitType(is_term, std::move(r), priority)); +# else + work_queue_.emplace(UnitType(is_term, std::move(r))); +# endif + } } template @@ -127,26 +148,76 @@ void Scheduler::enqueue(MsgSharedPtr const& msg, RunT r) { #endif } -template -void Scheduler::enqueue(RunT r) { +template +void Scheduler::enqueueLambda(Callable&& c) { bool const is_term = false; # if vt_check_enabled(priorities) - work_queue_.emplace(UnitType(is_term, std::move(r), default_priority)); + work_queue_.emplace( + UnitType(is_term, std::forward(c), default_priority) + ); # else - work_queue_.emplace(UnitType(is_term, std::move(r))); + work_queue_.emplace(UnitType(is_term, std::forward(c))); # endif } -template -void Scheduler::enqueue(PriorityType priority, RunT r) { +template +void Scheduler::enqueueLambda(PriorityType priority, Callable&& c) { bool const is_term = false; # if vt_check_enabled(priorities) - work_queue_.emplace(UnitType(is_term, std::move(r), priority)); + work_queue_.emplace( + UnitType(is_term, std::forward(c), priority) + ); # else - work_queue_.emplace(UnitType(is_term, std::move(r))); + work_queue_.emplace(UnitType(is_term, std::forward(c))); # endif } +template +void Scheduler::enqueue(RunT r) { + bool const is_term = false; + + auto ep = r->getEpoch(); + if ( + ep != no_epoch and ep != term::any_epoch_sentinel and + not theTerm()->epochReleased(ep)) + { +# if vt_check_enabled(priorities) + pending_work_[ep].push(UnitType(is_term, std::move(r), default_priority)); +# else + pending_work_[ep].push(UnitType(is_term, std::move(r))); +# endif + } else { +# if vt_check_enabled(priorities) + work_queue_.emplace(UnitType(is_term, std::move(r), default_priority)); +# else + work_queue_.emplace(UnitType(is_term, std::move(r))); +# endif + } +} + +template +void Scheduler::enqueue(PriorityType priority, RunT r) { + bool const is_term = false; + + auto ep = r->getEpoch(); + if ( + ep != no_epoch and ep != term::any_epoch_sentinel and + not theTerm()->epochReleased(ep)) + { +# if vt_check_enabled(priorities) + pending_work_[ep].push(UnitType(is_term, std::move(r), priority)); +# else + pending_work_[ep].push(UnitType(is_term, std::move(r))); +# endif + } else { +# if vt_check_enabled(priorities) + work_queue_.emplace(UnitType(is_term, std::move(r), priority)); +# else + work_queue_.emplace(UnitType(is_term, std::move(r))); +# endif + } +} + }} /* end namespace vt::sched */ #endif /*INCLUDED_VT_SCHEDULER_SCHEDULER_IMPL_H*/ diff --git a/src/vt/termination/dijkstra-scholten/comm.cc b/src/vt/termination/dijkstra-scholten/comm.cc index 3788348429..208d396ae2 100644 --- a/src/vt/termination/dijkstra-scholten/comm.cc +++ b/src/vt/termination/dijkstra-scholten/comm.cc @@ -102,7 +102,7 @@ namespace vt { namespace term { namespace ds { // DS terminator state. Thus, we can't remove it safely right here. So we will // enqueue an action to do the cleanup. This action must see if the DS term is // still disengaged (it can easily be re-engaged after it disengages). - theSched()->enqueue([epoch]{ + theSched()->enqueueLambda([epoch]{ auto ptr = theTerm()->getDSTerm(epoch); if (ptr != nullptr) { if (not ptr->isEngaged()) { diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index 1d11595d30..c462b59747 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -612,7 +612,7 @@ void TerminationDetector::cleanupEpoch(EpochType const& epoch, CallFromEnum from term_.erase(ds_term_iter); } } else { - theSched()->enqueue([epoch]{ + theSched()->enqueueLambda([epoch]{ theTerm()->cleanupEpoch(epoch, CallFromEnum::NonRoot); }); } @@ -627,7 +627,7 @@ void TerminationDetector::cleanupEpoch(EpochType const& epoch, CallFromEnum from } else { // Schedule the cleanup for later, we are in the midst of iterating and // can't safely erase it immediately - theSched()->enqueue([epoch]{ + theSched()->enqueueLambda([epoch]{ theTerm()->cleanupEpoch(epoch, CallFromEnum::NonRoot); }); } @@ -1165,7 +1165,7 @@ void TerminationDetector::runReleaseEpochActions(EpochType epoch) { fn(); } } - theMsg()->releaseEpochMsgs(epoch); + theSched()->releaseEpoch(epoch); } void TerminationDetector::onReleaseEpoch(EpochType epoch, ActionType action) { diff --git a/src/vt/vrt/collection/manager.cc b/src/vt/vrt/collection/manager.cc index 95c1ef5a95..23be79c724 100644 --- a/src/vt/vrt/collection/manager.cc +++ b/src/vt/vrt/collection/manager.cc @@ -92,7 +92,7 @@ elm::ElementIDStruct CollectionManager::getCurrentContext() const { } void CollectionManager::schedule(ActionType action) { - theSched()->enqueue(action); + theSched()->enqueueLambda(action); } VirtualProxyType CollectionManager::makeCollectionProxy( diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index 65679531a3..589787e94f 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -1721,7 +1721,7 @@ void CollectionManager::destroyElm( if (elm_holder->exists(idx)) { // Delay this so we can finish processing this work unit first (which might // be this collection element running) - theSched()->enqueue([idx,untyped_proxy]{ + theSched()->enqueueLambda([idx,untyped_proxy]{ auto elm = theCollection()->findElmHolder(untyped_proxy); if (elm->exists(idx)) { elm->remove(idx); diff --git a/tests/unit/termination/test_term_dep_epoch_active.cc b/tests/unit/termination/test_term_dep_epoch_active.cc index c0641d5949..ce1c2d14ae 100644 --- a/tests/unit/termination/test_term_dep_epoch_active.cc +++ b/tests/unit/termination/test_term_dep_epoch_active.cc @@ -57,17 +57,14 @@ using namespace vt::tests::unit; struct TestTermDepEpochActive : TestParallelHarness { }; -struct TestMsg : vt::Message { }; - struct TestDep { - static void depHandler(TestMsg* msg) { - //auto const& node = theContext()->getNode(); + static void depHandler() { num_dep++; - //fmt::print("{}: depHandler: num_dep={}\n", node, num_dep); + vt_print(gen, "depHandler: num_dep={}, epoch={:x}\n", num_dep, theTerm()->getEpoch()); EXPECT_EQ(num_non_dep, 1); } - static void nonDepHandler(TestMsg* msg) { + static void nonDepHandler() { //auto const& node = theContext()->getNode(); num_non_dep++; //fmt::print("{}: nonDepHandler: num_non_dep={}\n", node, num_non_dep); @@ -84,21 +81,16 @@ struct TestDep { TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { auto const& this_node = theContext()->getNode(); auto const& num_nodes = theContext()->getNumNodes(); - bool const bcast = true; int const k = 10; TestDep::num_dep = 0; TestDep::num_non_dep = 0; vt::theCollective()->barrier(); - auto epoch = vt::theTerm()->makeEpochCollectiveDep(); + auto epoch = vt::theTerm()->makeEpochCollective(term::ParentEpochCapture{}, true); vt::theMsg()->pushEpoch(epoch); - if (bcast) { - for (int i = 0; i < k; i++) { - auto msg = vt::makeSharedMessage(); - vt::theMsg()->broadcastMsg(msg); - } - } else { + for (int i = 0; i < k; i++) { + vt::theMsg()->broadcast(); } vt::theMsg()->popEpoch(epoch); vt::theTerm()->finishedEpoch(epoch); @@ -107,14 +99,13 @@ TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { chain->addIndex(this_node); chain->nextStep([=](NodeType node) { - auto const next = this_node + 1 < num_nodes ? this_node + 1 : 0; - auto msg = vt::makeSharedMessage(); - return vt::theMsg()->sendMsg(next,msg); + NodeType const next = this_node + 1 < num_nodes ? this_node + 1 : 0; + return vt::theMsg()->send(Node{next}); }); chain->nextStep([=](NodeType node) { - auto msg = vt::makeMessage(); - return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ + auto msg = vt::makeMessage(); + return vt::messaging::PendingSend(msg, [=](MsgSharedPtr&){ EXPECT_EQ(TestDep::num_dep, 0); theTerm()->releaseEpoch(epoch); }); @@ -122,7 +113,7 @@ TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { vt::theTerm()->addAction([=]{ EXPECT_EQ(TestDep::num_non_dep, 1); - EXPECT_EQ(TestDep::num_dep, (num_nodes - 1)*k); + EXPECT_EQ(TestDep::num_dep, num_nodes * k); }); } From a9816c65d31e9e07494b03717c00efa4412236fc Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Mon, 16 Oct 2023 14:44:54 -0700 Subject: [PATCH 08/20] #410: epoch: rework deps, objgroup dep epochs, scheduler buffers --- src/vt/epoch/epoch_manip.cc | 22 ++-- src/vt/objgroup/manager.cc | 4 + src/vt/objgroup/manager.fwd.h | 2 + src/vt/objgroup/manager.h | 17 +++ src/vt/runnable/runnable.cc | 2 + src/vt/runnable/runnable.h | 15 +++ src/vt/scheduler/base_unit.h | 7 ++ src/vt/scheduler/scheduler.cc | 21 +++- src/vt/scheduler/scheduler.h | 41 +++++-- src/vt/scheduler/scheduler.impl.h | 108 +++++++----------- src/vt/termination/termination.cc | 43 +------ src/vt/termination/termination.h | 9 -- .../test_scheduler_progress.extended.cc | 4 +- 13 files changed, 154 insertions(+), 141 deletions(-) diff --git a/src/vt/epoch/epoch_manip.cc b/src/vt/epoch/epoch_manip.cc index 19d717a433..1cdda37995 100644 --- a/src/vt/epoch/epoch_manip.cc +++ b/src/vt/epoch/epoch_manip.cc @@ -140,26 +140,26 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) { } /*static*/ bool EpochManip::isDS(EpochType epoch) { - using T = typename std::underlying_type::type; - if (isRooted(epoch)) { - auto const ds_bit = epoch::eEpochCategory::DijkstraScholtenEpoch; - auto const cat = category(epoch); - bool const is_ds = static_cast(cat) & static_cast(ds_bit); - return is_ds; + using T = typename std::underlying_type::type; + if (epoch != term::any_epoch_sentinel and isRooted(epoch)) { + BitPackerType::FieldType const ds_bit = + static_cast(eEpochCategory::DijkstraScholtenEpoch) - 1; + auto cat = static_cast(EpochManip::category(epoch)); + return BitPackerType::boolGetField(cat); } else { return false; } } /*static*/ bool EpochManip::isDep(EpochType epoch) { - using T = typename std::underlying_type::type; + using T = typename std::underlying_type::type; if (epoch == no_epoch or epoch == term::any_epoch_sentinel) { return false; } - auto const dep_bit = epoch::eEpochCategory::DependentEpoch; - auto const cat = epoch::EpochManip::category(epoch); - bool const is_dep = static_cast(cat) & static_cast(dep_bit); - return is_dep; + BitPackerType::FieldType const dep_bit = + static_cast(eEpochCategory::DependentEpoch) - 1; + auto cat = static_cast(EpochManip::category(epoch)); + return BitPackerType::boolGetField(cat); } /*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) { diff --git a/src/vt/objgroup/manager.cc b/src/vt/objgroup/manager.cc index 18888093dc..f8e5280a91 100644 --- a/src/vt/objgroup/manager.cc +++ b/src/vt/objgroup/manager.cc @@ -145,4 +145,8 @@ std::unordered_map>& getPending() { return theObjGroup()->pending_; } +ObjGroupProxyType getProxyFromPtr(void* obj) { + return theObjGroup()->getProxyFromPtr(obj); +} + }} /* end namespace vt::objgroup */ diff --git a/src/vt/objgroup/manager.fwd.h b/src/vt/objgroup/manager.fwd.h index 14ad2bb4af..967898f9dd 100644 --- a/src/vt/objgroup/manager.fwd.h +++ b/src/vt/objgroup/manager.fwd.h @@ -75,6 +75,8 @@ decltype(auto) invoke(messaging::MsgSharedPtr msg, HandlerType han, NodeTy template messaging::PendingSend broadcast(MsgSharedPtr msg, HandlerType han); +ObjGroupProxyType getProxyFromPtr(void* obj); + }} /* end namespace vt::objgroup */ namespace vt { diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index 0080182e5e..59b38ebed7 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -490,6 +490,23 @@ struct ObjGroupManager : runtime::component::Component { */ elm::ElementIDStruct getNextElm(ObjGroupProxyType proxy); +public: + /** + * \brief Get an objgroup proxy from pointer + * + * \param[in] obj the pointer + * + * \return objgroup proxy + */ + ObjGroupProxyType getProxyFromPtr(void* obj) const { + auto iter = obj_to_proxy_.find(obj); + if (iter != obj_to_proxy_.end()) { + return iter->second; + } else { + return no_obj_group; + } + } + private: /// The current obj ID, sequential on each node for collective construction ObjGroupIDType cur_obj_id_ = fst_obj_group_id; diff --git a/src/vt/runnable/runnable.cc b/src/vt/runnable/runnable.cc index 853da3f9ac..2a42b5a314 100644 --- a/src/vt/runnable/runnable.cc +++ b/src/vt/runnable/runnable.cc @@ -82,6 +82,7 @@ void RunnableNew::setupHandler(HandlerType handler) { void RunnableNew::setupHandlerObjGroup(void* obj, HandlerType handler) { f_.func_ = auto_registry::getAutoHandlerObjGroup(handler).get(); obj_ = obj; + is_objgroup_ = true; } void RunnableNew::setupHandlerElement( @@ -92,6 +93,7 @@ void RunnableNew::setupHandlerElement( auto_registry::getAutoHandlerCollectionMem(handler).get() : auto_registry::getAutoHandlerCollection(handler).get(); obj_ = elm; + is_objgroup_ = false; } void RunnableNew::setupHandlerElement( diff --git a/src/vt/runnable/runnable.h b/src/vt/runnable/runnable.h index af1c8b793d..b2da85d583 100644 --- a/src/vt/runnable/runnable.h +++ b/src/vt/runnable/runnable.h @@ -360,6 +360,20 @@ struct RunnableNew { */ EpochType getEpoch() const; + /** + * \brief Whether this runnable targets an object group + * + * \return targets object group + */ + bool isObjGroup() const { return is_objgroup_; } + + /** + * \brief Get the object this runnable targets + * + * \return the obj + */ + void* getObj() const { return obj_; } + private: detail::Contexts contexts_; /**< The contexts */ MsgSharedPtr msg_ = nullptr; /**< The associated message */ @@ -369,6 +383,7 @@ struct RunnableNew { DispatcherScatterType func_scat_; } f_; bool is_scatter_ = false; + bool is_objgroup_ = false; #if vt_check_enabled(fcontext) bool is_threaded_ = false; /**< Whether ULTs are supported */ bool done_ = false; /**< Whether task is complete */ diff --git a/src/vt/scheduler/base_unit.h b/src/vt/scheduler/base_unit.h index 027485e028..499241e9e5 100644 --- a/src/vt/scheduler/base_unit.h +++ b/src/vt/scheduler/base_unit.h @@ -101,6 +101,13 @@ struct BaseUnit { */ void execute(); + /** + * \brief Get the runnable + * + * \return the runnable + */ + RunnablePtrType getRunnable() const { return r_; } + protected: RunnablePtrType r_ = nullptr; /**< the runnable task */ ActionType work_ = nullptr; /**< the lambda task */ diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index a35df19fe8..9ea55ecf3b 100644 --- a/src/vt/scheduler/scheduler.cc +++ b/src/vt/scheduler/scheduler.cc @@ -385,8 +385,7 @@ void Scheduler::resume(ThreadIDType tid) { } void Scheduler::releaseEpoch(EpochType ep) { - auto iter = pending_work_.find(ep); - if (iter != pending_work_.end()) { + if (auto iter = pending_work_.find(ep); iter != pending_work_.end()) { auto& container = iter->second; while (container.size() > 0) { work_queue_.emplace(container.pop()); @@ -395,6 +394,24 @@ void Scheduler::releaseEpoch(EpochType ep) { } } +void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) { + released_objgroups_[ep].insert(proxy); + + if (auto iter = pending_objgroup_work_.find(ep); + iter != pending_objgroup_work_.end()) { + if (auto iter2 = iter->second.find(proxy); iter2 != iter->second.end()) { + auto& container = iter2->second; + while (container.size() > 0) { + work_queue_.emplace(container.pop()); + } + iter->second.erase(iter2); + } + if (iter->second.size() == 0) { + pending_objgroup_work_.erase(iter); + } + } +} + #if vt_check_enabled(fcontext) ThreadManager* Scheduler::getThreadManager() { return thread_manager_.get(); diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 72584b4c92..9b7ec6a670 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -226,13 +226,14 @@ struct Scheduler : runtime::component::Component { bool hasSchedRun() const { return has_executed_; } /** - * \brief Enqueue an action to execute later with the default priority + * \brief Enqueue an action to execute later with a priority * \c default_priority * * \param[in] r action to execute + * \param[in] priority the priority of the action */ template - void enqueue(RunT r); + void enqueue(RunT r, PriorityType priority = default_priority); /** * \brief Enqueue a callable to execute later with the default priority @@ -252,15 +253,6 @@ struct Scheduler : runtime::component::Component { template void enqueueLambda(PriorityType priority, Callable&& c); - /** - * \brief Enqueue an runnable with a priority to execute later - * - * \param[in] priority the priority of the action - * \param[in] r the runnable to execute later - */ - template - void enqueue(PriorityType priority, RunT r); - /** * \brief Print current memory usage */ @@ -295,6 +287,14 @@ struct Scheduler : runtime::component::Component { template void enqueue(messaging::MsgSharedPtr const& msg, RunT r); + /** + * \brief Enqueue a work unit or postpone + * + * \param[in] u the work unit + */ + template + void enqueueOrPostpone(UnitT u); + /** * \brief Get the work queue size * @@ -377,6 +377,13 @@ struct Scheduler : runtime::component::Component { */ void releaseEpoch(EpochType ep); + /** + * \brief Release an epoch to run + * + * \param[in] ep the epoch to release + */ + void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy); + template void serialize(SerializerT& s) { s | work_queue_ @@ -407,7 +414,9 @@ struct Scheduler : runtime::component::Component { | schedLoopTime | idleTime | idleTimeMinusTerm - | pending_work_; + | pending_work_ + | pending_objgroup_work_ + | released_objgroups_; } private: @@ -448,6 +457,14 @@ struct Scheduler : runtime::component::Component { /// Unreleased work pending an epoch release std::unordered_map> pending_work_; + /// Unreleased work pending on an objgroup epoch release + std::unordered_map< + EpochType, std::unordered_map> + > pending_objgroup_work_; + + /// Released epochs for an objgroup + std::unordered_map> released_objgroups_; + #if vt_check_enabled(fcontext) std::unique_ptr thread_manager_ = nullptr; #endif diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index 60dff063dd..3a0e7ec561 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -47,6 +47,7 @@ #include "vt/config.h" #include "vt/messaging/active.h" #include "vt/termination/termination.h" +#include "vt/objgroup/manager.fwd.h" namespace vt { @@ -92,15 +93,10 @@ void Scheduler::enqueue(bool is_term, RunT r) { if (is_term) { num_term_msgs_++; } - auto ep = r->getEpoch(); - if ( - ep != no_epoch and ep != term::any_epoch_sentinel and - not theTerm()->epochReleased(ep)) - { - pending_work_[ep].push(UnitType(is_term, r)); - } else { - work_queue_.emplace(UnitType(is_term, r)); - } + + UnitType unit(is_term, r); + + enqueueOrPostpone(std::move(unit)); } template @@ -111,25 +107,40 @@ void Scheduler::enqueue(MsgT* msg, RunT r) { num_term_msgs_++; } +# if vt_check_enabled(priorities) + auto priority = envelopeGetPriority(msg->env); + UnitType unit(is_term, std::move(r), priority); +# else + UnitType unit(is_term, std::move(r)); +# endif + + enqueueOrPostpone(std::move(unit)); +} + +template +void Scheduler::enqueueOrPostpone(UnitT unit) { + auto r = unit.getRunnable(); auto ep = r->getEpoch(); - if ( - ep != no_epoch and ep != term::any_epoch_sentinel and - not theTerm()->epochReleased(ep)) - { -# if vt_check_enabled(priorities) - auto priority = envelopeGetPriority(msg->env); - pending_work_[ep].push(UnitType(is_term, std::move(r), priority)); -# else - pending_work_[ep].push(UnitType(is_term, std::move(r))); -# endif - } else { -# if vt_check_enabled(priorities) - auto priority = envelopeGetPriority(msg->env); - work_queue_.emplace(UnitType(is_term, std::move(r), priority)); -# else - work_queue_.emplace(UnitType(is_term, std::move(r))); -# endif + + bool const is_dep = epoch::EpochManip::isDep(ep); + + if (is_dep) { + auto obj = r->getObj(); + if (ep != no_epoch and ep != term::any_epoch_sentinel) { + if (obj and r->isObjGroup()) { + auto proxy = objgroup::getProxyFromPtr(obj); + if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) { + pending_objgroup_work_[ep][proxy].push(unit); + return; + } + } else if (not theTerm()->epochReleased(ep)) { + pending_work_[ep].push(unit); + return; + } + } } + + work_queue_.push(unit); } template @@ -173,49 +184,16 @@ void Scheduler::enqueueLambda(PriorityType priority, Callable&& c) { } template -void Scheduler::enqueue(RunT r) { +void Scheduler::enqueue(RunT r, PriorityType priority) { bool const is_term = false; - auto ep = r->getEpoch(); - if ( - ep != no_epoch and ep != term::any_epoch_sentinel and - not theTerm()->epochReleased(ep)) - { -# if vt_check_enabled(priorities) - pending_work_[ep].push(UnitType(is_term, std::move(r), default_priority)); -# else - pending_work_[ep].push(UnitType(is_term, std::move(r))); -# endif - } else { -# if vt_check_enabled(priorities) - work_queue_.emplace(UnitType(is_term, std::move(r), default_priority)); -# else - work_queue_.emplace(UnitType(is_term, std::move(r))); +# if vt_check_enabled(priorities) + UnitType unit(is_term, std::move(r), priority); +# else + UnitType unit(is_term, std::move(r)); # endif - } -} -template -void Scheduler::enqueue(PriorityType priority, RunT r) { - bool const is_term = false; - - auto ep = r->getEpoch(); - if ( - ep != no_epoch and ep != term::any_epoch_sentinel and - not theTerm()->epochReleased(ep)) - { -# if vt_check_enabled(priorities) - pending_work_[ep].push(UnitType(is_term, std::move(r), priority)); -# else - pending_work_[ep].push(UnitType(is_term, std::move(r))); -# endif - } else { -# if vt_check_enabled(priorities) - work_queue_.emplace(UnitType(is_term, std::move(r), priority)); -# else - work_queue_.emplace(UnitType(is_term, std::move(r))); -# endif - } + enqueueOrPostpone(std::move(unit)); } }} /* end namespace vt::sched */ diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index c462b59747..392419bee1 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -1128,27 +1128,10 @@ void TerminationDetector::releaseEpoch(EpochType epoch) { bool const is_dep = isDep(epoch); if (is_dep) { - // Put the epoch in the released set, which is not conclusive due to - // dependencies, which effects the status. An epoch is *released* iff the - // epoch is in the released set and all succesrros are *released* (or there - // are no successors). The epoch any_epoch_sentinel does not count as a - // succcessor. + // Put the epoch in the released set. The epoch any_epoch_sentinel does not + // count as a succcessor. epoch_released_.insert(epoch); - - bool const is_released = epochReleased(epoch); - if (is_released) { - runReleaseEpochActions(epoch); - } else { - // Enqueue continuations to potentially release this epoch since the - // successor graph is not inverted (one-way knowledge) - auto const& successors = getEpochDep(epoch)->getSuccessors(); - vtAssert(successors.size() > 0, "Must have unreleased successors in this case"); - for (auto&& suc : successors) { - if (not epochReleased(suc)) { - onReleaseEpoch(suc, [epoch]{ theTerm()->releaseEpoch(epoch); }); - } - } - } + runReleaseEpochActions(epoch); } else { // The user might have made a mistake if they are trying to release an epoch // that is released-by-default (not dependent) @@ -1178,18 +1161,6 @@ void TerminationDetector::onReleaseEpoch(EpochType epoch, ActionType action) { } } -bool TerminationDetector::epochSuccessorsReleased(EpochType epoch) { - // Test of all parents of a given epoch are released - bool released = true; - auto const& successors = getEpochDep(epoch)->getSuccessors(); - if (successors.size() != 0) { - for (auto&& suc : successors) { - released &= epochReleased(suc); - } - } - return released; -} - bool TerminationDetector::epochReleased(EpochType epoch) { // Because of case (2), ignore dep <- no-dep because this should not be called // unless dep is released @@ -1206,14 +1177,6 @@ bool TerminationDetector::epochReleased(EpochType epoch) { return true; } - // All successors must be released for an epoch to be released even if its in - // the release set. Epochs are put in the release set early as to reduce - // tracking of epoch "release chains" - bool const is_successors_released = epochSuccessorsReleased(epoch); - if (not is_successors_released) { - return false; - } - // Check the release set auto iter = epoch_released_.find(epoch); return iter != epoch_released_.end(); diff --git a/src/vt/termination/termination.h b/src/vt/termination/termination.h index 1e94a6d09c..7e28ae64b4 100644 --- a/src/vt/termination/termination.h +++ b/src/vt/termination/termination.h @@ -408,15 +408,6 @@ struct TerminationDetector : */ void runReleaseEpochActions(EpochType epoch); - /** - * \brief Check if all epoch successors are released - * - * \param[in] epoch the epoch to check - * - * \return whether they are all released - */ - bool epochSuccessorsReleased(EpochType epoch); - public: /* * Directly call into a specific type of rooted epoch, can not be overridden diff --git a/tests/unit/scheduler/test_scheduler_progress.extended.cc b/tests/unit/scheduler/test_scheduler_progress.extended.cc index 100fe98488..1b82e7bc26 100644 --- a/tests/unit/scheduler/test_scheduler_progress.extended.cc +++ b/tests/unit/scheduler/test_scheduler_progress.extended.cc @@ -77,7 +77,7 @@ TEST_F(TestSchedProgress, test_scheduler_progress_1) { // Fill the queue with a second amount of work, in smaller increments to see // if progress triggers too early for (int i = 0; i < vt::theConfig()->vt_sched_progress_sec / 0.05; i++) { - testSched->enqueue([]{ sleep_for(50ms); }); + testSched->enqueueLambda([]{ sleep_for(50ms); }); } testSched->runSchedulerWhile([&done]{return not done;}); @@ -116,7 +116,7 @@ TEST_F(TestSchedProgress, test_scheduler_progress_2) { theEvent()->getEventHolder(event).attachAction([&]{ done = true; }); for (int i = 0; i < 10; i++) { - testSched->enqueue([]{ sleep_for(100ms); }); + testSched->enqueueLambda([]{ sleep_for(100ms); }); } testSched->runSchedulerWhile([&done]{return not done;}); From 4de8a7444743427c52d8a5ea8150313c64fe5dfb Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 17 Oct 2023 09:25:58 -0700 Subject: [PATCH 09/20] #410: objgroup: implement objgroup proxy functions for dependent epochs --- src/vt/objgroup/proxy/proxy_objgroup.h | 7 ++++++ src/vt/objgroup/proxy/proxy_objgroup.impl.h | 11 +++++++++ src/vt/objgroup/proxy/proxy_objgroup_elm.h | 16 +++++++++++++ .../objgroup/proxy/proxy_objgroup_elm.impl.h | 24 +++++++++++++++++++ src/vt/scheduler/scheduler.cc | 9 +++++++ src/vt/scheduler/scheduler.h | 10 ++++++++ 6 files changed, 77 insertions(+) diff --git a/src/vt/objgroup/proxy/proxy_objgroup.h b/src/vt/objgroup/proxy/proxy_objgroup.h index 1d4c34b525..65e2cb715f 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.h @@ -431,6 +431,13 @@ struct Proxy { s | proxy_; } + /** + * \brief Release a dependent epoch for all nodes + * + * \param[in] epoch the epoch to release + */ + void release(EpochType epoch) const; + private: ObjGroupProxyType proxy_ = no_obj_group; /**< The raw proxy ID bits */ }; diff --git a/src/vt/objgroup/proxy/proxy_objgroup.impl.h b/src/vt/objgroup/proxy/proxy_objgroup.impl.h index 50d28298f0..702340d0a3 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.impl.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.impl.h @@ -316,6 +316,17 @@ void Proxy::destroyHandleSetRDMA(vt::rdma::HandleSet set) const { return vt::theHandleRDMA()->deleteHandleSetCollectiveObjGroup(set); } +template +inline void releaseRemoteObjGroupBcast(Proxy proxy, EpochType ep) { + auto const node = theContext()->getNode(); + proxy[node].release(ep); +} + +template +void Proxy::release(EpochType epoch) const { + theMsg()->broadcast>(*this, epoch); +} + inline DefaultProxyElm Proxy::operator[](NodeType node) const { return DefaultProxyElm{node}; } diff --git a/src/vt/objgroup/proxy/proxy_objgroup_elm.h b/src/vt/objgroup/proxy/proxy_objgroup_elm.h index 812de35633..7d876b847e 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup_elm.h +++ b/src/vt/objgroup/proxy/proxy_objgroup_elm.h @@ -211,6 +211,22 @@ struct ProxyElm { */ NodeType getNode() const { return node_; } + /** + * \brief Check if dependent epoch is released + * + * \param[in] epoch the epoch in question + * + * \return whether it is released + */ + bool isReleased(EpochType epoch) const; + + /** + * \brief Release a dependent epoch for this element + * + * \param[in] epoch the epoch to release + */ + void release(EpochType epoch) const; + public: /** * \brief Serialize the element proxy diff --git a/src/vt/objgroup/proxy/proxy_objgroup_elm.impl.h b/src/vt/objgroup/proxy/proxy_objgroup_elm.impl.h index dccd7b4f09..1e77053569 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup_elm.impl.h +++ b/src/vt/objgroup/proxy/proxy_objgroup_elm.impl.h @@ -140,6 +140,30 @@ ObjT* ProxyElm::get() const { return theObjGroup()->get(proxy); } +template +bool ProxyElm::isReleased(EpochType epoch) const { + if (node_ == theContext()->getNode()) { + return theSched()->isReleasedEpochObjgroup(epoch, proxy_); + } else { + vtAbort("isReleased can only be called locally"); + return false; + } +} + +template +inline void releaseRemoteObjGroup(ProxyElm proxy, EpochType ep) { + proxy.release(ep); +} + +template +void ProxyElm::release(EpochType epoch) const { + if (node_ == theContext()->getNode()) { + theSched()->releaseEpochObjgroup(epoch, proxy_); + } else { + theMsg()->send>(vt::Node(node_), *this, epoch); + } +} + inline ProxyElm::ProxyElm(NodeType in_node) : node_{in_node} {} template * f, typename... Args> diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index 9ea55ecf3b..05f5f722e1 100644 --- a/src/vt/scheduler/scheduler.cc +++ b/src/vt/scheduler/scheduler.cc @@ -412,6 +412,15 @@ void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) { } } +bool Scheduler::isReleasedEpochObjgroup( + EpochType ep, ObjGroupProxyType proxy +) const { + if (auto it = released_objgroups_.find(ep); it != released_objgroups_.end()) { + return it->second.find(proxy) != it->second.end(); + } + return false; +} + #if vt_check_enabled(fcontext) ThreadManager* Scheduler::getThreadManager() { return thread_manager_.get(); diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 9b7ec6a670..19e482c5ce 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -384,6 +384,16 @@ struct Scheduler : runtime::component::Component { */ void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy); + /** + * \brief Check if a epoch is released for an objgroup + * + * \param[in] ep the epoch to check + * \param[in] proxy the objgroup proxy + * + * \return whether it's released + */ + bool isReleasedEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) const; + template void serialize(SerializerT& s) { s | work_queue_ From bfdc3f47df0b1169c1b8be58122443b78e128e53 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 17 Oct 2023 17:30:26 -0700 Subject: [PATCH 10/20] #410: collection: add dependent epochs to collections, system messages to keep progress --- .../group/collective/group_info_collective.cc | 12 +++ src/vt/group/rooted/group_info_rooted.cc | 2 + src/vt/messaging/active.impl.h | 4 + src/vt/messaging/envelope/envelope_base.h | 4 + src/vt/messaging/envelope/envelope_set.h | 10 ++ src/vt/messaging/envelope/envelope_set.impl.h | 5 + .../messaging/envelope/envelope_setup.impl.h | 1 + src/vt/scheduler/scheduler.cc | 18 ++++ src/vt/scheduler/scheduler.h | 14 +++ src/vt/scheduler/scheduler.impl.h | 39 +++++--- src/vt/vrt/collection/manager.h | 45 +++++++++ src/vt/vrt/collection/manager.impl.h | 53 +++++++++++ .../proxy_traits/proxy_col_traits.h | 6 +- .../proxy_traits/proxy_elm_traits.h | 6 +- src/vt/vrt/collection/release/releasable.h | 92 +++++++++++++++++++ .../vrt/collection/release/releasable.impl.h | 87 ++++++++++++++++++ src/vt/vrt/collection/types/untyped.h | 35 +++++++ 17 files changed, 417 insertions(+), 16 deletions(-) create mode 100644 src/vt/vrt/collection/release/releasable.h create mode 100644 src/vt/vrt/collection/release/releasable.impl.h diff --git a/src/vt/group/collective/group_info_collective.cc b/src/vt/group/collective/group_info_collective.cc index 7c7075eb67..f0ba9400cd 100644 --- a/src/vt/group/collective/group_info_collective.cc +++ b/src/vt/group/collective/group_info_collective.cc @@ -227,6 +227,7 @@ void InfoColl::setupCollective() { auto msg = makeMessage( group_, up_tree_cont_, in_group, size, child ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(parent, msg); } } @@ -336,6 +337,7 @@ void InfoColl::upTree() { auto msg = makeMessage( group,new_root_cont_,true,subtree_zero,root_node,0,extra ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(root_node, msg); for (std::size_t i = 1; i < msg_list.size(); i++) { @@ -377,6 +379,7 @@ void InfoColl::upTree() { auto cmsg = makeMessage( group,op,is_in_group,total_subtree,child,level ); + envelopeSetSystemMsg(cmsg->env, true); theMsg()->sendMsg(p, cmsg); for (auto&& msg : msg_in_group) { @@ -406,6 +409,7 @@ void InfoColl::upTree() { auto msg = makeMessage( group,op,is_in_group,static_cast(subtree_),child,0,extra ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(p, msg); /* * Forward all the children messages up the tree (up to 2 of them) @@ -438,9 +442,11 @@ void InfoColl::upTree() { auto msg = makeMessage( group,op,is_in_group,total_subtree,child,0,extra ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(p, msg); // new MsgPtr to avoid thief of original in collection auto msg_out = promoteMsg(msg_in_group[0].get()); + envelopeSetSystemMsg(msg_out->env, true); theMsg()->sendMsg(p, msg_out); } else { vtAssertExpr(msg_in_group.size() > 2); @@ -478,6 +484,7 @@ void InfoColl::upTree() { auto msg = makeMessage( group,op,is_in_group,total_subtree,child,0,extra ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(p, msg); vt_debug_print( @@ -639,12 +646,14 @@ void InfoColl::downTree(GroupCollectiveMsg* msg) { auto const& num = collective_->span_children_.size(); auto const& child = collective_->span_children_[msg->getChild() % num]; auto nmsg = makeMessage(*msg); + envelopeSetSystemMsg(nmsg->env, true); theMsg()->sendMsg(child, nmsg); ++send_down_; } auto const& group_ = getGroupID(); auto nmsg = makeMessage(group_,down_tree_fin_cont_); + envelopeSetSystemMsg(nmsg->env, true); theMsg()->sendMsg(from, nmsg); } @@ -686,6 +695,7 @@ void InfoColl::sendDownNewTree() { group_, c ); auto msg = makeMessage(group_,new_tree_cont_); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(c, msg); } } @@ -727,6 +737,7 @@ void InfoColl::finalize() { auto msg = makeMessage( group_,finalize_cont_,known_root_node_,is_default_group_ ); + envelopeSetSystemMsg(msg->env, true); theMsg()->sendMsg(c, msg); } @@ -744,6 +755,7 @@ void InfoColl::finalize() { auto stamp = makeStamp(group_); auto msg = makeMessage(group_); + envelopeSetSystemMsg(msg->env, true); using OpType = collective::PlusOp; auto r = theGroup()->reducer(); diff --git a/src/vt/group/rooted/group_info_rooted.cc b/src/vt/group/rooted/group_info_rooted.cc index 757f85d97a..1f1a21594e 100644 --- a/src/vt/group/rooted/group_info_rooted.cc +++ b/src/vt/group/rooted/group_info_rooted.cc @@ -119,6 +119,7 @@ void InfoRooted::setupRooted() { auto msg = makeMessage( low_node, listsize, group_, op, listsize, this_node, &lst ); + envelopeSetSystemMsg(msg->env, true); is_forward_ = true; forward_node_ = low_node; if (this_node != low_node) { @@ -142,6 +143,7 @@ void InfoRooted::setupRooted() { low_node, regsize, group_, op, regsize, this_node, static_cast(region_.get()) ); + envelopeSetSystemMsg(msg->env, true); is_forward_ = true; forward_node_ = low_node; if (this_node != low_node) { diff --git a/src/vt/messaging/active.impl.h b/src/vt/messaging/active.impl.h index 610b8ece67..053c14cd0d 100644 --- a/src/vt/messaging/active.impl.h +++ b/src/vt/messaging/active.impl.h @@ -68,6 +68,7 @@ void ActiveMessenger::markAsTermMessage(MsgPtrT const msg) { #if vt_check_enabled(trace_enabled) envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceTerm()); #endif + envelopeSetSystemMsg(msg->env, true); } template @@ -75,6 +76,7 @@ void ActiveMessenger::markAsLocationMessage(MsgPtrT const msg) { #if vt_check_enabled(trace_enabled) envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceLocation()); #endif + envelopeSetSystemMsg(msg->env, true); } template @@ -82,6 +84,7 @@ void ActiveMessenger::markAsSerialMsgMessage(MsgPtrT const msg) { #if vt_check_enabled(trace_enabled) envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceSerialMsg()); #endif + envelopeSetSystemMsg(msg->env, true); } template @@ -89,6 +92,7 @@ void ActiveMessenger::markAsCollectionMessage(MsgPtrT const msg) { #if vt_check_enabled(trace_enabled) envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceCollection()); #endif + envelopeSetSystemMsg(msg->env, true); } template diff --git a/src/vt/messaging/envelope/envelope_base.h b/src/vt/messaging/envelope/envelope_base.h index 1f0660a2b0..0177919fc6 100644 --- a/src/vt/messaging/envelope/envelope_base.h +++ b/src/vt/messaging/envelope/envelope_base.h @@ -105,6 +105,10 @@ struct ActiveEnvelope { /// Used to denote that the message's bare handlers shouldn't record /// communication LB data due to redundancy bool comm_lb_data_recorded_above_bare_handler : 1; + + /// Used to denote that this is an internal system message that does not do + /// application work + bool system_msg : 1; }; }} /* end namespace vt::messaging */ diff --git a/src/vt/messaging/envelope/envelope_set.h b/src/vt/messaging/envelope/envelope_set.h index 440275be59..23d7e04409 100644 --- a/src/vt/messaging/envelope/envelope_set.h +++ b/src/vt/messaging/envelope/envelope_set.h @@ -250,6 +250,16 @@ inline void envelopeSetIsLocked(Env& env, bool is_locked); template inline void envelopeUnlockForForwarding(Env& env); +/** + * \brief Set whether this is an internal system message + * + * \param[in,out] env the envelope + * \param[in] is_system_msg value indicating message is a system message + */ +template +inline void envelopeSetSystemMsg(Env& env, bool is_system_msg); + + } /* end namespace vt */ #include "vt/messaging/envelope/envelope_set.impl.h" diff --git a/src/vt/messaging/envelope/envelope_set.impl.h b/src/vt/messaging/envelope/envelope_set.impl.h index 7a5c59e996..9c9c1b63ad 100644 --- a/src/vt/messaging/envelope/envelope_set.impl.h +++ b/src/vt/messaging/envelope/envelope_set.impl.h @@ -159,6 +159,11 @@ inline void envelopeSetCommLBDataRecordedAboveBareHandler( comm_lb_data_recorded_above_bare_handler; } +template +inline void envelopeSetSystemMsg(Env& env, bool is_system_msg) { + reinterpret_cast(&env)->system_msg = is_system_msg; +} + template inline void envelopeSetIsLocked(Env& env, bool is_locked) { reinterpret_cast(&env)->is_locked = is_locked; diff --git a/src/vt/messaging/envelope/envelope_setup.impl.h b/src/vt/messaging/envelope/envelope_setup.impl.h index ec7ed83d80..ca407934e7 100644 --- a/src/vt/messaging/envelope/envelope_setup.impl.h +++ b/src/vt/messaging/envelope/envelope_setup.impl.h @@ -76,6 +76,7 @@ inline void envelopeInit(Env& env) { #endif envelopeSetHasBeenSerialized(env, false); envelopeSetCommLBDataRecordedAboveBareHandler(env, false); + envelopeSetSystemMsg(env, false); } inline void envelopeInitEmpty(Envelope& env) { diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index 05f5f722e1..71a9f05dcd 100644 --- a/src/vt/scheduler/scheduler.cc +++ b/src/vt/scheduler/scheduler.cc @@ -412,6 +412,24 @@ void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) { } } +void Scheduler::releaseEpochCollection(EpochType ep, UntypedCollection* untyped) { + if ( + auto iter = pending_collection_work_.find(ep); + iter != pending_collection_work_.end()) + { + if (auto iter2 = iter->second.find(untyped); iter2 != iter->second.end()) { + auto& container = iter2->second; + while (container.size() > 0) { + work_queue_.emplace(container.pop()); + } + iter->second.erase(iter2); + } + if (iter->second.size() == 0) { + pending_collection_work_.erase(iter); + } + } +} + bool Scheduler::isReleasedEpochObjgroup( EpochType ep, ObjGroupProxyType proxy ) const { diff --git a/src/vt/scheduler/scheduler.h b/src/vt/scheduler/scheduler.h index 19e482c5ce..7fb423fe0d 100644 --- a/src/vt/scheduler/scheduler.h +++ b/src/vt/scheduler/scheduler.h @@ -53,6 +53,7 @@ #include "vt/timing/timing.h" #include "vt/runtime/component/component_pack.h" #include "vt/messaging/async_op_wrapper.fwd.h" +#include "vt/vrt/collection/types/untyped.h" #include #include @@ -115,6 +116,7 @@ struct Scheduler : runtime::component::Component { using TriggerContainerType = std::list; using EventTriggerContType = std::vector; using RunnablePtrType = runnable::RunnableNew*; + using UntypedCollection = vrt::collection::UntypedCollection; struct SchedulerLoopGuard { SchedulerLoopGuard(Scheduler* scheduler); @@ -384,6 +386,13 @@ struct Scheduler : runtime::component::Component { */ void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy); + /** + * \brief Release an epoch to run + * + * \param[in] ep the epoch to release + */ + void releaseEpochCollection(EpochType ep, UntypedCollection* untyped); + /** * \brief Check if a epoch is released for an objgroup * @@ -426,6 +435,7 @@ struct Scheduler : runtime::component::Component { | idleTimeMinusTerm | pending_work_ | pending_objgroup_work_ + | pending_collection_work_ | released_objgroups_; } @@ -472,6 +482,10 @@ struct Scheduler : runtime::component::Component { EpochType, std::unordered_map> > pending_objgroup_work_; + std::unordered_map< + EpochType, std::unordered_map> + > pending_collection_work_; + /// Released epochs for an objgroup std::unordered_map> released_objgroups_; diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index 3a0e7ec561..0be989272b 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -120,22 +120,33 @@ void Scheduler::enqueue(MsgT* msg, RunT r) { template void Scheduler::enqueueOrPostpone(UnitT unit) { auto r = unit.getRunnable(); - auto ep = r->getEpoch(); - - bool const is_dep = epoch::EpochManip::isDep(ep); - - if (is_dep) { - auto obj = r->getObj(); - if (ep != no_epoch and ep != term::any_epoch_sentinel) { - if (obj and r->isObjGroup()) { - auto proxy = objgroup::getProxyFromPtr(obj); - if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) { - pending_objgroup_work_[ep][proxy].push(unit); + auto m = r->getMsg(); + + // If it's a system message we can schedule it right away + if (m and not m->env.system_msg) { + auto ep = r->getEpoch(); + bool const is_dep = epoch::EpochManip::isDep(ep); + if (is_dep) { + auto obj = r->getObj(); + if (ep != no_epoch and ep != term::any_epoch_sentinel) { + if (obj) { + if (r->isObjGroup()) { + auto proxy = objgroup::getProxyFromPtr(obj); + if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) { + pending_objgroup_work_[ep][proxy].push(unit); + return; + } + } else { + auto untyped = static_cast(obj); + if (not untyped->isReleasedEpoch(ep)) { + pending_collection_work_[ep][untyped].push(unit); + return; + } + } + } else if (not theTerm()->epochReleased(ep)) { + pending_work_[ep].push(unit); return; } - } else if (not theTerm()->epochReleased(ep)) { - pending_work_[ep].push(unit); - return; } } } diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h index 9ef6540206..3a4dd0a181 100644 --- a/src/vt/vrt/collection/manager.h +++ b/src/vt/vrt/collection/manager.h @@ -1467,6 +1467,50 @@ struct CollectionManager template void addCleanupFn(VirtualProxyType proxy); +public: + /** + * \brief Release a dependent epoch for a given proxy index + * + * \param[in] proxy the element proxy + * \param[in] epoch epoch to release + */ + template + void releaseEpoch( + VrtElmProxy proxy, EpochType epoch + ); + + /** + * \brief Release a dependent epoch for the whole collection + * + * \param[in] proxy the collection proxy + * \param[in] epoch the epoch to release + */ + template + void releaseEpochCollection(VirtualProxyType proxy, EpochType epoch); + + /** + * \brief Check if a dependent epoch for a given proxy index is released + * + * \param[in] proxy the element proxy + * \param[in] epoch epoch to check + * + * \return whether it is released on this element + */ + template + bool isReleasedEpoch( + VrtElmProxy proxy, EpochType epoch + ); + +private: + /** + * \brief Handler to release a dependent epoch across the whole collection + * + * \param[in] proxy the collection proxy + * \param[in] epoch epoch to release + */ + template + static void releaseWholeCollection(VirtualProxyType proxy, EpochType epoch); + private: template friend struct CollectionElmAttorney; @@ -1782,6 +1826,7 @@ struct CollectionManager #include "vt/vrt/collection/destroy/manager_destroy_attorney.impl.h" #include "vt/vrt/collection/broadcast/broadcastable.impl.h" #include "vt/vrt/collection/rdmaable/rdmaable.impl.h" +#include "vt/vrt/collection/release/releasable.impl.h" #include "vt/vrt/collection/balance/col_lb_data.impl.h" #include "vt/vrt/collection/types/indexable.impl.h" #include "vt/vrt/collection/dispatch/dispatch.impl.h" diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index 589787e94f..459b5e8c75 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -2318,6 +2318,59 @@ messaging::PendingSend CollectionManager::schedule( }); } +namespace { + +template +void releaseRemoteCollection( + ColT* col, VrtElmProxy proxy, EpochType ep +) { + proxy.release(ep); +} + +} /* end anon namespace */ + +template +void CollectionManager::releaseEpoch( + VrtElmProxy proxy, EpochType epoch +) { + if (auto ptr = proxy.tryGetLocalPtr(); ptr != nullptr) { + ptr->addReleasedEpoch(epoch); + theSched()->releaseEpochCollection(epoch, ptr); + } else { + proxy.template send>(proxy, epoch); + } +} + +template +void CollectionManager::releaseEpochCollection( + VirtualProxyType proxy, EpochType epoch +) { + theMsg()->broadcast>(proxy, epoch); +} + +template +bool CollectionManager::isReleasedEpoch( + VrtElmProxy proxy, EpochType epoch +) { + if (auto ptr = proxy.tryGetLocalPtr(); ptr != nullptr) { + return ptr->isReleasedEpoch(epoch); + } else { + vtAbort("Can not call isReleased on a non-local proxy"); + return false; + } +} + +template +/*static*/ void CollectionManager::releaseWholeCollection( + VirtualProxyType proxy, EpochType epoch +) { + CollectionProxyWrapType typed_proxy{proxy}; + auto const& idxs = theCollection()->getLocalIndices(typed_proxy); + for (auto&& idx : idxs) { + theCollection()->releaseEpoch(typed_proxy[idx], epoch); + } +} + }}} /* end namespace vt::vrt::collection */ #include "vt/vrt/collection/collection_builder.impl.h" diff --git a/src/vt/vrt/collection/proxy_traits/proxy_col_traits.h b/src/vt/vrt/collection/proxy_traits/proxy_col_traits.h index d93b75fcd6..c098014e0f 100644 --- a/src/vt/vrt/collection/proxy_traits/proxy_col_traits.h +++ b/src/vt/vrt/collection/proxy_traits/proxy_col_traits.h @@ -51,13 +51,17 @@ #include "vt/vrt/collection/insert/modifiable.h" #include "vt/vrt/collection/rdmaable/rdmaable.h" #include "vt/vrt/proxy/base_collection_proxy.h" +#include "vt/vrt/collection/release/releasable.h" namespace vt { namespace vrt { namespace collection { namespace col_proxy { template -using Chain5 = RDMAable>; +using Chain6 = ReleasableCol>; + +template +using Chain5 = RDMAable>; template using Chain4 = Modifiable>; diff --git a/src/vt/vrt/collection/proxy_traits/proxy_elm_traits.h b/src/vt/vrt/collection/proxy_traits/proxy_elm_traits.h index e5632b477f..388b981fde 100644 --- a/src/vt/vrt/collection/proxy_traits/proxy_elm_traits.h +++ b/src/vt/vrt/collection/proxy_traits/proxy_elm_traits.h @@ -51,13 +51,17 @@ #include "vt/vrt/collection/invoke/invokable.h" #include "vt/vrt/collection/gettable/gettable.h" #include "vt/vrt/collection/insert/insertable.h" +#include "vt/vrt/collection/release/releasable.h" namespace vt { namespace vrt { namespace collection { namespace elm_proxy { template -using Chain4 = Invokable>; +using Chain5 = Releaseable>; + +template +using Chain4 = Invokable>; template using Chain3 = Gettable>; diff --git a/src/vt/vrt/collection/release/releasable.h b/src/vt/vrt/collection/release/releasable.h new file mode 100644 index 0000000000..df6c06a70e --- /dev/null +++ b/src/vt/vrt/collection/release/releasable.h @@ -0,0 +1,92 @@ +/* +//@HEADER +// ***************************************************************************** +// +// releasable.h +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#if !defined INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_H +#define INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_H + +#include "vt/vrt/proxy/base_collection_proxy.h" + +namespace vt::vrt::collection { + +template +struct Releaseable : BaseProxyT { + Releaseable() = default; + Releaseable( + typename BaseProxyT::ProxyType const& in_proxy, + typename BaseProxyT::ElementProxyType const& in_elm + ); + + /** + * \brief Check if dependent epoch is released + * + * \param[in] epoch the epoch in question + * + * \return whether it is released + */ + bool isReleased(EpochType epoch) const; + + /** + * \brief Release a dependent epoch for this element + * + * \param[in] epoch the epoch to release + */ + void release(EpochType epoch) const; +}; + +template +struct ReleasableCol : BaseProxyT { + ReleasableCol() = default; + + explicit ReleasableCol(VirtualProxyType const in_proxy); + + /** + * \brief Release a dependent epoch for the whole collection + * + * \param[in] epoch the epoch to release + */ + void release(EpochType epoch) const; +}; + +} /* end namespace vt::vrt::collection */ + +#endif /*INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_H*/ diff --git a/src/vt/vrt/collection/release/releasable.impl.h b/src/vt/vrt/collection/release/releasable.impl.h new file mode 100644 index 0000000000..02dfa0bccf --- /dev/null +++ b/src/vt/vrt/collection/release/releasable.impl.h @@ -0,0 +1,87 @@ +/* +//@HEADER +// ***************************************************************************** +// +// releasable.impl.h +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#if !defined INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_IMPL_H +#define INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_IMPL_H + +#include "vt/vrt/collection/release/releasable.h" + +namespace vt::vrt::collection { + +template +Releaseable::Releaseable( + typename BaseProxyT::ProxyType const& in_proxy, + typename BaseProxyT::ElementProxyType const& in_elm +) : BaseProxyT(in_proxy, in_elm) +{ } + +template +bool Releaseable::isReleased(EpochType epoch) const { + auto const& proxy = VrtElmProxy( + this->getCollectionProxy(), this->getElementProxy() + ); + return theCollection()->isReleasedEpoch(proxy, epoch); +} + +template +void Releaseable::release(EpochType epoch) const { + auto const& proxy = VrtElmProxy( + this->getCollectionProxy(), this->getElementProxy() + ); + theCollection()->releaseEpoch(proxy, epoch); +} + +template +ReleasableCol::ReleasableCol( + VirtualProxyType const in_proxy +) : BaseProxyT(in_proxy) +{ } + +template +void ReleasableCol::release(EpochType epoch) const { + theCollection()->releaseEpochCollection(this->getProxy(), epoch); +} + +} /* end namespace vt::vrt::collection */ + +#endif /*INCLUDED_VT_VRT_COLLECTION_RELEASE_RELEASABLE_IMPL_H*/ diff --git a/src/vt/vrt/collection/types/untyped.h b/src/vt/vrt/collection/types/untyped.h index f83029f4c0..87d4e68fcb 100644 --- a/src/vt/vrt/collection/types/untyped.h +++ b/src/vt/vrt/collection/types/untyped.h @@ -61,7 +61,42 @@ struct UntypedCollection : VrtBase, HasMigrate { template void serialize(Serializer& s) { VrtBase::serialize(s); + s | released_epochs_; } + +public: + /** + * \brief Add a released epoch to the set + * + * \param[in] epoch the epoch to add + */ + void addReleasedEpoch(EpochType epoch) { released_epochs_.insert(epoch); } + + /** + * \brief Remove a released epoch from the set + * + * \param[in] epoch the epoch to remove + */ + void removeReleasedEpoch(EpochType epoch) { + if (auto i = released_epochs_.find(epoch); i != released_epochs_.end()) { + released_epochs_.erase(i); + } + } + + /** + * \brief Check if an epoch has been released + * + * \param[in] epoch the epoch to check + * + * \return whether it is released + */ + bool isReleasedEpoch(EpochType epoch) const { + return released_epochs_.find(epoch) != released_epochs_.end(); + } + +private: + /// Released epochs for this collection element + std::unordered_set released_epochs_; }; }}} /* end namespace vt::vrt::collection */ From 3c1ac7f980a05ec39512c7e01d821d023020ea86 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Fri, 16 Aug 2019 11:04:23 -0700 Subject: [PATCH 11/20] #410: test: add new test for dep epochs and collections --- .../test_term_dep_epoch_collection.cc | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 tests/unit/termination/test_term_dep_epoch_collection.cc diff --git a/tests/unit/termination/test_term_dep_epoch_collection.cc b/tests/unit/termination/test_term_dep_epoch_collection.cc new file mode 100644 index 0000000000..0d77ee2f31 --- /dev/null +++ b/tests/unit/termination/test_term_dep_epoch_collection.cc @@ -0,0 +1,153 @@ +/* +//@HEADER +// ************************************************************************ +// +// test_term_dep_epoch_collection.cc +// vt (Virtual Transport) +// Copyright (C) 2018 NTESS, LLC +// +// Under the terms of Contract DE-NA-0003525 with NTESS, LLC, +// the U.S. Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// 1. Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the Corporation nor the names of the +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ************************************************************************ +//@HEADER +*/ + +#include + +#include "test_parallel_harness.h" +#include "data_message.h" + +#include "vt/transport.h" +#include "vt/messaging/collection_chain_set.h" + +namespace vt { namespace tests { namespace unit { + +using namespace vt; +using namespace vt::tests::unit; + +struct TestTermDepEpochCollection : TestParallelHarness { }; + +struct TestDep : vt::Collection { + struct TestMsg : vt::CollectionMessage { + int k = 0; + int r = 0; + }; + + void depHandler(TestMsg* msg) { + auto const& node = theContext()->getNode(); + auto idx = this->getIndex(); + num_dep++; + fmt::print("{}: {}: depHandler: num_dep={}\n", node, idx, num_dep); + EXPECT_EQ(num_non_dep, 1); + } + + void nonDepHandler(TestMsg* msg) { + auto const& node = theContext()->getNode(); + auto idx = this->getIndex(); + num_non_dep++; + fmt::print("{}: {}: nonDepHandler: num_non_dep={}\n", node, idx, num_non_dep); + EXPECT_EQ(num_dep, 0); + } + + void check(TestMsg* msg) { + auto const num_nodes = vt::theContext()->getNumNodes(); + EXPECT_EQ(num_non_dep, 1); + EXPECT_EQ(num_dep, num_nodes*msg->k); + } + + int num_dep = 0; + int num_non_dep = 0; +}; + +using TestMsg = typename TestDep::TestMsg; + +TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { + auto const& this_node = theContext()->getNode(); + auto const& num_nodes = theContext()->getNumNodes(); + bool const bcast = true; + int const k = 10; + int const r = 48; + + auto range = vt::Index1D(r); + auto proxy = vt::theCollection()->constructCollective(range); + vt::theCollective()->barrier(); + + auto epoch = vt::theTerm()->makeEpochCollectiveDep(); + vt::theMsg()->pushEpoch(epoch); + if (bcast) { + for (int i = 0; i < k; i++) { + auto msg = vt::makeSharedMessage(); + //vt::envelopeSetEpoch(msg->env, epoch); + proxy.template broadcast(msg); + } + } else { + } + vt::theMsg()->popEpoch(epoch); + vt::theTerm()->finishedEpoch(epoch); + + auto chain = std::make_unique>(); + + for (int i = 0; i < r; i++) { + if (i % num_nodes == this_node) { + chain->addIndex(vt::Index1D(i)); + } + } + + chain->nextStep([=](vt::Index1D idx) { + auto msg = vt::makeMessage(); + return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ + auto msg2 = vt::makeSharedMessage(); + proxy[idx].send(msg2); + }); + }); + + chain->nextStep([=](vt::Index1D idx) { + auto msg = vt::makeMessage(); + return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ + fmt::print("release: {}\n", idx); + proxy[idx].release(epoch); + }); + }); + + vt::theTerm()->addAction([=]{ + auto const node = vt::theContext()->getNode(); + if (node == 0) { + auto msg = vt::makeSharedMessage(); + msg->k = k; + msg->r = r; + proxy.template broadcast(msg); + } + }); +} + +}}} // end namespace vt::tests::unit From cdf69abeab2acaf3ab7a4e5cd696a406b4f39e9b Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 18 Oct 2023 12:26:04 -0700 Subject: [PATCH 12/20] #410: collection: add missing header include --- src/vt/vrt/collection/types/untyped.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/vt/vrt/collection/types/untyped.h b/src/vt/vrt/collection/types/untyped.h index 87d4e68fcb..c17ae00b46 100644 --- a/src/vt/vrt/collection/types/untyped.h +++ b/src/vt/vrt/collection/types/untyped.h @@ -48,6 +48,8 @@ #include "vt/vrt/base/base.h" #include "vt/vrt/collection/types/has_migrate.h" +#include + namespace vt { namespace vrt { namespace collection { /* From 9764708b289853cdbcdc869e9decee6b8d30b685 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 18 Oct 2023 12:26:29 -0700 Subject: [PATCH 13/20] #410: tests: cleanup tests, fix name collison --- .../termination/test_term_dep_epoch_active.cc | 20 +++++------ .../test_term_dep_epoch_collection.cc | 35 +++++++------------ 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/tests/unit/termination/test_term_dep_epoch_active.cc b/tests/unit/termination/test_term_dep_epoch_active.cc index ce1c2d14ae..f291006e38 100644 --- a/tests/unit/termination/test_term_dep_epoch_active.cc +++ b/tests/unit/termination/test_term_dep_epoch_active.cc @@ -57,7 +57,7 @@ using namespace vt::tests::unit; struct TestTermDepEpochActive : TestParallelHarness { }; -struct TestDep { +struct TestDepActive { static void depHandler() { num_dep++; vt_print(gen, "depHandler: num_dep={}, epoch={:x}\n", num_dep, theTerm()->getEpoch()); @@ -75,22 +75,22 @@ struct TestDep { static int num_non_dep; }; -/*static*/ int TestDep::num_dep = 0; -/*static*/ int TestDep::num_non_dep = 0; +/*static*/ int TestDepActive::num_dep = 0; +/*static*/ int TestDepActive::num_non_dep = 0; TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { auto const& this_node = theContext()->getNode(); auto const& num_nodes = theContext()->getNumNodes(); int const k = 10; - TestDep::num_dep = 0; - TestDep::num_non_dep = 0; + TestDepActive::num_dep = 0; + TestDepActive::num_non_dep = 0; vt::theCollective()->barrier(); auto epoch = vt::theTerm()->makeEpochCollective(term::ParentEpochCapture{}, true); vt::theMsg()->pushEpoch(epoch); for (int i = 0; i < k; i++) { - vt::theMsg()->broadcast(); + vt::theMsg()->broadcast(); } vt::theMsg()->popEpoch(epoch); vt::theTerm()->finishedEpoch(epoch); @@ -100,20 +100,20 @@ TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { chain->nextStep([=](NodeType node) { NodeType const next = this_node + 1 < num_nodes ? this_node + 1 : 0; - return vt::theMsg()->send(Node{next}); + return vt::theMsg()->send(Node{next}); }); chain->nextStep([=](NodeType node) { auto msg = vt::makeMessage(); return vt::messaging::PendingSend(msg, [=](MsgSharedPtr&){ - EXPECT_EQ(TestDep::num_dep, 0); + EXPECT_EQ(TestDepActive::num_dep, 0); theTerm()->releaseEpoch(epoch); }); }); vt::theTerm()->addAction([=]{ - EXPECT_EQ(TestDep::num_non_dep, 1); - EXPECT_EQ(TestDep::num_dep, num_nodes * k); + EXPECT_EQ(TestDepActive::num_non_dep, 1); + EXPECT_EQ(TestDepActive::num_dep, num_nodes * k); }); } diff --git a/tests/unit/termination/test_term_dep_epoch_collection.cc b/tests/unit/termination/test_term_dep_epoch_collection.cc index 0d77ee2f31..269cab7502 100644 --- a/tests/unit/termination/test_term_dep_epoch_collection.cc +++ b/tests/unit/termination/test_term_dep_epoch_collection.cc @@ -57,13 +57,8 @@ using namespace vt::tests::unit; struct TestTermDepEpochCollection : TestParallelHarness { }; -struct TestDep : vt::Collection { - struct TestMsg : vt::CollectionMessage { - int k = 0; - int r = 0; - }; - - void depHandler(TestMsg* msg) { +struct TestDepColl : vt::Collection { + void depHandler() { auto const& node = theContext()->getNode(); auto idx = this->getIndex(); num_dep++; @@ -71,7 +66,7 @@ struct TestDep : vt::Collection { EXPECT_EQ(num_non_dep, 1); } - void nonDepHandler(TestMsg* msg) { + void nonDepHandler() { auto const& node = theContext()->getNode(); auto idx = this->getIndex(); num_non_dep++; @@ -79,17 +74,17 @@ struct TestDep : vt::Collection { EXPECT_EQ(num_dep, 0); } - void check(TestMsg* msg) { + void check(int k, int r) { auto const num_nodes = vt::theContext()->getNumNodes(); EXPECT_EQ(num_non_dep, 1); - EXPECT_EQ(num_dep, num_nodes*msg->k); + EXPECT_EQ(num_dep, num_nodes * msg->k); } int num_dep = 0; int num_non_dep = 0; }; -using TestMsg = typename TestDep::TestMsg; +using TestMsg = typename TestDepColl::TestMsg; TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { auto const& this_node = theContext()->getNode(); @@ -99,16 +94,14 @@ TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { int const r = 48; auto range = vt::Index1D(r); - auto proxy = vt::theCollection()->constructCollective(range); + auto proxy = vt::theCollection()->constructCollective(range); vt::theCollective()->barrier(); auto epoch = vt::theTerm()->makeEpochCollectiveDep(); vt::theMsg()->pushEpoch(epoch); if (bcast) { for (int i = 0; i < k; i++) { - auto msg = vt::makeSharedMessage(); - //vt::envelopeSetEpoch(msg->env, epoch); - proxy.template broadcast(msg); + proxy.template broadcast<&TestDepColl::depHandler>(); } } else { } @@ -125,15 +118,14 @@ TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { chain->nextStep([=](vt::Index1D idx) { auto msg = vt::makeMessage(); - return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ - auto msg2 = vt::makeSharedMessage(); - proxy[idx].send(msg2); + return vt::messaging::PendingSend(msg, [=](MsgSharedPtr){ + proxy[idx].send<&TestDepColl::nonDepHandler>(); }); }); chain->nextStep([=](vt::Index1D idx) { auto msg = vt::makeMessage(); - return vt::messaging::PendingSend(msg, [=](MsgVirtualPtr){ + return vt::messaging::PendingSend(msg, [=](MsgSharedPtr){ fmt::print("release: {}\n", idx); proxy[idx].release(epoch); }); @@ -142,10 +134,7 @@ TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { vt::theTerm()->addAction([=]{ auto const node = vt::theContext()->getNode(); if (node == 0) { - auto msg = vt::makeSharedMessage(); - msg->k = k; - msg->r = r; - proxy.template broadcast(msg); + proxy.template broadcast<&TestDepColl::check>(k, r); } }); } From a974aba6d62583b5ce79894a8390455d16c16716 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 18 Oct 2023 13:29:29 -0700 Subject: [PATCH 14/20] #410: tests: fix license --- .../termination/test_term_dep_epoch_active.cc | 55 +++++++++---------- .../test_term_dep_epoch_collection.cc | 55 +++++++++---------- 2 files changed, 54 insertions(+), 56 deletions(-) diff --git a/tests/unit/termination/test_term_dep_epoch_active.cc b/tests/unit/termination/test_term_dep_epoch_active.cc index f291006e38..372a82efb7 100644 --- a/tests/unit/termination/test_term_dep_epoch_active.cc +++ b/tests/unit/termination/test_term_dep_epoch_active.cc @@ -1,44 +1,43 @@ /* //@HEADER -// ************************************************************************ +// ***************************************************************************** // -// test_term_dep_epoch_active.cc -// vt (Virtual Transport) -// Copyright (C) 2018 NTESS, LLC +// test_term_dep_epoch_active.cc +// DARMA/vt => Virtual Transport // -// Under the terms of Contract DE-NA-0003525 with NTESS, LLC, -// the U.S. Government retains certain rights in this software. +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. // // Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: +// modification, are permitted provided that the following conditions are met: // -// 1. Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. // -// 2. Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. // -// 3. Neither the name of the Corporation nor the names of the -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE -// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. // // Questions? Contact darma@sandia.gov // -// ************************************************************************ +// ***************************************************************************** //@HEADER */ diff --git a/tests/unit/termination/test_term_dep_epoch_collection.cc b/tests/unit/termination/test_term_dep_epoch_collection.cc index 269cab7502..0db1f7a12c 100644 --- a/tests/unit/termination/test_term_dep_epoch_collection.cc +++ b/tests/unit/termination/test_term_dep_epoch_collection.cc @@ -1,44 +1,43 @@ /* //@HEADER -// ************************************************************************ +// ***************************************************************************** // -// test_term_dep_epoch_collection.cc -// vt (Virtual Transport) -// Copyright (C) 2018 NTESS, LLC +// test_term_dep_epoch_collection.cc +// DARMA/vt => Virtual Transport // -// Under the terms of Contract DE-NA-0003525 with NTESS, LLC, -// the U.S. Government retains certain rights in this software. +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. // // Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: +// modification, are permitted provided that the following conditions are met: // -// 1. Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. // -// 2. Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. // -// 3. Neither the name of the Corporation nor the names of the -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE -// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. // // Questions? Contact darma@sandia.gov // -// ************************************************************************ +// ***************************************************************************** //@HEADER */ From 3e76af52c3c20f22fdb38f2836e5c722e64d6c8b Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 18 Oct 2023 13:32:08 -0700 Subject: [PATCH 15/20] #410: tests: fix some small compilation errors --- .../unit/termination/test_term_dep_epoch_collection.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/unit/termination/test_term_dep_epoch_collection.cc b/tests/unit/termination/test_term_dep_epoch_collection.cc index 0db1f7a12c..8d67e67760 100644 --- a/tests/unit/termination/test_term_dep_epoch_collection.cc +++ b/tests/unit/termination/test_term_dep_epoch_collection.cc @@ -76,15 +76,13 @@ struct TestDepColl : vt::Collection { void check(int k, int r) { auto const num_nodes = vt::theContext()->getNumNodes(); EXPECT_EQ(num_non_dep, 1); - EXPECT_EQ(num_dep, num_nodes * msg->k); + EXPECT_EQ(num_dep, num_nodes * k); } int num_dep = 0; int num_non_dep = 0; }; -using TestMsg = typename TestDepColl::TestMsg; - TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { auto const& this_node = theContext()->getNode(); auto const& num_nodes = theContext()->getNumNodes(); @@ -96,7 +94,7 @@ TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { auto proxy = vt::theCollection()->constructCollective(range); vt::theCollective()->barrier(); - auto epoch = vt::theTerm()->makeEpochCollectiveDep(); + auto epoch = vt::theTerm()->makeEpochCollective(term::ParentEpochCapture{}, true); vt::theMsg()->pushEpoch(epoch); if (bcast) { for (int i = 0; i < k; i++) { @@ -116,14 +114,14 @@ TEST_F(TestTermDepEpochCollection, test_term_dep_epoch_collection) { } chain->nextStep([=](vt::Index1D idx) { - auto msg = vt::makeMessage(); + auto msg = vt::makeMessage(); return vt::messaging::PendingSend(msg, [=](MsgSharedPtr){ proxy[idx].send<&TestDepColl::nonDepHandler>(); }); }); chain->nextStep([=](vt::Index1D idx) { - auto msg = vt::makeMessage(); + auto msg = vt::makeMessage(); return vt::messaging::PendingSend(msg, [=](MsgSharedPtr){ fmt::print("release: {}\n", idx); proxy[idx].release(epoch); From 3d2b1175edaa13b0add492035ec3cf41140edea6 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Wed, 18 Oct 2023 13:56:17 -0700 Subject: [PATCH 16/20] #410: collection: switch broadcast after system broadcast to user msg --- src/vt/vrt/collection/manager.impl.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index 459b5e8c75..e905ed2d99 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -231,6 +231,8 @@ template template /*static*/ void CollectionManager::collectionBcastHandler(MsgT* msg) { + envelopeSetSystemMsg(msg->env, false); + auto const col_msg = static_cast*>(msg); auto const bcast_proxy = col_msg->getBcastProxy(); auto const& group = envelopeGetGroup(msg->env); From f7462698a6f07b523bee7ff0bf25015d05b0f1a5 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 19 Oct 2023 08:50:24 -0700 Subject: [PATCH 17/20] #410: collection: fix missing system message designation --- src/vt/vrt/collection/manager.impl.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index e905ed2d99..f9258e65af 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -219,6 +219,8 @@ template auto m = promoteMsg(msg); + envelopeSetSystemMsg(m->env, false); + runnable::makeRunnable(m, true, han, from) .withTDEpoch(theMsg()->getEpochContextMsg(msg)) .withCollection(base) From cb2b51959ee9d906ebb93d2b6086487bda2cdf10 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Thu, 19 Oct 2023 14:39:01 -0700 Subject: [PATCH 18/20] #410: tests: rewrite dep epoch test to fix logic error --- .../termination/test_term_dep_epoch_active.cc | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/tests/unit/termination/test_term_dep_epoch_active.cc b/tests/unit/termination/test_term_dep_epoch_active.cc index 372a82efb7..d4a0152045 100644 --- a/tests/unit/termination/test_term_dep_epoch_active.cc +++ b/tests/unit/termination/test_term_dep_epoch_active.cc @@ -57,39 +57,36 @@ using namespace vt::tests::unit; struct TestTermDepEpochActive : TestParallelHarness { }; struct TestDepActive { - static void depHandler() { + void depHandler() { num_dep++; vt_print(gen, "depHandler: num_dep={}, epoch={:x}\n", num_dep, theTerm()->getEpoch()); EXPECT_EQ(num_non_dep, 1); } - static void nonDepHandler() { + void nonDepHandler() { //auto const& node = theContext()->getNode(); num_non_dep++; //fmt::print("{}: nonDepHandler: num_non_dep={}\n", node, num_non_dep); EXPECT_EQ(num_dep, 0); } - static int num_dep; - static int num_non_dep; + int num_dep = 0; + int num_non_dep = 0; }; -/*static*/ int TestDepActive::num_dep = 0; -/*static*/ int TestDepActive::num_non_dep = 0; - TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { auto const& this_node = theContext()->getNode(); auto const& num_nodes = theContext()->getNumNodes(); int const k = 10; - TestDepActive::num_dep = 0; - TestDepActive::num_non_dep = 0; + auto proxy = theObjGroup()->makeCollective("TestDepActive"); + vt::theCollective()->barrier(); auto epoch = vt::theTerm()->makeEpochCollective(term::ParentEpochCapture{}, true); vt::theMsg()->pushEpoch(epoch); for (int i = 0; i < k; i++) { - vt::theMsg()->broadcast(); + proxy.broadcast<&TestDepActive::depHandler>(); } vt::theMsg()->popEpoch(epoch); vt::theTerm()->finishedEpoch(epoch); @@ -98,21 +95,20 @@ TEST_F(TestTermDepEpochActive, test_term_dep_epoch_active) { chain->addIndex(this_node); chain->nextStep([=](NodeType node) { - NodeType const next = this_node + 1 < num_nodes ? this_node + 1 : 0; - return vt::theMsg()->send(Node{next}); + return proxy[this_node].send<&TestDepActive::nonDepHandler>(); }); chain->nextStep([=](NodeType node) { auto msg = vt::makeMessage(); return vt::messaging::PendingSend(msg, [=](MsgSharedPtr&){ - EXPECT_EQ(TestDepActive::num_dep, 0); - theTerm()->releaseEpoch(epoch); + EXPECT_EQ(proxy.get()->num_dep, 0); + proxy[this_node].release(epoch); }); }); vt::theTerm()->addAction([=]{ - EXPECT_EQ(TestDepActive::num_non_dep, 1); - EXPECT_EQ(TestDepActive::num_dep, num_nodes * k); + EXPECT_EQ(proxy.get()->num_non_dep, 1); + EXPECT_EQ(proxy.get()->num_dep, num_nodes * k); }); } From 4b3a9c69723d4e60ead1ba20312d13b87c38efb4 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 31 Oct 2023 12:54:24 -0700 Subject: [PATCH 19/20] #410: termination: remove unneeded code, cleanup scheduler --- src/vt/scheduler/scheduler.cc | 5 ++--- src/vt/scheduler/scheduler.impl.h | 2 +- src/vt/termination/termination.cc | 25 +------------------------ src/vt/termination/termination.h | 22 +++------------------- 4 files changed, 7 insertions(+), 47 deletions(-) diff --git a/src/vt/scheduler/scheduler.cc b/src/vt/scheduler/scheduler.cc index 71a9f05dcd..fc9725125e 100644 --- a/src/vt/scheduler/scheduler.cc +++ b/src/vt/scheduler/scheduler.cc @@ -385,12 +385,11 @@ void Scheduler::resume(ThreadIDType tid) { } void Scheduler::releaseEpoch(EpochType ep) { - if (auto iter = pending_work_.find(ep); iter != pending_work_.end()) { - auto& container = iter->second; + if (auto result = pending_work_.extract(ep); result) { + auto& container = result.mapped(); while (container.size() > 0) { work_queue_.emplace(container.pop()); } - pending_work_.erase(iter); } } diff --git a/src/vt/scheduler/scheduler.impl.h b/src/vt/scheduler/scheduler.impl.h index 0be989272b..e2f5199243 100644 --- a/src/vt/scheduler/scheduler.impl.h +++ b/src/vt/scheduler/scheduler.impl.h @@ -143,7 +143,7 @@ void Scheduler::enqueueOrPostpone(UnitT unit) { return; } } - } else if (not theTerm()->epochReleased(ep)) { + } else if (not theTerm()->isEpochReleased(ep)) { pending_work_[ep].push(unit); return; } diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index 392419bee1..2ac6bbb1a6 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -1131,7 +1131,6 @@ void TerminationDetector::releaseEpoch(EpochType epoch) { // Put the epoch in the released set. The epoch any_epoch_sentinel does not // count as a succcessor. epoch_released_.insert(epoch); - runReleaseEpochActions(epoch); } else { // The user might have made a mistake if they are trying to release an epoch // that is released-by-default (not dependent) @@ -1139,29 +1138,7 @@ void TerminationDetector::releaseEpoch(EpochType epoch) { } } -void TerminationDetector::runReleaseEpochActions(EpochType epoch) { - auto iter = epoch_release_action_.find(epoch); - if (iter != epoch_release_action_.end()) { - auto actions = std::move(iter->second); - epoch_release_action_.erase(iter); - for (auto&& fn : actions) { - fn(); - } - } - theSched()->releaseEpoch(epoch); -} - -void TerminationDetector::onReleaseEpoch(EpochType epoch, ActionType action) { - // Run an action if an epoch has been released - bool const is_dep = isDep(epoch); - if (not is_dep or (is_dep and epochReleased(epoch))) { - action(); - } else { - epoch_release_action_[epoch].push_back(action); - } -} - -bool TerminationDetector::epochReleased(EpochType epoch) { +bool TerminationDetector::isEpochReleased(EpochType epoch) { // Because of case (2), ignore dep <- no-dep because this should not be called // unless dep is released bool const is_dep = isDep(epoch); diff --git a/src/vt/termination/termination.h b/src/vt/termination/termination.h index 7e28ae64b4..22be8e5450 100644 --- a/src/vt/termination/termination.h +++ b/src/vt/termination/termination.h @@ -377,21 +377,14 @@ struct TerminationDetector : void releaseEpoch(EpochType epoch); /** - * \brief Action to run on release of a dependent epoch - * - * \param[in] epoch the epoch - * \param[in] action the action - */ - void onReleaseEpoch(EpochType epoch, ActionType action); - - /** - * \brief Test if an epoch is dependent and if it is, if that epoch has been released + * \brief Test if an epoch is dependent and if it is, if that epoch has been + * released * * \param[in] epoch the epoch * * \return if it is released */ - bool epochReleased(EpochType epoch); + bool isEpochReleased(EpochType epoch); private: /** @@ -401,13 +394,6 @@ struct TerminationDetector : */ void cleanupReleasedEpoch(EpochType epoch); - /** - * \brief Run all actions when an epoch is released - * - * \param[in] epoch the epoch to run actions for - */ - void runReleaseEpochActions(EpochType epoch); - public: /* * Directly call into a specific type of rooted epoch, can not be overridden @@ -891,8 +877,6 @@ struct TerminationDetector : EpochStackType epoch_stack_; // released epoch list for dependent epochs std::unordered_set epoch_released_ = {}; - // release epoch action list for dependent epochs - std::unordered_map epoch_release_action_ = {}; }; }} // end namespace vt::term From 48f7a8c85c44240934ffc23842b64a808497ead1 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Tue, 31 Oct 2023 12:56:21 -0700 Subject: [PATCH 20/20] #410: termination: cleanup more code---remove unecessary condition --- src/vt/termination/termination.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/vt/termination/termination.cc b/src/vt/termination/termination.cc index 2ac6bbb1a6..56fd29851e 100644 --- a/src/vt/termination/termination.cc +++ b/src/vt/termination/termination.cc @@ -1166,10 +1166,7 @@ void TerminationDetector::cleanupReleasedEpoch(EpochType epoch) { epoch ); if (is_term) { - auto iter = epoch_released_.find(epoch); - if (iter != epoch_released_.end()) { - epoch_released_.erase(iter); - } + epoch_released_.erase(epoch); } } }