From 6c9ae3af863cf3247fd28acd52822d8ab748942d Mon Sep 17 00:00:00 2001 From: Jakub Strzebonski Date: Sat, 14 Nov 2020 18:35:43 +0100 Subject: [PATCH] #973: implement default proxy able to send, broadcast and reduce --- examples/callback/callback_context.cc | 5 +- src/vt/objgroup/manager.h | 8 ++ src/vt/objgroup/proxy/proxy_objgroup.h | 91 +++++++++++++++++++ src/vt/objgroup/proxy/proxy_objgroup_elm.h | 24 +++++ .../test_collectives_reduce.extended.cc | 10 +- tests/unit/test_parallel_harness.h | 1 - tutorial/tutorial_1d.h | 18 ++-- tutorial/tutorial_1h.h | 3 +- 8 files changed, 146 insertions(+), 14 deletions(-) diff --git a/examples/callback/callback_context.cc b/examples/callback/callback_context.cc index e72915cf00..60cbd3847a 100644 --- a/examples/callback/callback_context.cc +++ b/examples/callback/callback_context.cc @@ -108,8 +108,9 @@ int main(int argc, char** argv) { auto cb = vt::theCB()->makeFunc( vt::pipe::LifetimeEnum::Once, &my_global_ctx, callbackFn ); - auto msg = vt::makeMessage(cb); - vt::theMsg()->sendMsg(1, msg); + + auto const default_proxy = vt::theObjGroup()->makeCollective(); + default_proxy[1].send(cb); } vt::finalize(); diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index 9d21a4b9a2..dcfcbd6322 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -110,6 +110,14 @@ struct ObjGroupManager : runtime::component::Component { * communicator */ + /** + * \brief Construct a special proxy instance that allows sending, broadcasting + * and reducing without actual object group. + * + * \return proxy to the object group + */ + proxy::DefaultProxyType makeCollective() { return proxy::DefaultProxyType{}; } + /** * \brief Collectively construct a new object group. Allocates and constructs * the object on each node by forwarding constructor arguments. diff --git a/src/vt/objgroup/proxy/proxy_objgroup.h b/src/vt/objgroup/proxy/proxy_objgroup.h index e81712d465..74a9b945a0 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.h @@ -59,6 +59,7 @@ #include "vt/rdmahandle/handle.fwd.h" #include "vt/rdmahandle/handle_set.fwd.h" #include "vt/messaging/pending_send.h" +#include "vt/collective/collective_alg.h" namespace vt { namespace objgroup { namespace proxy { @@ -341,6 +342,96 @@ struct Proxy { ObjGroupProxyType proxy_ = no_obj_group; /**< The raw proxy ID bits */ }; +template <> +struct Proxy { + /** + * \brief Index the proxy to get the element proxy for a particular node + * + * \param[in] node the desired node + * + * \return an indexed proxy to that node + */ + DefaultProxyElm operator[](NodeType node) const { + return DefaultProxyElm{node}; + } + + /** + * \brief Broadcast a message. + * + * \note Creates message from given args + * + * \param[in] args the arguments used to make a message + * + * \return the \c PendingSend for the sent message + */ + template * f, typename... Args> + messaging::PendingSend broadcast(Args&&... args) const { + return broadcastMsg( + makeMessage(std::forward(args)...) + ); + } + + /** + * \brief Broadcast a message. + * + * \note Takes ownership of the supplied message. + * + * \param[in] msg the message to broadcast + * \param[in] tag the tag to put on the message + * + * \return the \c PendingSend for the sent message + */ + template * f> + messaging::PendingSend + broadcastMsg(messaging::MsgPtrThief msg, TagType tag = no_tag) const { + return theMsg()->broadcastMsg(msg, tag); + } + + /** + * \brief Reduce a message up the tree, possibly delayed through a pending + * send + * + * \note Creates message from given args + * + * \param[in] root the root node where the final handler provides the result + * \param[in] args the arguments used to make a message + * + * \return the pending send corresponding to the reduce + */ + template < + typename OpT, + typename FunctorT, + typename MsgT, + typename... Args, + ActiveTypedFnType* f = MsgT::template msgHandler + > + messaging::PendingSend reduce(NodeType root, Args&&... args) const { + auto const msg = makeMessage(std::forward(args)...); + return reduceMsg(root, msg.get()); + } + + /** + * \brief Reduce a message up the tree, possibly delayed through a pending + * send + * + * \param[in] root the root node where the final handler provides the result + * \param[in] msg the message to reduce on this node + * + * \return the pending send corresponding to the reduce + */ + template < + typename OpT, + typename FunctorT, + typename MsgT, + ActiveTypedFnType* f = MsgT::template msgHandler + > + messaging::PendingSend reduceMsg(NodeType root, MsgT* const msg) const { + return theCollective()->global()->reduce(root, msg); + } +}; + +using DefaultProxyType = Proxy; + }}} /* end namespace vt::objgroup::proxy */ #endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_H*/ diff --git a/src/vt/objgroup/proxy/proxy_objgroup_elm.h b/src/vt/objgroup/proxy/proxy_objgroup_elm.h index db840c8590..eae6ed57f0 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup_elm.h +++ b/src/vt/objgroup/proxy/proxy_objgroup_elm.h @@ -49,6 +49,8 @@ #include "vt/objgroup/common.h" #include "vt/objgroup/proxy/proxy_bits.h" #include "vt/objgroup/active_func/active_func.h" +#include "vt/activefn/activefn.h" +#include "vt/messaging/active.h" #include "vt/messaging/message/smart_ptr.h" namespace vt { namespace objgroup { namespace proxy { @@ -179,6 +181,28 @@ struct ProxyElm { NodeType node_ = uninitialized_destination; /**< The indexed node */ }; +template <> +struct ProxyElm { + explicit ProxyElm(NodeType in_node) : node_{in_node} {} + + /** + * \brief Send a message to the node indexed by this proxy to be + * delivered to the local object instance + * + * \param[in] args args to pass to the message constructor + */ + template * fn, typename... Args> + void send(Args&&... args) const { + vt::theMsg()->sendMsg( + node_, vt::makeMessage(std::forward(args)...)); + } + +private: + NodeType node_ = uninitialized_destination; /**< The indexed node */ +}; + +using DefaultProxyElm = ProxyElm; + }}} /* end namespace vt::objgroup::proxy */ #endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_ELM_H*/ diff --git a/tests/unit/collectives/test_collectives_reduce.extended.cc b/tests/unit/collectives/test_collectives_reduce.extended.cc index c675cab48b..b897d3b2d7 100644 --- a/tests/unit/collectives/test_collectives_reduce.extended.cc +++ b/tests/unit/collectives/test_collectives_reduce.extended.cc @@ -107,10 +107,12 @@ TEST_F(TestReduce, test_reduce_vec_int_msg) { vecOfInt.push_back(3); auto const root = 0; - auto msg = makeMessage>(vecOfInt); - theCollective()->global()->reduce< - PlusOp>, Verify - >(root, msg.get()); + auto const default_proxy = theObjGroup()->makeCollective(); + default_proxy.reduce< + PlusOp>, + Verify, + ReduceVecMsg + >(root, vecOfInt); } }}} // end namespace vt::tests::unit diff --git a/tests/unit/test_parallel_harness.h b/tests/unit/test_parallel_harness.h index 7b6dfe9b2f..d3c7af8051 100644 --- a/tests/unit/test_parallel_harness.h +++ b/tests/unit/test_parallel_harness.h @@ -92,7 +92,6 @@ struct TestParallelHarnessAny : TestHarnessAny { TestHarnessAny::SetUp(); - // TODO (STRZ) - do we want to use additional args here as well? if (mpi_singleton == nullptr) { mpi_singleton = std::make_unique(test_argc, test_argv); diff --git a/tutorial/tutorial_1d.h b/tutorial/tutorial_1d.h index b68a73c989..49b668862f 100644 --- a/tutorial/tutorial_1d.h +++ b/tutorial/tutorial_1d.h @@ -74,20 +74,26 @@ static inline void activeMessageBroadcast() { (void)num_nodes; // don't warn about unused variable /* - * The theMsg()->broadcastMsg(..) will send the message to every node in the + * default_proxy.broadcast(..) will send the message to every node in the * system. Every node will include all the nodes that VT has depending on the * MPI communicator passed in or the size attained (number of ranks) when * executing MPI init directly in non-interoperability mode. * * -- Message Ownership -- - * As with sendMsg, sending with broadcastMsg relinquishes ownership of - * the message. Most calls to VT that supply a message are expected - * to relinquish ownership. + * default_proxy.broadcast(..) will create message out of args passed + * into it. There's an alternative - default_proxy.broadcastMsg(..) - which + * relinquishes ownership of the message passed to: + * + * auto msg = ::vt::makeMessage(1.0, 2.0, 3.0); + * auto const default_proxy = theObjGroup()->makeCollective(); + * default_proxy.broadcastMsg(msg); + * + * Most calls to VT that supply a message are expected to relinquish ownership. */ if (this_node == 0) { - auto msg = ::vt::makeMessage(1.0,2.0,3.0); - ::vt::theMsg()->broadcastMsg(msg); + auto const default_proxy = theObjGroup()->makeCollective(); + default_proxy.broadcast(1.0, 2.0, 3.0); } } diff --git a/tutorial/tutorial_1h.h b/tutorial/tutorial_1h.h index 56d0391391..cb0f861995 100644 --- a/tutorial/tutorial_1h.h +++ b/tutorial/tutorial_1h.h @@ -90,7 +90,8 @@ static inline void activeMessageReduce() { // Get a reference to the value to set it in this reduce msg reduce_msg->getVal() = 50; - ::vt::theCollective()->global()->reduce( + auto const default_proxy = theObjGroup()->makeCollective(); + default_proxy.reduceMsg( root_reduce_node, reduce_msg.get() ); }