Skip to content

Commit

Permalink
#1899: registry: completely remove old manual registry
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander authored and cz4rs committed Sep 28, 2022
1 parent d1de6ad commit 4419698
Show file tree
Hide file tree
Showing 26 changed files with 23 additions and 582 deletions.
4 changes: 0 additions & 4 deletions docs/md/registry.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
\page registry Registry
\brief Registered handlers

The registry component `vt::registry::Registry`, accessed via
`vt::theRegistry()` holds type-safe active handlers for execution across a
distributed machine.

- The \ref active-messenger uses the registry to store/dispatch active
function and active functor handlers.
- The \ref objgroup uses the registry to store/dispatch active member
Expand Down
5 changes: 0 additions & 5 deletions src/vt/collective/collective_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,6 @@ void CollectiveAnyOps<instance>::output(
}
}

template <runtime::RuntimeInstType instance>
HandlerType CollectiveAnyOps<instance>::registerHandler(ActiveClosureFnType fn) {
return theRegistry()->registerActiveHandler(fn);
}

template struct CollectiveAnyOps<collective_default_inst>;

} /* end namespace vt */
3 changes: 0 additions & 3 deletions src/vt/collective/collective_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "vt/config.h"
#include "vt/context/context.h"
#include "vt/runtime/runtime_headers.h"
#include "vt/registry/registry.h"

#include <string>

Expand Down Expand Up @@ -77,8 +76,6 @@ struct CollectiveAnyOps {
bool error = false, bool decorate = true, bool formatted = false,
bool abort_out = false
);

static HandlerType registerHandler(ActiveClosureFnType fn);
};

using CollectiveOps = CollectiveAnyOps<collective_default_inst>;
Expand Down
1 change: 0 additions & 1 deletion src/vt/collective/reduce/reduce.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

#include "vt/config.h"
#include "vt/collective/collective_alg.h"
#include "vt/registry/registry.h"
#include "vt/registry/auto/auto_registry_interface.h"
#include "vt/messaging/active.h"
#include "vt/messaging/message.h"
Expand Down
153 changes: 16 additions & 137 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ bool ActiveMessenger::recvDataMsg(
);
}

bool ActiveMessenger::processActiveMsg(
void ActiveMessenger::processActiveMsg(
MsgSharedPtr<BaseMsgType> const& base, NodeType const& from,
bool insert, ActionType cont
) {
Expand All @@ -913,18 +913,17 @@ bool ActiveMessenger::processActiveMsg(
}

if (deliver) {
return prepareActiveMsgToRun(base,from,insert,cont);
prepareActiveMsgToRun(base,from,insert,cont);
} else {
amForwardCounterGauge.incrementUpdate(base.size(), 1);

if (cont != nullptr) {
cont();
}
return false;
}
}

bool ActiveMessenger::prepareActiveMsgToRun(
void ActiveMessenger::prepareActiveMsgToRun(
MsgSharedPtr<BaseMsgType> const& base, NodeType const& in_from_node,
bool insert, ActionType cont
) {
Expand All @@ -950,11 +949,6 @@ bool ActiveMessenger::prepareActiveMsgToRun(
);
}

bool const is_auto = HandlerManagerType::isHandlerAuto(handler);
bool const is_obj = HandlerManagerType::isHandlerObjGroup(handler);
bool has_handler =
(is_obj or is_auto) or theRegistry()->getHandler(handler, tag) != nullptr;

if (!is_term || vt_check_enabled(print_term_msgs)) {
vt_debug_print(
normal, active,
Expand All @@ -965,41 +959,22 @@ bool ActiveMessenger::prepareActiveMsgToRun(
);
}

if (has_handler) {
runnable::makeRunnable(base, not is_term, handler, from_node)
.withContinuation(cont)
.withTag(tag)
.withTDEpochFromMsg(is_term)
.withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_)
.enqueue();

if (is_term) {
tdRecvCount.increment(1);
}
amHandlerCount.increment(1);
runnable::makeRunnable(base, not is_term, handler, from_node)
.withContinuation(cont)
.withTag(tag)
.withTDEpochFromMsg(is_term)
.withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_)
.enqueue();

if (not is_term) {
theTerm()->consume(epoch,1,in_from_node);
theTerm()->hangDetectRecv();
}
} else {
if (insert) {
auto iter = pending_handler_msgs_.find(handler);
if (iter == pending_handler_msgs_.end()) {
pending_handler_msgs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(handler),
std::forward_as_tuple(
MsgContType{BufferedMsgType{base,from_node,cont}}
)
);
} else {
iter->second.push_back(BufferedMsgType{base,from_node,cont});
}
}
if (is_term) {
tdRecvCount.increment(1);
}
amHandlerCount.increment(1);

return has_handler;
if (not is_term) {
theTerm()->consume(epoch,1,in_from_node);
theTerm()->hangDetectRecv();
}
}

bool ActiveMessenger::tryProcessIncomingActiveMsg() {
Expand Down Expand Up @@ -1176,7 +1151,6 @@ bool ActiveMessenger::testPendingAsyncOps() {
int ActiveMessenger::progress(TimeType current_time) {
bool const started_irecv_active_msg = tryProcessIncomingActiveMsg();
bool const started_irecv_data_msg = tryProcessDataMsgRecv();
processMaybeReadyHanTag();
bool const received_active_msg = testPendingActiveMsgAsyncRecv();
bool const received_data_msg = testPendingDataMsgAsyncRecv();
bool const general_async = testPendingAsyncOps();
Expand All @@ -1185,101 +1159,6 @@ int ActiveMessenger::progress(TimeType current_time) {
received_active_msg or received_data_msg or general_async;
}

void ActiveMessenger::processMaybeReadyHanTag() {
decltype(maybe_ready_tag_han_) maybe_ready = maybe_ready_tag_han_;
// Clear first so clearing doesn't happen after new entries may be added by an
// active message arriving
maybe_ready_tag_han_.clear();
for (auto&& x : maybe_ready) {
deliverPendingMsgsHandler(std::get<0>(x), std::get<1>(x));
}
}

HandlerType ActiveMessenger::registerNewHandler(
ActiveClosureFnType fn, TagType const& tag
) {
return theRegistry()->registerNewHandler(fn, tag);
}

HandlerType ActiveMessenger::collectiveRegisterHandler(
ActiveClosureFnType fn, TagType const& tag
) {
return theRegistry()->registerActiveHandler(fn, tag);
}

void ActiveMessenger::swapHandlerFn(
HandlerType const han, ActiveClosureFnType fn, TagType const& tag
) {
vt_debug_print(
verbose, active,
"swapHandlerFn: han={}, tag={}\n", han, tag
);

theRegistry()->swapHandler(han, fn, tag);

if (fn != nullptr) {
maybe_ready_tag_han_.push_back(ReadyHanTagType{han,tag});
}
}

void ActiveMessenger::deliverPendingMsgsHandler(
HandlerType const han, TagType const& tag
) {
vt_debug_print(
normal, active,
"deliverPendingMsgsHandler: han={}, tag={}\n", han, tag
);
auto iter = pending_handler_msgs_.find(han);
if (iter != pending_handler_msgs_.end()) {
if (iter->second.size() > 0) {
for (auto cur = iter->second.begin(); cur != iter->second.end(); ) {
vt_debug_print(
verbose, active,
"deliverPendingMsgsHandler: msg={}, from={}\n",
print_ptr(cur->buffered_msg.get()), cur->from_node
);
if (
prepareActiveMsgToRun(
cur->buffered_msg, cur->from_node, false, cur->cont
)
) {
cur = iter->second.erase(cur);
} else {
++cur;
}
}
} else {
pending_handler_msgs_.erase(iter);
}
}
}

void ActiveMessenger::registerHandlerFn(
HandlerType const han, ActiveClosureFnType fn, TagType const& tag
) {
vt_debug_print(
verbose, active,
"registerHandlerFn: han={}, tag={}\n", han, tag
);

swapHandlerFn(han, fn, tag);

if (fn != nullptr) {
maybe_ready_tag_han_.push_back(ReadyHanTagType{han,tag});
}
}

void ActiveMessenger::unregisterHandlerFn(
HandlerType const han, TagType const& tag
) {
vt_debug_print(
verbose, active,
"unregisterHandlerFn: han={}, tag={}\n", han, tag
);

return theRegistry()->unregisterHandlerFn(han, tag);
}

void ActiveMessenger::registerAsyncOp(std::unique_ptr<AsyncOp> in) {
in_progress_ops.emplace(AsyncOpWrapper{std::move(in)});
}
Expand Down
Loading

0 comments on commit 4419698

Please sign in to comment.