Skip to content

Commit

Permalink
#410: collection: add dependent epochs to collections, system message…
Browse files Browse the repository at this point in the history
…s to keep progress
  • Loading branch information
lifflander committed Oct 18, 2023
1 parent 3642cfa commit 7a9f9a7
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 16 deletions.
12 changes: 12 additions & 0 deletions src/vt/group/collective/group_info_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void InfoColl::setupCollective() {
auto msg = makeMessage<GroupCollectiveMsg>(
group_, up_tree_cont_, in_group, size, child
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(parent, msg);
}
}
Expand Down Expand Up @@ -336,6 +337,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,new_root_cont_,true,subtree_zero,root_node,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newRootHan>(root_node, msg);

for (std::size_t i = 1; i < msg_list.size(); i++) {
Expand Down Expand Up @@ -377,6 +379,7 @@ void InfoColl::upTree() {
auto cmsg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,level
);
envelopeSetSystemMsg(cmsg->env, true);
theMsg()->sendMsg<upHan>(p, cmsg);

for (auto&& msg : msg_in_group) {
Expand Down Expand Up @@ -406,6 +409,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,static_cast<NodeType>(subtree_),child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
/*
* Forward all the children messages up the tree (up to 2 of them)
Expand Down Expand Up @@ -438,9 +442,11 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
// new MsgPtr to avoid theft of original in collection
auto msg_out = promoteMsg(msg_in_group[0].get());
envelopeSetSystemMsg(msg_out->env, true);
theMsg()->sendMsg<upHan>(p, msg_out);
} else {
vtAssertExpr(msg_in_group.size() > 2);
Expand Down Expand Up @@ -478,6 +484,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);

vt_debug_print(
Expand Down Expand Up @@ -639,12 +646,14 @@ void InfoColl::downTree(GroupCollectiveMsg* msg) {
auto const& num = collective_->span_children_.size();
auto const& child = collective_->span_children_[msg->getChild() % num];
auto nmsg = makeMessage<GroupCollectiveMsg>(*msg);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downHan>(child, nmsg);
++send_down_;
}

auto const& group_ = getGroupID();
auto nmsg = makeMessage<GroupOnlyMsg>(group_,down_tree_fin_cont_);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downFinishedHan>(from, nmsg);
}

Expand Down Expand Up @@ -686,6 +695,7 @@ void InfoColl::sendDownNewTree() {
group_, c
);
auto msg = makeMessage<GroupOnlyMsg>(group_,new_tree_cont_);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newTreeHan>(c, msg);
}
}
Expand Down Expand Up @@ -727,6 +737,7 @@ void InfoColl::finalize() {
auto msg = makeMessage<GroupOnlyMsg>(
group_,finalize_cont_,known_root_node_,is_default_group_
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<finalizeHan>(c, msg);
}

Expand All @@ -744,6 +755,7 @@ void InfoColl::finalize() {

auto stamp = makeStamp<StrongUserID>(group_);
auto msg = makeMessage<FinishedReduceMsg>(group_);
envelopeSetSystemMsg(msg->env, true);
using OpType = collective::PlusOp<collective::NoneType>;

auto r = theGroup()->reducer();
Expand Down
2 changes: 2 additions & 0 deletions src/vt/group/rooted/group_info_rooted.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void InfoRooted::setupRooted() {
auto msg = makeMessage<GroupListMsg>(
low_node, listsize, group_, op, listsize, this_node, &lst
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand All @@ -142,6 +143,7 @@ void InfoRooted::setupRooted() {
low_node, regsize, group_, op, regsize, this_node,
static_cast<region::Range*>(region_.get())
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/active.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,31 @@ void ActiveMessenger::markAsTermMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceTerm());
#endif
envelopeSetSystemMsg(msg->env, true);
}

/**
 * \brief Tag a message as a location message
 *
 * When tracing is compiled in, the envelope's runtime trace flag is set from
 * the \c traceLocation() configuration value. The envelope is always flagged
 * as a system message.
 *
 * \param[in] msg pointer to the message whose envelope is updated
 */
template <typename MsgPtrT>
void ActiveMessenger::markAsLocationMessage(MsgPtrT const msg) {
  auto& env = msg->env;
#if vt_check_enabled(trace_enabled)
  auto const trace_on = theConfig()->traceLocation();
  envelopeSetTraceRuntimeEnabled(env, trace_on);
#endif
  envelopeSetSystemMsg(env, true);
}

/**
 * \brief Tag a message as a serialized-message protocol message
 *
 * When tracing is compiled in, the envelope's runtime trace flag is set from
 * the \c traceSerialMsg() configuration value. The envelope is always flagged
 * as a system message.
 *
 * \param[in] msg pointer to the message whose envelope is updated
 */
template <typename MsgPtrT>
void ActiveMessenger::markAsSerialMsgMessage(MsgPtrT const msg) {
  auto& env = msg->env;
#if vt_check_enabled(trace_enabled)
  auto const trace_on = theConfig()->traceSerialMsg();
  envelopeSetTraceRuntimeEnabled(env, trace_on);
#endif
  envelopeSetSystemMsg(env, true);
}

/**
 * \brief Tag a message as a collection message
 *
 * When tracing is compiled in, the envelope's runtime trace flag is set from
 * the \c traceCollection() configuration value. The envelope is always
 * flagged as a system message.
 *
 * \param[in] msg pointer to the message whose envelope is updated
 */
template <typename MsgPtrT>
void ActiveMessenger::markAsCollectionMessage(MsgPtrT const msg) {
  auto& env = msg->env;
#if vt_check_enabled(trace_enabled)
  auto const trace_on = theConfig()->traceCollection();
  envelopeSetTraceRuntimeEnabled(env, trace_on);
#endif
  envelopeSetSystemMsg(env, true);
}

template <typename MsgT>
Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/envelope/envelope_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct ActiveEnvelope {
/// Used to denote that the message's bare handlers shouldn't record
/// communication LB data due to redundancy
bool comm_lb_data_recorded_above_bare_handler : 1;

/// Used to denote that this is an internal system message that does not do
/// application work
bool system_msg : 1;
};

}} /* end namespace vt::messaging */
Expand Down
10 changes: 10 additions & 0 deletions src/vt/messaging/envelope/envelope_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ inline void envelopeSetIsLocked(Env& env, bool is_locked);
template <typename Env>
inline void envelopeUnlockForForwarding(Env& env);

/**
* \brief Set whether this is an internal system message
*
* \param[in,out] env the envelope
* \param[in] is_system_msg value indicating message is a system message
*/
template <typename Env>
inline void envelopeSetSystemMsg(Env& env, bool is_system_msg);


} /* end namespace vt */

#include "vt/messaging/envelope/envelope_set.impl.h"
Expand Down
5 changes: 5 additions & 0 deletions src/vt/messaging/envelope/envelope_set.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ inline void envelopeSetCommLBDataRecordedAboveBareHandler(
comm_lb_data_recorded_above_bare_handler;
}

/**
 * \brief Write the system-message flag of an envelope
 *
 * The envelope is accessed through its \c Envelope view, matching the other
 * setters in this file.
 *
 * \param[in,out] env the envelope to modify
 * \param[in] is_system_msg new value of the \c system_msg bit
 */
template <typename Env>
inline void envelopeSetSystemMsg(Env& env, bool is_system_msg) {
  auto* const base_env = reinterpret_cast<Envelope*>(&env);
  base_env->system_msg = is_system_msg;
}

template <typename Env>
inline void envelopeSetIsLocked(Env& env, bool is_locked) {
reinterpret_cast<Envelope*>(&env)->is_locked = is_locked;
Expand Down
1 change: 1 addition & 0 deletions src/vt/messaging/envelope/envelope_setup.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ inline void envelopeInit(Env& env) {
#endif
envelopeSetHasBeenSerialized(env, false);
envelopeSetCommLBDataRecordedAboveBareHandler(env, false);
envelopeSetSystemMsg(env, false);
}

inline void envelopeInitEmpty(Envelope& env) {
Expand Down
18 changes: 18 additions & 0 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,24 @@ void Scheduler::releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy) {
}
}

/**
 * \brief Move work postponed for a collection under an epoch onto the work
 * queue
 *
 * Every unit held in \c pending_collection_work_ for the pair
 * (\c ep, \c untyped) is popped and placed on \c work_queue_. The now-empty
 * per-collection entry is erased, and the per-epoch entry is erased too once
 * no collections remain under it. Nothing happens if no work is pending for
 * the epoch.
 *
 * \param[in] ep the epoch whose postponed work is released
 * \param[in] untyped the collection whose postponed work is released
 */
void Scheduler::releaseEpochCollection(EpochType ep, UntypedCollection* untyped) {
  auto epoch_iter = pending_collection_work_.find(ep);
  if (epoch_iter == pending_collection_work_.end()) {
    // Nothing was ever postponed for this epoch
    return;
  }

  auto& per_collection = epoch_iter->second;

  auto col_iter = per_collection.find(untyped);
  if (col_iter != per_collection.end()) {
    auto& queued = col_iter->second;
    // Drain the postponed units into the runnable work queue
    while (queued.size() > 0) {
      work_queue_.emplace(queued.pop());
    }
    per_collection.erase(col_iter);
  }

  // Drop the epoch entry once no collection has pending work under it
  if (per_collection.empty()) {
    pending_collection_work_.erase(epoch_iter);
  }
}

bool Scheduler::isReleasedEpochObjgroup(
EpochType ep, ObjGroupProxyType proxy
) const {
Expand Down
14 changes: 14 additions & 0 deletions src/vt/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "vt/timing/timing.h"
#include "vt/runtime/component/component_pack.h"
#include "vt/messaging/async_op_wrapper.fwd.h"
#include "vt/vrt/collection/types/untyped.h"

#include <cassert>
#include <vector>
Expand Down Expand Up @@ -115,6 +116,7 @@ struct Scheduler : runtime::component::Component<Scheduler> {
using TriggerContainerType = std::list<TriggerType>;
using EventTriggerContType = std::vector<TriggerContainerType>;
using RunnablePtrType = runnable::RunnableNew*;
using UntypedCollection = vrt::collection::UntypedCollection;

struct SchedulerLoopGuard {
SchedulerLoopGuard(Scheduler* scheduler);
Expand Down Expand Up @@ -384,6 +386,13 @@ struct Scheduler : runtime::component::Component<Scheduler> {
*/
void releaseEpochObjgroup(EpochType ep, ObjGroupProxyType proxy);

/**
* \brief Release work postponed on an epoch for a collection so it can run
*
* Units that were postponed for \c untyped under \c ep are moved onto the
* scheduler's work queue.
*
* \param[in] ep the epoch to release
* \param[in] untyped the collection whose postponed work is released
*/
void releaseEpochCollection(EpochType ep, UntypedCollection* untyped);

/**
* \brief Check if an epoch is released for an objgroup
*
Expand Down Expand Up @@ -426,6 +435,7 @@ struct Scheduler : runtime::component::Component<Scheduler> {
| idleTimeMinusTerm
| pending_work_
| pending_objgroup_work_
| pending_collection_work_
| released_objgroups_;
}

Expand Down Expand Up @@ -472,6 +482,10 @@ struct Scheduler : runtime::component::Component<Scheduler> {
EpochType, std::unordered_map<ObjGroupProxyType, Queue<UnitType>>
> pending_objgroup_work_;

std::unordered_map<
EpochType, std::unordered_map<UntypedCollection*, Queue<UnitType>>
> pending_collection_work_;

/// Released epochs for an objgroup
std::unordered_map<EpochType, std::set<ObjGroupProxyType>> released_objgroups_;

Expand Down
39 changes: 25 additions & 14 deletions src/vt/scheduler/scheduler.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,33 @@ void Scheduler::enqueue(MsgT* msg, RunT r) {
template <typename UnitT>
void Scheduler::enqueueOrPostpone(UnitT unit) {
auto r = unit.getRunnable();
auto ep = r->getEpoch();

bool const is_dep = epoch::EpochManip::isDep(ep);

if (is_dep) {
auto obj = r->getObj();
if (ep != no_epoch and ep != term::any_epoch_sentinel) {
if (obj and r->isObjGroup()) {
auto proxy = objgroup::getProxyFromPtr(obj);
if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) {
pending_objgroup_work_[ep][proxy].push(unit);
auto m = r->getMsg();

// If it's a system message we can schedule it right away
if (m and not m->env.system_msg) {
auto ep = r->getEpoch();
bool const is_dep = epoch::EpochManip::isDep(ep);
if (is_dep) {
auto obj = r->getObj();
if (ep != no_epoch and ep != term::any_epoch_sentinel) {
if (obj) {
if (r->isObjGroup()) {
auto proxy = objgroup::getProxyFromPtr(obj);
if (released_objgroups_[ep].find(proxy) == released_objgroups_[ep].end()) {
pending_objgroup_work_[ep][proxy].push(unit);
return;
}
} else {
auto untyped = static_cast<UntypedCollection*>(obj);
if (not untyped->isReleasedEpoch(ep)) {
pending_collection_work_[ep][untyped].push(unit);
return;
}
}
} else if (not theTerm()->epochReleased(ep)) {
pending_work_[ep].push(unit);
return;
}
} else if (not theTerm()->epochReleased(ep)) {
pending_work_[ep].push(unit);
return;
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,50 @@ struct CollectionManager
template <typename ColT>
void addCleanupFn(VirtualProxyType proxy);

public:
/**
* \brief Release a dependent epoch for a given proxy index
*
* \param[in] proxy the element proxy
* \param[in] epoch epoch to release
*/
template <typename ColT>
void releaseEpoch(
VrtElmProxy<ColT, typename ColT::IndexType> proxy, EpochType epoch
);

/**
* \brief Release a dependent epoch for the whole collection
*
* \param[in] proxy the collection proxy
* \param[in] epoch the epoch to release
*/
template <typename ColT>
void releaseEpochCollection(VirtualProxyType proxy, EpochType epoch);

/**
* \brief Check if a dependent epoch for a given proxy index is released
*
* \param[in] proxy the element proxy
* \param[in] epoch epoch to check
*
* \return whether it is released on this element
*/
template <typename ColT>
bool isReleasedEpoch(
VrtElmProxy<ColT, typename ColT::IndexType> proxy, EpochType epoch
);

private:
/**
* \brief Handler to release a dependent epoch across the whole collection
*
* \param[in] proxy the collection proxy
* \param[in] epoch epoch to release
*/
template <typename ColT>
static void releaseWholeCollection(VirtualProxyType proxy, EpochType epoch);

private:
template <typename ColT, typename IndexT>
friend struct CollectionElmAttorney;
Expand Down Expand Up @@ -1782,6 +1826,7 @@ struct CollectionManager
#include "vt/vrt/collection/destroy/manager_destroy_attorney.impl.h"
#include "vt/vrt/collection/broadcast/broadcastable.impl.h"
#include "vt/vrt/collection/rdmaable/rdmaable.impl.h"
#include "vt/vrt/collection/release/releasable.impl.h"
#include "vt/vrt/collection/balance/col_lb_data.impl.h"
#include "vt/vrt/collection/types/indexable.impl.h"
#include "vt/vrt/collection/dispatch/dispatch.impl.h"
Expand Down
Loading

0 comments on commit 7a9f9a7

Please sign in to comment.