Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#410: Dependent Epochs rewritten #2204

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ccd5afd
#410: epoch: change unused InsertEpoch to DependentEpoch
lifflander Jun 17, 2019
b393469
#410: epoch: add function to bit-combine epoch category bits
lifflander Jun 19, 2019
9a1efe8
#410: termination: add isDep check
lifflander Sep 28, 2023
72fbfec
#410: term: implement dependent epochs
lifflander Jun 20, 2019
5cd8599
#410: test: add release dependent epoch test
lifflander Jun 20, 2019
10147f4
#410: reduce: fix warning
lifflander Oct 12, 2023
1db25fa
#410: epoch: add test, move pending epochs to scheduler
lifflander Oct 12, 2023
a9816c6
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
lifflander Oct 16, 2023
4de8a74
#410: objgroup: implement objgroup proxy functions for dependent epochs
lifflander Oct 17, 2023
bfdc3f4
#410: collection: add dependent epochs to collections, system message…
lifflander Oct 18, 2023
3c1ac7f
#410: test: add new test for dep epochs and collections
lifflander Aug 16, 2019
cdf69ab
#410: collection: add missing header include
lifflander Oct 18, 2023
9764708
#410: tests: cleanup tests, fix name collison
lifflander Oct 18, 2023
a974aba
#410: tests: fix license
lifflander Oct 18, 2023
3e76af5
#410: tests: fix some small compilation errors
lifflander Oct 18, 2023
3d2b117
#410: collection: switch broadcast after system broadcast to user msg
lifflander Oct 18, 2023
f746269
#410: collection: fix missing system message designation
lifflander Oct 19, 2023
cb2b519
#410: tests: rewrite dep epoch test to fix logic error
lifflander Oct 19, 2023
4b3a9c6
#410: termination: remove unneeded code, cleanup scheduler
lifflander Oct 31, 2023
48f7a8c
#410: termination: cleanup more code---remove unecessary condition
lifflander Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/vt/context/runnable_context/td.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ struct TD {
*/
void resume();

/**
* \brief Get epoch for this context
*
* \return the epoch
*/
EpochType getEpoch() const { return ep_; }

private:
EpochType ep_ = no_epoch; /**< The epoch for the task */
#if vt_check_enabled(fcontext)
Expand Down
33 changes: 12 additions & 21 deletions src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ namespace vt { namespace epoch {

static EpochType const arch_epoch_coll = makeEpochZero();

EpochManip::EpochManip()
: terminated_collective_epochs_(
std::make_unique<EpochWindow>(arch_epoch_coll)
)
{ }

/*static*/ EpochType EpochManip::generateEpoch(
bool const& is_rooted, NodeType const& root_node,
eEpochCategory const& category
Expand Down Expand Up @@ -123,22 +117,19 @@ EpochType EpochManip::getArchetype(EpochType epoch) const {

EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
auto const is_rooted = isRooted(epoch);
if (is_rooted and epoch != term::any_epoch_sentinel) {
auto const& arch_epoch = getArchetype(epoch);
auto iter = terminated_epochs_.find(arch_epoch);
if (iter == terminated_epochs_.end()) {
terminated_epochs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = terminated_epochs_.find(arch_epoch);
}
return iter->second.get();
} else {
vtAssertExpr(terminated_collective_epochs_ != nullptr);
return terminated_collective_epochs_.get();
auto& container = is_rooted and epoch != term::any_epoch_sentinel ?
terminated_epochs_ : terminated_collective_epochs_;
auto const& arch_epoch = getArchetype(epoch);
auto iter = container.find(arch_epoch);
if (iter == container.end()) {
container.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = container.find(arch_epoch);
}
return iter->second.get();
}

/*static*/ bool EpochManip::isRooted(EpochType const& epoch) {
Expand Down
12 changes: 7 additions & 5 deletions src/vt/epoch/epoch_manip.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace vt { namespace epoch {
struct EpochManip : runtime::component::Component<EpochManip> {
using CapturedContextType = term::ParentEpochCapture;

EpochManip();
EpochManip() = default;

std::string name() override { return "EpochManip"; }

Expand Down Expand Up @@ -276,10 +276,12 @@ struct EpochManip : runtime::component::Component<EpochManip> {
}

private:
// epoch window container for specific archetyped epochs
std::unordered_map<EpochType,std::unique_ptr<EpochWindow>> terminated_epochs_;
// epoch window for basic collective epochs
std::unique_ptr<EpochWindow> terminated_collective_epochs_ = nullptr;
/// epoch window container for specific archetyped epochs
std::unordered_map<EpochType, std::unique_ptr<EpochWindow>> terminated_epochs_;
/// epoch window for basic collective epochs
std::unordered_map<
EpochType, std::unique_ptr<EpochWindow>
> terminated_collective_epochs_;
};

}} /* end namespace vt::epoch */
Expand Down
20 changes: 1 addition & 19 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ void ActiveMessenger::finishPendingDataMsgAsyncRecv(InProgressDataIRecv* irecv)
theTerm()->consume(term::any_epoch_sentinel,1,sender);
theTerm()->hangDetectRecv();
};
theSched()->enqueue(irecv->priority, run);
theSched()->enqueueLambda(irecv->priority, run);
}
}

Expand Down Expand Up @@ -964,13 +964,6 @@ void ActiveMessenger::prepareActiveMsgToRun(
if (is_obj) {
objgroup::dispatchObjGroup(base, handler, from_node, cont);
} else {
if (epoch != term::any_epoch_sentinel and epoch::EpochManip::isDep(epoch)) {
if (not theTerm()->epochReleased(epoch)) {
pending_epoch_msgs_[epoch].emplace_back(base, from_node);
return;
}
}

runnable::makeRunnable(base, not is_term, handler, from_node)
.withContinuation(cont)
.withTDEpochFromMsg(is_term)
Expand All @@ -989,17 +982,6 @@ void ActiveMessenger::prepareActiveMsgToRun(
}
}

void ActiveMessenger::releaseEpochMsgs(EpochType epoch) {
auto iter = pending_epoch_msgs_.find(epoch);
if (iter != pending_epoch_msgs_.end()) {
auto msgs = std::move(iter->second);
pending_epoch_msgs_.erase(iter);
for (auto&& m : msgs) {
prepareActiveMsgToRun(m.buffered_msg, m.from_node, true, m.cont);
}
}
}

bool ActiveMessenger::tryProcessIncomingActiveMsg() {
CountType num_probe_bytes;
MPI_Status stat;
Expand Down
10 changes: 0 additions & 10 deletions src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
using SendFnType = std::function<SendInfo(PtrLenPairType,NodeType,TagType)>;
using UserSendFnType = std::function<void(SendFnType)>;
using ContainerPendingType = std::unordered_map<TagType,PendingRecvType>;
using MsgContType = std::list<BufferedMsgType>;
using EpochWaitType = std::unordered_map<EpochType, MsgContType>;
using ReadyHanTagType = std::tuple<HandlerType, TagType>;
using HandlerManagerType = HandlerManager;
using PendingSendType = PendingSend;
Expand Down Expand Up @@ -1652,13 +1650,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
# endif
}

/*
* \brief Deliver messages that are now released with a dependent epoch
*
* \param[in] epoch the epoch to release
*/
void releaseEpochMsgs(EpochType epoch);

private:
/**
* \internal \brief Allocate a new, unused tag.
Expand Down Expand Up @@ -1773,7 +1764,6 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_ = {};
elm::ElementLBData bare_handler_lb_data_;
MPI_Comm comm_ = MPI_COMM_NULL;
EpochWaitType pending_epoch_msgs_ = {};
};

}} // end namespace vt::messaging
Expand Down
8 changes: 8 additions & 0 deletions src/vt/runnable/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ void RunnableNew::resume(TimeType time) {
#endif
}

EpochType RunnableNew::getEpoch() const {
if (contexts_.has_td) {
return contexts_.td.getEpoch();
} else {
return no_epoch;
}
}

void RunnableNew::send(elm::ElementIDStruct elm, MsgSizeType bytes) {
if (contexts_.has_lb) contexts_.lb.send(elm, bytes);
}
Expand Down
7 changes: 7 additions & 0 deletions src/vt/runnable/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ struct RunnableNew {
*/
static void operator delete(void* ptr);

/**
* \brief Get the epoch for a runnable
*
* \return the epoch
*/
EpochType getEpoch() const;

private:
detail::Contexts contexts_; /**< The contexts */
MsgSharedPtr<BaseMsgType> msg_ = nullptr; /**< The associated message */
Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ bool Runtime::initialize(bool const force_now) {
printStartupBanner();
// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});

Expand Down
2 changes: 1 addition & 1 deletion src/vt/runtime/runtime_banner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ void Runtime::printStartupBanner() {

// Enqueue a check for later in case arguments are modified before work
// actually executes
theSched->enqueue([this]{
theSched->enqueueLambda([this]{
this->checkForArgumentErrors();
});
}
Expand Down
11 changes: 11 additions & 0 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ void Scheduler::resume(ThreadIDType tid) {
suspended_.resumeRunnable(tid);
}

void Scheduler::releaseEpoch(EpochType ep) {
auto iter = pending_work_.find(ep);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possible use of pending_work_.extract(ep)

if (iter != pending_work_.end()) {
auto& container = iter->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
}
pending_work_.erase(iter);
}
}

#if vt_check_enabled(fcontext)
ThreadManager* Scheduler::getThreadManager() {
return thread_manager_.get();
Expand Down
31 changes: 30 additions & 1 deletion src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,24 @@ struct Scheduler : runtime::component::Component<Scheduler> {
template <typename RunT>
void enqueue(RunT r);

/**
* \brief Enqueue a callable to execute later with the default priority
* \c default_priority
*
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(Callable&& c);

/**
* \brief Enqueue a callable to execute later with a priority
*
* \param[in] priority the priority of the action
* \param[in] r action to execute
*/
template <typename Callable>
void enqueueLambda(PriorityType priority, Callable&& c);

/**
* \brief Enqueue an runnable with a priority to execute later
*
Expand Down Expand Up @@ -352,6 +370,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
ThreadManager* getThreadManager();
#endif

/**
* \brief Release an epoch to run
*
* \param[in] ep the epoch to release
*/
void releaseEpoch(EpochType ep);

template <typename SerializerT>
void serialize(SerializerT& s) {
s | work_queue_
Expand Down Expand Up @@ -381,7 +406,8 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| vtLiveTime
| schedLoopTime
| idleTime
| idleTimeMinusTerm;
| idleTimeMinusTerm
| pending_work_;
}

private:
Expand Down Expand Up @@ -419,6 +445,9 @@ struct Scheduler : runtime::component::Component<Scheduler> {
Queue<UnitType> work_queue_;
# endif

/// Unreleased work pending an epoch release
std::unordered_map<EpochType, Queue<UnitType>> pending_work_;

#if vt_check_enabled(fcontext)
std::unique_ptr<ThreadManager> thread_manager_ = nullptr;
#endif
Expand Down
Loading