Skip to content

Commit

Permalink
Merge pull request #714 from DARMA-tasking/702-make-reduce-return-Pen…
Browse files Browse the repository at this point in the history
…dingSend

702 make reduce return pending send
  • Loading branch information
PhilMiller authored Aug 26, 2020
2 parents 306d713 + beea5ea commit c959dbc
Show file tree
Hide file tree
Showing 17 changed files with 634 additions and 118 deletions.
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set(TOP_LEVEL_SUBDIRS
activefn
# Add single-directory components
context event handler parameterization sequence termination
scheduler worker standalone runtime trace timing demangle
scheduler worker standalone runtime trace timing demangle rdmahandle
)
set(
PROJECT_SUBDIRS_LIST
Expand Down
71 changes: 68 additions & 3 deletions src/vt/collective/reduce/reduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "vt/messaging/message.h"
#include "vt/collective/tree/tree.h"
#include "vt/utils/hash/hash_tuple.h"
#include "vt/messaging/pending_send.h"

#include <tuple>
#include <unordered_map>
Expand All @@ -80,6 +81,7 @@ namespace vt { namespace collective { namespace reduce {
struct Reduce : virtual collective::tree::Tree {
using ReduceStateType = ReduceState;
using ReduceNumType = typename ReduceStateType::ReduceNumType;
using PendingSendType = messaging::PendingSend;

/**
* \internal \brief Construct a new reducer instance
Expand All @@ -106,6 +108,23 @@ struct Reduce : virtual collective::tree::Tree {
*/
detail::ReduceStamp generateNextID();

/**
* \brief Reduce a message up the tree, possibly delayed through a pending send
*
* \param[in] root the root node where the final handler provides the result
* \param[in] msg the message to reduce on this node
* \param[in] id the reduction stamp (optional), provided if out-of-order
* \param[in] num_contrib number of expected contributions from this node
*
* \return the pending send corresponding to the reduce
*/
template <typename MsgT, ActiveTypedFnType<MsgT>* f>
PendingSendType reduce(
NodeType root, MsgT* const msg,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType num_contrib = 1
);

/**
* \brief Reduce a message up the tree
*
Expand All @@ -117,7 +136,7 @@ struct Reduce : virtual collective::tree::Tree {
* \return the next reduction stamp
*/
template <typename MsgT, ActiveTypedFnType<MsgT>* f>
detail::ReduceStamp reduce(
detail::ReduceStamp reduceImmediate(
NodeType root, MsgT* const msg,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType num_contrib = 1
Expand All @@ -141,7 +160,31 @@ struct Reduce : virtual collective::tree::Tree {
MsgT, OpT, collective::reduce::operators::ReduceCallback<MsgT>
>
>
detail::ReduceStamp reduce(
PendingSendType reduce(
NodeType const& root, MsgT* msg, Callback<MsgT> cb,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType const& num_contrib = 1
);

/**
* \brief Reduce a message up the tree
*
* \param[in] root the root node where the final handler provides the result
* \param[in] msg the message to reduce on this node
* \param[in] cb the callback to trigger on the root node
* \param[in] id the reduction stamp (optional), provided if out-of-order
* \param[in] num_contrib number of expected contributions from this node
*
* \return the next reduction stamp
*/
template <
typename OpT,
typename MsgT,
ActiveTypedFnType<MsgT> *f = MsgT::template msgHandler<
MsgT, OpT, collective::reduce::operators::ReduceCallback<MsgT>
>
>
detail::ReduceStamp reduceImmediate(
NodeType const& root, MsgT* msg, Callback<MsgT> cb,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType const& num_contrib = 1
Expand All @@ -163,7 +206,29 @@ struct Reduce : virtual collective::tree::Tree {
typename MsgT,
ActiveTypedFnType<MsgT> *f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
detail::ReduceStamp reduce(
PendingSendType reduce(
NodeType const& root, MsgT* msg,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType const& num_contrib = 1
);

/**
* \brief Reduce a message up the tree with a target function on the root node
*
* \param[in] root the root node where the final handler provides the result
* \param[in] msg the message to reduce on this node
* \param[in] id the reduction stamp (optional), provided if out-of-order
* \param[in] num_contrib number of expected contributions from this node
*
* \return the next reduction stamp
*/
template <
typename OpT,
typename FunctorT,
typename MsgT,
ActiveTypedFnType<MsgT> *f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
detail::ReduceStamp reduceImmediate(
NodeType const& root, MsgT* msg,
detail::ReduceStamp id = detail::ReduceStamp{},
ReduceNumType const& num_contrib = 1
Expand Down
36 changes: 33 additions & 3 deletions src/vt/collective/reduce/reduce.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,56 @@ void Reduce::reduceRootRecv(MsgT* msg) {
}

template <typename OpT, typename MsgT, ActiveTypedFnType<MsgT> *f>
detail::ReduceStamp Reduce::reduce(
Reduce::PendingSendType Reduce::reduce(
NodeType const& root, MsgT* msg, Callback<MsgT> cb, detail::ReduceStamp id,
ReduceNumType const& num_contrib
) {
msg->setCallback(cb);
return reduce<MsgT,f>(root,msg,id,num_contrib);
}

template <typename OpT, typename MsgT, ActiveTypedFnType<MsgT> *f>
detail::ReduceStamp Reduce::reduceImmediate(
NodeType const& root, MsgT* msg, Callback<MsgT> cb, detail::ReduceStamp id,
ReduceNumType const& num_contrib
) {
msg->setCallback(cb);
return reduceImmediate<MsgT,f>(root,msg,id,num_contrib);
}

template <
typename OpT, typename FunctorT, typename MsgT, ActiveTypedFnType<MsgT> *f
>
detail::ReduceStamp Reduce::reduce(
Reduce::PendingSendType Reduce::reduce(
NodeType const& root, MsgT* msg, detail::ReduceStamp id,
ReduceNumType const& num_contrib
) {
return reduce<MsgT,f>(root,msg,id,num_contrib);
}

template <
typename OpT, typename FunctorT, typename MsgT, ActiveTypedFnType<MsgT> *f
>
detail::ReduceStamp Reduce::reduceImmediate(
NodeType const& root, MsgT* msg, detail::ReduceStamp id,
ReduceNumType const& num_contrib
) {
return reduceImmediate<MsgT,f>(root,msg,id,num_contrib);
}

template <typename MsgT, ActiveTypedFnType<MsgT>* f>
Reduce::PendingSendType Reduce::reduce(
NodeType root, MsgT* const msg, detail::ReduceStamp id,
ReduceNumType num_contrib
) {
auto msg_ptr = promoteMsg(msg);
return PendingSendType{theMsg()->getEpochContextMsg(msg_ptr), [=](){
reduceImmediate<MsgT, f>(root, msg_ptr.get(), id, num_contrib);
} };
}

template <typename MsgT, ActiveTypedFnType<MsgT>* f>
detail::ReduceStamp Reduce::reduce(
detail::ReduceStamp Reduce::reduceImmediate(
NodeType root, MsgT* const msg, detail::ReduceStamp id,
ReduceNumType num_contrib
) {
Expand Down
Loading

0 comments on commit c959dbc

Please sign in to comment.