Skip to content

Commit

Permalink
#410: epoch: add test, move pending epochs to scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 18, 2023
1 parent 10147f4 commit 1db25fa
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 99 deletions.
7 changes: 7 additions & 0 deletions src/vt/context/runnable_context/td.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ struct TD {
*/
void resume();

/**
* \brief Get epoch for this context
*
* \return the epoch
*/
EpochType getEpoch() const { return ep_; }

private:
EpochType ep_ = no_epoch; /**< The epoch for the task */
#if vt_check_enabled(fcontext)
Expand Down
33 changes: 12 additions & 21 deletions src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ namespace vt { namespace epoch {

static EpochType const arch_epoch_coll = makeEpochZero();

EpochManip::EpochManip()
: terminated_collective_epochs_(
std::make_unique<EpochWindow>(arch_epoch_coll)
)
{ }

/*static*/ EpochType EpochManip::generateEpoch(
bool const& is_rooted, NodeType const& root_node,
eEpochCategory const& category
Expand Down Expand Up @@ -123,22 +117,19 @@ EpochType EpochManip::getArchetype(EpochType epoch) const {

EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
auto const is_rooted = isRooted(epoch);
if (is_rooted and epoch != term::any_epoch_sentinel) {
auto const& arch_epoch = getArchetype(epoch);
auto iter = terminated_epochs_.find(arch_epoch);
if (iter == terminated_epochs_.end()) {
terminated_epochs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = terminated_epochs_.find(arch_epoch);
}
return iter->second.get();
} else {
vtAssertExpr(terminated_collective_epochs_ != nullptr);
return terminated_collective_epochs_.get();
auto& container = is_rooted and epoch != term::any_epoch_sentinel ?
terminated_epochs_ : terminated_collective_epochs_;
auto const& arch_epoch = getArchetype(epoch);
auto iter = container.find(arch_epoch);
if (iter == container.end()) {
container.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = container.find(arch_epoch);
}
return iter->second.get();
}

/*static*/ bool EpochManip::isRooted(EpochType const& epoch) {
Expand Down
12 changes: 7 additions & 5 deletions src/vt/epoch/epoch_manip.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace vt { namespace epoch {
struct EpochManip : runtime::component::Component<EpochManip> {
using CapturedContextType = term::ParentEpochCapture;

EpochManip();
EpochManip() = default;

std::string name() override { return "EpochManip"; }

Expand Down Expand Up @@ -276,10 +276,12 @@ struct EpochManip : runtime::component::Component<EpochManip> {
}

private:
// epoch window container for specific archetyped epochs
std::unordered_map<EpochType,std::unique_ptr<EpochWindow>> terminated_epochs_;
// epoch window for basic collective epochs
std::unique_ptr<EpochWindow> terminated_collective_epochs_ = nullptr;
/// epoch window container for specific archetyped epochs
std::unordered_map<EpochType, std::unique_ptr<EpochWindow>> terminated_epochs_;
/// epoch window for basic collective epochs
std::unordered_map<
EpochType, std::unique_ptr<EpochWindow>
> terminated_collective_epochs_;
};

}} /* end namespace vt::epoch */
Expand Down
20 changes: 1 addition & 19 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ void ActiveMessenger::finishPendingDataMsgAsyncRecv(InProgressDataIRecv* irecv)
theTerm()->consume(term::any_epoch_sentinel,1,sender);
theTerm()->hangDetectRecv();
};
theSched()->enqueue(irecv->priority, run);
theSched()->enqueueLambda(irecv->priority, run);
}
}

Expand Down Expand Up @@ -964,13 +964,6 @@ void ActiveMessenger::prepareActiveMsgToRun(
if (is_obj) {
objgroup::dispatchObjGroup(base, handler, from_node, cont);
} else {
if (epoch != term::any_epoch_sentinel and epoch::EpochManip::isDep(epoch)) {
if (not theTerm()->epochReleased(epoch)) {
pending_epoch_msgs_[epoch].emplace_back(base, from_node);
return;
}
}

runnable::makeRunnable(base, not is_term, handler, from_node)
.withContinuation(cont)
.withTDEpochFromMsg(is_term)
Expand All @@ -989,17 +982,6 @@ void ActiveMessenger::prepareActiveMsgToRun(
}
}

void ActiveMessenger::releaseEpochMsgs(EpochType epoch) {
auto iter = pending_epoch_msgs_.find(epoch);
if (iter != pending_epoch_msgs_.end()) {
auto msgs = std::move(iter->second);
pending_epoch_msgs_.erase(iter);
for (auto&& m : msgs) {
prepareActiveMsgToRun(m.buffered_msg, m.from_node, true, m.cont);
}
}
}

bool ActiveMessenger::tryProcessIncomingActiveMsg() {
CountType num_probe_bytes;
MPI_Status stat;
Expand Down
10 changes: 0 additions & 10 deletions src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
using SendFnType = std::function<SendInfo(PtrLenPairType,NodeType,TagType)>;
using UserSendFnType = std::function<void(SendFnType)>;
using ContainerPendingType = std::unordered_map<TagType,PendingRecvType>;
using MsgContType = std::list<BufferedMsgType>;
using EpochWaitType = std::unordered_map<EpochType, MsgContType>;
using ReadyHanTagType = std::tuple<HandlerType, TagType>;
using HandlerManagerType = HandlerManager;
using PendingSendType = PendingSend;
Expand Down Expand Up @@ -1652,13 +1650,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
# endif
}

/*
* \brief Deliver messages that are now released with a dependent epoch
*
* \param[in] epoch the epoch to release
*/
void releaseEpochMsgs(EpochType epoch);

private:
/**
* \internal \brief Allocate a new, unused tag.
Expand Down Expand Up @@ -1773,7 +1764,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_ = {};
elm::ElementLBData bare_handler_lb_data_;
MPI_Comm comm_ = MPI_COMM_NULL;
EpochWaitType pending_epoch_msgs_ = {};
};

}} // end namespace vt::messaging
Expand Down
8 changes: 8 additions & 0 deletions src/vt/runnable/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ void RunnableNew::resume(TimeType time) {
#endif
}

EpochType RunnableNew::getEpoch() const {
if (contexts_.has_td) {
return contexts_.td.getEpoch();
} else {
return no_epoch;
}
}

void RunnableNew::send(elm::ElementIDStruct elm, MsgSizeType bytes) {
if (contexts_.has_lb) contexts_.lb.send(elm, bytes);
}
Expand Down
7 changes: 7 additions & 0 deletions src/vt/runnable/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ struct RunnableNew {
*/
static void operator delete(void* ptr);

/**
* \brief Get the epoch for a runnable
*
* \return the epoch
*/
EpochType getEpoch() const;

private:
detail::Contexts contexts_; /**< The contexts */
MsgSharedPtr<BaseMsgType> msg_ = nullptr; /**< The associated message */
Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ bool Runtime::initialize(bool const force_now) {
printStartupBanner();
// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});

Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime_banner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ void Runtime::printStartupBanner() {

// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});
}
Expand Down
11 changes: 11 additions & 0 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ void Scheduler::resume(ThreadIDType tid) {
suspended_.resumeRunnable(tid);
}

void Scheduler::releaseEpoch(EpochType ep) {
auto iter = pending_work_.find(ep);
if (iter != pending_work_.end()) {
auto& container = iter->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
}
pending_work_.erase(iter);
}
}

#if vt_check_enabled(fcontext)
ThreadManager* Scheduler::getThreadManager() {
return thread_manager_.get();
Expand Down
31 changes: 30 additions & 1 deletion src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,24 @@ struct Scheduler : runtime::component::Component<Scheduler> {
template <typename RunT>
void enqueue(RunT r);

/**
* \brief Enqueue a callable to execute later with the default priority
* \c default_priority
*
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(Callable&& c);

/**
* \brief Enqueue a callable to execute later with a priority
*
* \param[in] priority the priority of the action
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(PriorityType priority, Callable&& c);

/**
* \brief Enqueue an runnable with a priority to execute later
*
Expand Down Expand Up @@ -352,6 +370,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
ThreadManager* getThreadManager();
#endif

/**
* \brief Release an epoch to run
*
* \param[in] ep the epoch to release
*/
void releaseEpoch(EpochType ep);

template <typename SerializerT>
void serialize(SerializerT& s) {
s | work_queue_
Expand Down Expand Up @@ -381,7 +406,8 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| vtLiveTime
| schedLoopTime
| idleTime
| idleTimeMinusTerm;
| idleTimeMinusTerm
| pending_work_;
}

private:
Expand Down Expand Up @@ -419,6 +445,9 @@ struct Scheduler : runtime::component::Component<Scheduler> {
Queue<UnitType> work_queue_;
# endif

/// Unreleased work pending an epoch release
std::unordered_map<EpochType, Queue<UnitType>> pending_work_;

#if vt_check_enabled(fcontext)
std::unique_ptr<ThreadManager> thread_manager_ = nullptr;
#endif
Expand Down
Loading

0 comments on commit 1db25fa

Please sign in to comment.