Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#410: Dependent Epochs rewritten #2204

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ccd5afd
#410: epoch: change unused InsertEpoch to DependentEpoch
lifflander Jun 17, 2019
b393469
#410: epoch: add function to bit-combine epoch category bits
lifflander Jun 19, 2019
9a1efe8
#410: termination: add isDep check
lifflander Sep 28, 2023
72fbfec
#410: term: implement dependent epochs
lifflander Jun 20, 2019
5cd8599
#410: test: add release dependent epoch test
lifflander Jun 20, 2019
10147f4
#410: reduce: fix warning
lifflander Oct 12, 2023
1db25fa
#410: epoch: add test, move pending epochs to scheduler
lifflander Oct 12, 2023
a9816c6
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
lifflander Oct 16, 2023
4de8a74
#410: objgroup: implement objgroup proxy functions for dependent epochs
lifflander Oct 17, 2023
bfdc3f4
#410: collection: add dependent epochs to collections, system message…
lifflander Oct 18, 2023
3c1ac7f
#410: test: add new test for dep epochs and collections
lifflander Aug 16, 2019
cdf69ab
#410: collection: add missing header include
lifflander Oct 18, 2023
9764708
#410: tests: cleanup tests, fix name collison
lifflander Oct 18, 2023
a974aba
#410: tests: fix license
lifflander Oct 18, 2023
3e76af5
#410: tests: fix some small compilation errors
lifflander Oct 18, 2023
3d2b117
#410: collection: switch broadcast after system broadcast to user msg
lifflander Oct 18, 2023
f746269
#410: collection: fix missing system message designation
lifflander Oct 19, 2023
cb2b519
#410: tests: rewrite dep epoch test to fix logic error
lifflander Oct 19, 2023
4b3a9c6
#410: termination: remove unneeded code, cleanup scheduler
lifflander Oct 31, 2023
48f7a8c
#410: termination: cleanup more code---remove unecessary condition
lifflander Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/vt/collective/reduce/reduce_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@

namespace vt { namespace collective { namespace reduce {

static std::unique_ptr<Reduce> makeReduceScope(detail::ReduceScope const& scope) {
return std::make_unique<Reduce>(scope);
}

ReduceManager::ReduceManager()
: reducers_( // default cons reducer for non-group
[](detail::ReduceScope const& scope) {
return std::make_unique<Reduce>(scope);
}
)
: reducers_(makeReduceScope)
{
// insert the default reducer scope
reducers_.make(
Expand Down
7 changes: 7 additions & 0 deletions src/vt/context/runnable_context/td.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ struct TD {
*/
void resume();

/**
* \brief Get epoch for this context
*
* \return the epoch
*/
EpochType getEpoch() const { return ep_; }

private:
EpochType ep_ = no_epoch; /**< The epoch for the task */
#if vt_check_enabled(fcontext)
Expand Down
2 changes: 1 addition & 1 deletion src/vt/epoch/epoch.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static constexpr BitCountType const epoch_category_num_bits = 2;
*/
enum struct eEpochCategory : int8_t {
NoCategoryEpoch = 0x0,
InsertEpoch = 0x1,
DependentEpoch = 0x1,
DijkstraScholtenEpoch = 0x2
};

Expand Down
64 changes: 43 additions & 21 deletions src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ namespace vt { namespace epoch {

static EpochType const arch_epoch_coll = makeEpochZero();

EpochManip::EpochManip()
: terminated_collective_epochs_(
std::make_unique<EpochWindow>(arch_epoch_coll)
)
{ }

/*static*/ EpochType EpochManip::generateEpoch(
bool const& is_rooted, NodeType const& root_node,
eEpochCategory const& category
Expand Down Expand Up @@ -123,22 +117,19 @@ EpochType EpochManip::getArchetype(EpochType epoch) const {

EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
auto const is_rooted = isRooted(epoch);
if (is_rooted and epoch != term::any_epoch_sentinel) {
auto const& arch_epoch = getArchetype(epoch);
auto iter = terminated_epochs_.find(arch_epoch);
if (iter == terminated_epochs_.end()) {
terminated_epochs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = terminated_epochs_.find(arch_epoch);
}
return iter->second.get();
} else {
vtAssertExpr(terminated_collective_epochs_ != nullptr);
return terminated_collective_epochs_.get();
auto& container = is_rooted and epoch != term::any_epoch_sentinel ?
terminated_epochs_ : terminated_collective_epochs_;
auto const& arch_epoch = getArchetype(epoch);
auto iter = container.find(arch_epoch);
if (iter == container.end()) {
container.emplace(
std::piecewise_construct,
std::forward_as_tuple(arch_epoch),
std::forward_as_tuple(std::make_unique<EpochWindow>(arch_epoch))
);
iter = container.find(arch_epoch);
}
return iter->second.get();
}

/*static*/ bool EpochManip::isRooted(EpochType const& epoch) {
Expand All @@ -148,6 +139,29 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
return BitPackerType::boolGetField<field,size,ImplType>(*epoch);
}

/*static*/ bool EpochManip::isDS(EpochType epoch) {
using T = typename std::underlying_type<eEpochCategory>::type;
if (epoch != term::any_epoch_sentinel and isRooted(epoch)) {
BitPackerType::FieldType const ds_bit =
static_cast<T>(eEpochCategory::DijkstraScholtenEpoch) - 1;
auto cat = static_cast<T>(EpochManip::category(epoch));
return BitPackerType::boolGetField<ds_bit,1,decltype(cat)>(cat);
} else {
return false;
}
}

/*static*/ bool EpochManip::isDep(EpochType epoch) {
using T = typename std::underlying_type<eEpochCategory>::type;
if (epoch == no_epoch or epoch == term::any_epoch_sentinel) {
return false;
}
BitPackerType::FieldType const dep_bit =
static_cast<T>(eEpochCategory::DependentEpoch) - 1;
auto cat = static_cast<T>(EpochManip::category(epoch));
return BitPackerType::boolGetField<dep_bit,1,decltype(cat)>(cat);
}

/*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) {
return BitPackerType::getField<
eEpochRoot::rEpochCategory, epoch_category_num_bits, eEpochCategory
Expand Down Expand Up @@ -190,6 +204,14 @@ void EpochManip::setCategory(EpochType& epoch, eEpochCategory const cat) {
>(*epoch,cat);
}

/*static*/ eEpochCategory EpochManip::makeCat(
eEpochCategory c1, eEpochCategory c2
) {
using T = typename std::underlying_type<eEpochCategory>::type;
auto ret = static_cast<T>(c1) | static_cast<T>(c2);
return static_cast<eEpochCategory>(ret);
}

/*static*/
void EpochManip::setNode(EpochType& epoch, NodeType const node) {
vtAssert(isRooted(epoch), "Must be rooted to manipulate the node");
Expand Down
38 changes: 33 additions & 5 deletions src/vt/epoch/epoch_manip.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace vt { namespace epoch {
struct EpochManip : runtime::component::Component<EpochManip> {
using CapturedContextType = term::ParentEpochCapture;

EpochManip();
EpochManip() = default;

std::string name() override { return "EpochManip"; }

Expand All @@ -89,6 +89,24 @@ struct EpochManip : runtime::component::Component<EpochManip> {
*/
static bool isRooted(EpochType const& epoch);

/**
* \brief Gets whether an epoch is DS or onot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is DS
*/
static bool isDS(EpochType epoch);

/**
* \brief Gets whether an epoch is dependent or onot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is dependent
*/
static bool isDep(EpochType epoch);

/**
* \brief Gets the \c eEpochCategory of a given epoch
*
Expand Down Expand Up @@ -152,6 +170,14 @@ struct EpochManip : runtime::component::Component<EpochManip> {
*/
static void setSeq(EpochType& epoch, EpochType::ImplType const seq);

/**
* \brief Combine eEpochCategory elements
*
* \param[in] c1 category 1
* \param[in] c2 category 2
*/
static eEpochCategory makeCat(eEpochCategory c1, eEpochCategory c2);

/*
* General (stateless) methods for creating a epoch with certain properties
* based on a current sequence number
Expand Down Expand Up @@ -250,10 +276,12 @@ struct EpochManip : runtime::component::Component<EpochManip> {
}

private:
// epoch window container for specific archetyped epochs
std::unordered_map<EpochType,std::unique_ptr<EpochWindow>> terminated_epochs_;
// epoch window for basic collective epochs
std::unique_ptr<EpochWindow> terminated_collective_epochs_ = nullptr;
/// epoch window container for specific archetyped epochs
std::unordered_map<EpochType, std::unique_ptr<EpochWindow>> terminated_epochs_;
/// epoch window for basic collective epochs
std::unordered_map<
EpochType, std::unique_ptr<EpochWindow>
> terminated_collective_epochs_;
};

}} /* end namespace vt::epoch */
Expand Down
12 changes: 12 additions & 0 deletions src/vt/group/collective/group_info_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void InfoColl::setupCollective() {
auto msg = makeMessage<GroupCollectiveMsg>(
group_, up_tree_cont_, in_group, size, child
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(parent, msg);
}
}
Expand Down Expand Up @@ -336,6 +337,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,new_root_cont_,true,subtree_zero,root_node,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newRootHan>(root_node, msg);

for (std::size_t i = 1; i < msg_list.size(); i++) {
Expand Down Expand Up @@ -377,6 +379,7 @@ void InfoColl::upTree() {
auto cmsg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,level
);
envelopeSetSystemMsg(cmsg->env, true);
theMsg()->sendMsg<upHan>(p, cmsg);

for (auto&& msg : msg_in_group) {
Expand Down Expand Up @@ -406,6 +409,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,static_cast<NodeType>(subtree_),child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
/*
* Forward all the children messages up the tree (up to 2 of them)
Expand Down Expand Up @@ -438,9 +442,11 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);
// new MsgPtr to avoid thief of original in collection
auto msg_out = promoteMsg(msg_in_group[0].get());
envelopeSetSystemMsg(msg_out->env, true);
theMsg()->sendMsg<upHan>(p, msg_out);
} else {
vtAssertExpr(msg_in_group.size() > 2);
Expand Down Expand Up @@ -478,6 +484,7 @@ void InfoColl::upTree() {
auto msg = makeMessage<GroupCollectiveMsg>(
group,op,is_in_group,total_subtree,child,0,extra
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<upHan>(p, msg);

vt_debug_print(
Expand Down Expand Up @@ -639,12 +646,14 @@ void InfoColl::downTree(GroupCollectiveMsg* msg) {
auto const& num = collective_->span_children_.size();
auto const& child = collective_->span_children_[msg->getChild() % num];
auto nmsg = makeMessage<GroupCollectiveMsg>(*msg);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downHan>(child, nmsg);
++send_down_;
}

auto const& group_ = getGroupID();
auto nmsg = makeMessage<GroupOnlyMsg>(group_,down_tree_fin_cont_);
envelopeSetSystemMsg(nmsg->env, true);
theMsg()->sendMsg<downFinishedHan>(from, nmsg);
}

Expand Down Expand Up @@ -686,6 +695,7 @@ void InfoColl::sendDownNewTree() {
group_, c
);
auto msg = makeMessage<GroupOnlyMsg>(group_,new_tree_cont_);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<newTreeHan>(c, msg);
}
}
Expand Down Expand Up @@ -727,6 +737,7 @@ void InfoColl::finalize() {
auto msg = makeMessage<GroupOnlyMsg>(
group_,finalize_cont_,known_root_node_,is_default_group_
);
envelopeSetSystemMsg(msg->env, true);
theMsg()->sendMsg<finalizeHan>(c, msg);
}

Expand All @@ -744,6 +755,7 @@ void InfoColl::finalize() {

auto stamp = makeStamp<StrongUserID>(group_);
auto msg = makeMessage<FinishedReduceMsg>(group_);
envelopeSetSystemMsg(msg->env, true);
using OpType = collective::PlusOp<collective::NoneType>;

auto r = theGroup()->reducer();
Expand Down
2 changes: 2 additions & 0 deletions src/vt/group/rooted/group_info_rooted.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ void InfoRooted::setupRooted() {
auto msg = makeMessage<GroupListMsg>(
low_node, listsize, group_, op, listsize, this_node, &lst
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand All @@ -142,6 +143,7 @@ void InfoRooted::setupRooted() {
low_node, regsize, group_, op, regsize, this_node,
static_cast<region::Range*>(region_.get())
);
envelopeSetSystemMsg(msg->env, true);
is_forward_ = true;
forward_node_ = low_node;
if (this_node != low_node) {
Expand Down
13 changes: 7 additions & 6 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ void ActiveMessenger::finishPendingDataMsgAsyncRecv(InProgressDataIRecv* irecv)
theTerm()->consume(term::any_epoch_sentinel,1,sender);
theTerm()->hangDetectRecv();
};
theSched()->enqueue(irecv->priority, run);
theSched()->enqueueLambda(irecv->priority, run);
}
}

Expand Down Expand Up @@ -934,12 +934,13 @@ void ActiveMessenger::prepareActiveMsgToRun(
using MsgType = ShortMessage;
auto msg = base.to<MsgType>().get();

auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
envelopeGetEpoch(msg->env) : term::any_epoch_sentinel;

auto const from_node = is_bcast ? dest : in_from_node;

if (!is_term || vt_check_enabled(print_term_msgs)) {
Expand Down
2 changes: 1 addition & 1 deletion src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ struct BufferedActiveMsg {

BufferedActiveMsg(
MessageType const& in_buffered_msg, NodeType const& in_from_node,
ActionType in_cont
ActionType in_cont = nullptr
) : buffered_msg(in_buffered_msg), from_node(in_from_node), cont(in_cont)
{ }

Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/active.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,31 @@ void ActiveMessenger::markAsTermMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceTerm());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsLocationMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceLocation());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsSerialMsgMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceSerialMsg());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgPtrT>
void ActiveMessenger::markAsCollectionMessage(MsgPtrT const msg) {
#if vt_check_enabled(trace_enabled)
envelopeSetTraceRuntimeEnabled(msg->env, theConfig()->traceCollection());
#endif
envelopeSetSystemMsg(msg->env, true);
}

template <typename MsgT>
Expand Down
4 changes: 4 additions & 0 deletions src/vt/messaging/envelope/envelope_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct ActiveEnvelope {
/// Used to denote that the message's bare handlers shouldn't record
/// communication LB data due to redundancy
bool comm_lb_data_recorded_above_bare_handler : 1;

/// Used to denote that this is an internal system message that does not do
/// application work
bool system_msg : 1;
};

}} /* end namespace vt::messaging */
Expand Down
Loading
Loading