Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#410: Dependent Epochs rewritten #2204

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ccd5afd
#410: epoch: change unused InsertEpoch to DependentEpoch
lifflander Jun 17, 2019
b393469
#410: epoch: add function to bit-combine epoch category bits
lifflander Jun 19, 2019
9a1efe8
#410: termination: add isDep check
lifflander Sep 28, 2023
72fbfec
#410: term: implement dependent epochs
lifflander Jun 20, 2019
5cd8599
#410: test: add release dependent epoch test
lifflander Jun 20, 2019
10147f4
#410: reduce: fix warning
lifflander Oct 12, 2023
1db25fa
#410: epoch: add test, move pending epochs to scheduler
lifflander Oct 12, 2023
a9816c6
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
lifflander Oct 16, 2023
4de8a74
#410: objgroup: implement objgroup proxy functions for dependent epochs
lifflander Oct 17, 2023
bfdc3f4
#410: collection: add dependent epochs to collections, system message…
lifflander Oct 18, 2023
3c1ac7f
#410: test: add new test for dep epochs and collections
lifflander Aug 16, 2019
cdf69ab
#410: collection: add missing header include
lifflander Oct 18, 2023
9764708
#410: tests: cleanup tests, fix name collison
lifflander Oct 18, 2023
a974aba
#410: tests: fix license
lifflander Oct 18, 2023
3e76af5
#410: tests: fix some small compilation errors
lifflander Oct 18, 2023
3d2b117
#410: collection: switch broadcast after system broadcast to user msg
lifflander Oct 18, 2023
f746269
#410: collection: fix missing system message designation
lifflander Oct 19, 2023
cb2b519
#410: tests: rewrite dep epoch test to fix logic error
lifflander Oct 19, 2023
4b3a9c6
#410: termination: remove unneeded code, cleanup scheduler
lifflander Oct 31, 2023
48f7a8c
#410: termination: cleanup more code---remove unecessary condition
lifflander Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/vt/group/collective/group_info_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void InfoColl::setupCollective() {
auto msg = makeMessage<GroupCollectiveMsg>(
group_, up_tree_cont_, in_group, size, child
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(parent, msg);
}
}
Expand Down Expand Up @@ -336,6 +337,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,new_root_cont_,true,subtree_zero,root_node,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newRootHan>(root_node, msg);

for (std::size_t i = 1; i < msg_list.size(); i++) {
Expand Down Expand Up @@ -377,6 +379,7 @@ void InfoColl::upTree() {
auto cmsg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,level
);
envelopeSetSystemMsg(cmsg->env, true);
theMsg()->sendMsg<upHan>(p, cmsg);

for (auto&& msg : msg_in_group) {
Expand Down Expand Up @@ -406,6 +409,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,static_cast<NodeType>(subtree_),child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
/*
* Forward all the children messages up the tree (up to 2 of them)
Expand Down Expand Up @@ -438,9 +442,11 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
// new MsgPtr to avoid thief of original in collection
auto msg_out = promoteMsg(msg_in_group[0].get());
envelopeSetSystemMsg(msg_out->env, true);
theMsg()->sendMsg<upHan>(p, msg_out);
} else {
vtAssertExpr(msg_in_group.size() > 2);
Expand Down Expand Up @@ -478,6 +484,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);

vt_debug_print(
Expand Down Expand Up @@ -639,12 +646,14 @@ void InfoColl::downTree(GroupCollectiveMsg* msg) {
auto const& num = collective_->span_children_.size();
auto const& child = collective_->span_children_[msg->getChild() % num];
auto nmsg = makeMessage<GroupCollectiveMsg>(*msg);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downHan>(child, nmsg);
++send_down_;
}

auto const& group_ = getGroupID();
auto nmsg = makeMessage<GroupOnlyMsg>(group_,down_tree_fin_cont_);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downFinishedHan>(from, nmsg);
}

Expand Down Expand Up @@ -686,6 +695,7 @@ void InfoColl::sendDownNewTree() {
group_, c
);
auto msg = makeMessage<GroupOnlyMsg>(group_,new_tree_cont_);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newTreeHan>(c, msg);
}
}
Expand Down Expand Up @@ -727,6 +737,7 @@ void InfoColl::finalize() {
auto msg = makeMessage<GroupOnlyMsg>(
group_,finalize_cont_,known_root_node_,is_default_group_
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<finalizeHan>(c, msg);
}

Expand All @@ -744,6 +755,7 @@ void InfoColl::finalize() {

auto stamp = makeStamp<StrongUserID>(group_);
auto msg = makeMessage<FinishedReduceMsg>(group_);
envelopeSetSystemMsg(msg->env, true);
using OpType = collective::PlusOp<collective::NoneType>;

auto r = theGroup()->reducer();
Expand Down
2 changes: 2 additions & 0 deletions src/vt/group/rooted/group_info_rooted.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void InfoRooted::setupRooted() {
auto msg = makeMessage<GroupListMsg>(
low_node, listsize, group_, op, listsize, this_node, &lst
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand All @@ -142,6 +143,7 @@ void InfoRooted::setupRooted() {
low_node, regsize, group_, op, regsize, this_node,
static_cast<region::Range*>(region_.get())
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/active.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,31 @@ void ActiveMessenger::markAsTermMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceTerm());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsLocationMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceLocation());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsSerialMsgMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceSerialMsg());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsCollectionMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceCollection());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgT>
Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/envelope/envelope_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct ActiveEnvelope {
/// Used to denote that the message's bare handlers shouldn't record
/// communication LB data due to redundancy
bool comm_lb_data_recorded_above_bare_handler : 1;

/// Used to denote that this is an internal system message that does not do
/// application work
bool system_msg : 1;
};

}} /* end namespace vt::messaging */
Expand Down
10 changes: 10 additions & 0 deletions src/vt/messaging/envelope/envelope_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ inline void envelopeSetIsLocked(Env& env, bool is_locked);
template <typename Env>
inline void envelopeUnlockForForwarding(Env& env);

/**
* \brief Set whether this is an internal system message
*
* \param[in,out] env the envelope
* \param[in] is_system_msg value indicating message is a system message
*/
template <typename Env>
inline void envelopeSetSystemMsg(Env& env, bool is_system_msg);


} /* end namespace vt */

#include "vt/messaging/envelope/envelope_set.impl.h"
Expand Down
5 changes: 5 additions & 0 deletions src/vt/messaging/envelope/envelope_set.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ inline void envelopeSetCommLBDataRecordedAboveBareHandler(
comm_lb_data_recorded_above_bare_handler;
}

template <typename Env>
inline void envelopeSetSystemMsg(Env& env, bool is_system_msg) {
reinterpret_cast<Envelope*>(&env)->system_msg = is_system_msg;
}

template <typename Env>
inline void envelopeSetIsLocked(Env& env, bool is_locked) {
reinterpret_cast<Envelope*>(&env)->is_locked = is_locked;
Expand Down
1 change: 1 addition & 0 deletions src/vt/messaging/envelope/envelope_setup.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ inline void envelopeInit(Env& env) {
#endif
envelopeSetHasBeenSerialized(env, false);
envelopeSetCommLBDataRecordedAboveBareHandler(env, false);
envelopeSetSystemMsg(env, false);
}

inline void envelopeInitEmpty(Envelope& env) {
Expand Down
18 changes: 18 additions & 0 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,24 @@ void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) {
}
}

void Scheduler::releaseEpochCollection(EpochType ep, UntypedCollection* untyped) {
if (
auto iter = pending_collection_work_.find(ep);
iter != pending_collection_work_.end())
{
if (auto iter2 = iter->second.find(untyped); iter2 != iter->second.end()) {
auto& container = iter2->second;
while (container.size() > 0) {
work_queue_.emplace(container.pop());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to move the values from container, rather than actually removing them, so that no work needs to be done on the structure of container itself, which is going away right after this loop.

I.e.

for (auto &unit : container) {
  work_queue_.emplace(std::move(unit));
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is that container is not iterable. The container is a hybrid queue implemented differently depending on whether priorities are enabled. If they are not, it is a 64 entry circular buffer with an overflow std::queue container. We could make it iterable, but it would need a custom iterator.

}
iter->second.erase(iter2);
}
if (iter->second.size() == 0) {
pending_collection_work_.erase(iter);
}
}
}

bool Scheduler::isReleasedEpochObjgroup(
EpochType ep, ObjGroupProxyType proxy
) const {
Expand Down
14 changes: 14 additions & 0 deletions src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "vt/timing/timing.h"
#include "vt/runtime/component/component_pack.h"
#include "vt/messaging/async_op_wrapper.fwd.h"
#include "vt/vrt/collection/types/untyped.h"

#include <cassert>
#include <vector>
Expand Down Expand Up @@ -115,6 +116,7 @@ struct Scheduler : runtime::component::Component<Scheduler> {
using TriggerContainerType = std::list<TriggerType>;
using EventTriggerContType = std::vector<TriggerContainerType>;
using RunnablePtrType = runnable::RunnableNew*;
using UntypedCollection = vrt::collection::UntypedCollection;

struct SchedulerLoopGuard {
SchedulerLoopGuard(Scheduler* scheduler);
Expand Down Expand Up @@ -384,6 +386,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
*/
void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy);

/**
* \brief Release an epoch to run
*
* \param[in] ep the epoch to release
*/
void releaseEpochCollection(EpochType ep, UntypedCollection* untyped);

/**
* \brief Check if a epoch is released for an objgroup
*
Expand Down Expand Up @@ -426,6 +435,7 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| idleTimeMinusTerm
| pending_work_
| pending_objgroup_work_
| pending_collection_work_
| released_objgroups_;
}

Expand Down Expand Up @@ -472,6 +482,10 @@ struct Scheduler : runtime::component::Component<Scheduler> {
EpochType, std::unordered_map<ObjGroupProxyType, Queue<UnitType>>
> pending_objgroup_work_;

std::unordered_map<
EpochType, std::unordered_map<UntypedCollection*, Queue<UnitType>>
> pending_collection_work_;

/// Released epochs for an objgroup
std::unordered_map<EpochType, std::set<ObjGroupProxyType>> released_objgroups_;

Expand Down
39 changes: 25 additions & 14 deletions src/vt/scheduler/scheduler.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,33 @@ void Scheduler::enqueue(MsgT* msg, RunT r) {
template <typename UnitT>
void Scheduler::enqueueOrPostpone(UnitT unit) {
auto r = unit.getRunnable();
auto ep = r->getEpoch();

bool const is_dep = epoch::EpochManip::isDep(ep);

if (is_dep) {
auto obj = r->getObj();
if (ep != no_epoch and ep != term::any_epoch_sentinel) {
if (obj and r->isObjGroup()) {
auto proxy = objgroup::getProxyFromPtr(obj);
if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) {
pending_objgroup_work_[ep][proxy].push(unit);
auto m = r->getMsg();

// If it's a system message we can schedule it right away
if (m and not m->env.system_msg) {
auto ep = r->getEpoch();
bool const is_dep = epoch::EpochManip::isDep(ep);
if (is_dep) {
auto obj = r->getObj();
if (ep != no_epoch and ep != term::any_epoch_sentinel) {
Comment on lines +130 to +131
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Phil's comment about peeling off cases. If you're not going to do that, you might at least reverse the order of these two lines.

if (obj) {
if (r->isObjGroup()) {
auto proxy = objgroup::getProxyFromPtr(obj);
if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) {
pending_objgroup_work_[ep][proxy].push(unit);
return;
}
} else {
auto untyped = static_cast<UntypedCollection*>(obj);
if (not untyped->isReleasedEpoch(ep)) {
pending_collection_work_[ep][untyped].push(unit);
return;
}
}
} else if (not theTerm()->epochReleased(ep)) {
pending_work_[ep].push(unit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nesting is getting really deep here. It might be nice to peel off some of the cases as early returns, if/as possible.

return;
}
} else if (not theTerm()->epochReleased(ep)) {
pending_work_[ep].push(unit);
return;
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,50 @@ struct CollectionManager
template <typename ColT>
void addCleanupFn(VirtualProxyType proxy);

public:
/**
* \brief Release a dependent epoch for a given proxy index
*
* \param[in] proxy the element proxy
* \param[in] epoch epoch to release
*/
template <typename ColT>
void releaseEpoch(
VrtElmProxy<ColT, typename ColT::IndexType> proxy, EpochType epoch
);

/**
* \brief Release a dependent epoch for the whole collection
*
* \param[in] proxy the collection proxy
* \param[in] epoch the epoch to release
*/
template <typename ColT>
void releaseEpochCollection(VirtualProxyType proxy, EpochType epoch);

/**
* \brief Check if a dependent epoch for a given proxy index is released
*
* \param[in] proxy the element proxy
* \param[in] epoch epoch to check
*
* \return whether it is released on this element
*/
template <typename ColT>
bool isReleasedEpoch(
VrtElmProxy<ColT, typename ColT::IndexType> proxy, EpochType epoch
);

private:
/**
* \brief Handler to release a dependent epoch across the whole collection
*
* \param[in] proxy the collection proxy
* \param[in] epoch epoch to release
*/
template <typename ColT>
static void releaseWholeCollection(VirtualProxyType proxy, EpochType epoch);

private:
template <typename ColT, typename IndexT>
friend struct CollectionElmAttorney;
Expand Down Expand Up @@ -1782,6 +1826,7 @@ struct CollectionManager
#include "vt/vrt/collection/destroy/manager_destroy_attorney.impl.h"
#include "vt/vrt/collection/broadcast/broadcastable.impl.h"
#include "vt/vrt/collection/rdmaable/rdmaable.impl.h"
#include "vt/vrt/collection/release/releasable.impl.h"
#include "vt/vrt/collection/balance/col_lb_data.impl.h"
#include "vt/vrt/collection/types/indexable.impl.h"
#include "vt/vrt/collection/dispatch/dispatch.impl.h"
Expand Down
Loading