Skip to content

Commit

Permalink
#973: implement default proxy able to send, broadcast and reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Strzebonski committed Dec 3, 2020
1 parent dd40cd9 commit 2c88711
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 14 deletions.
5 changes: 3 additions & 2 deletions examples/callback/callback_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ int main(int argc, char** argv) {
auto cb = vt::theCB()->makeFunc<DataMsg,MyContext>(
vt::pipe::LifetimeEnum::Once, &my_global_ctx, callbackFn
);
auto msg = vt::makeMessage<CallbackMsg>(cb);
vt::theMsg()->sendMsg<CallbackMsg,handler>(1, msg);

auto const default_proxy = vt::theObjGroup()->makeCollective();
default_proxy[1].send<CallbackMsg, handler>(cb);
}

vt::finalize();
Expand Down
4 changes: 4 additions & 0 deletions src/vt/objgroup/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@

namespace vt { namespace objgroup {

proxy::DefaultProxyType ObjGroupManager::makeCollective() const {
return proxy::DefaultProxyType{};
}

ObjGroupProxyType ObjGroupManager::getProxy(ObjGroupProxyType proxy) {
return proxy;
}
Expand Down
8 changes: 8 additions & 0 deletions src/vt/objgroup/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ struct ObjGroupManager : runtime::component::Component<ObjGroupManager> {
* communicator
*/

/**
* \brief Construct a special proxy instance that allows sending, broadcasting
* and reducing without actual object group.
*
* \return proxy to the object group
*/
proxy::DefaultProxyType makeCollective() const;

/**
* \brief Collectively construct a new object group. Allocates and constructs
* the object on each node by forwarding constructor arguments.
Expand Down
77 changes: 77 additions & 0 deletions src/vt/objgroup/proxy/proxy_objgroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,83 @@ struct Proxy {
ObjGroupProxyType proxy_ = no_obj_group; /**< The raw proxy ID bits */
};

template <>
struct Proxy<void> {
/**
* \brief Index the proxy to get the element proxy for a particular node
*
* \param[in] node the desired node
*
* \return an indexed proxy to that node
*/
DefaultProxyElm operator[](NodeType node) const;

/**
* \brief Broadcast a message.
*
* \note Creates message from given args
*
* \param[in] args the arguments used to make a message
*
* \return the \c PendingSend for the sent message
*/
template <typename MsgT, ActiveTypedFnType<MsgT>* f, typename... Args>
messaging::PendingSend broadcast(Args&&... args) const;

/**
* \brief Broadcast a message.
*
* \note Takes ownership of the supplied message.
*
* \param[in] msg the message to broadcast
* \param[in] tag the tag to put on the message
*
* \return the \c PendingSend for the sent message
*/
template <typename MsgT, ActiveTypedFnType<MsgT>* f>
messaging::PendingSend
broadcastMsg(messaging::MsgPtrThief<MsgT> msg, TagType tag = no_tag) const;

/**
* \brief Reduce a message up the tree, possibly delayed through a pending
* send
*
* \note Creates message from given args
*
* \param[in] root the root node where the final handler provides the result
* \param[in] args the arguments used to make a message
*
* \return the pending send corresponding to the reduce
*/
template <
typename OpT,
typename FunctorT,
typename MsgT,
typename... Args,
ActiveTypedFnType<MsgT>* f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
messaging::PendingSend reduce(NodeType root, Args&&... args) const;

/**
* \brief Reduce a message up the tree, possibly delayed through a pending
* send
*
* \param[in] root the root node where the final handler provides the result
* \param[in] msg the message to reduce on this node
*
* \return the pending send corresponding to the reduce
*/
template <
typename OpT,
typename FunctorT,
typename MsgT,
ActiveTypedFnType<MsgT>* f = MsgT::template msgHandler<MsgT, OpT, FunctorT>
>
messaging::PendingSend reduceMsg(NodeType root, MsgT* const msg) const;
};

using DefaultProxyType = Proxy<void>;

}}} /* end namespace vt::objgroup::proxy */

#endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_H*/
39 changes: 39 additions & 0 deletions src/vt/objgroup/proxy/proxy_objgroup.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,45 @@ void Proxy<ObjT>::destroyHandleSetRDMA(vt::rdma::HandleSet<T> set) const {
return vt::theHandleRDMA()->deleteHandleSetCollectiveObjGroup<T>(set);
}

inline DefaultProxyElm Proxy<void>::operator[](NodeType node) const {
return DefaultProxyElm{node};
}

template <typename MsgT, ActiveTypedFnType<MsgT>* f, typename... Args>
messaging::PendingSend Proxy<void>::broadcast(Args&&... args) const {
return broadcastMsg<MsgT, f>(makeMessage<MsgT>(std::forward<Args>(args)...));
}

template <typename MsgT, ActiveTypedFnType<MsgT>* f>
messaging::PendingSend
Proxy<void>::broadcastMsg(messaging::MsgPtrThief<MsgT> msg, TagType tag) const {
return theMsg()->broadcastMsg<MsgT, f>(msg, tag);
}

template <
typename OpT,
typename FunctorT,
typename MsgT,
typename... Args,
ActiveTypedFnType<MsgT>* f
>
messaging::PendingSend
Proxy<void>::reduce(NodeType root, Args&&... args) const {
auto const msg = makeMessage<MsgT>(std::forward<Args>(args)...);
return reduceMsg<OpT, FunctorT, MsgT, f>(root, msg.get());
}

template <
typename OpT,
typename FunctorT,
typename MsgT,
ActiveTypedFnType<MsgT>* f
>
messaging::PendingSend
Proxy<void>::reduceMsg(NodeType root, MsgT* const msg) const {
return theCollective()->global()->reduce<OpT, FunctorT>(root, msg);
}

}}} /* end namespace vt::objgroup::proxy */

#endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_IMPL_H*/
19 changes: 19 additions & 0 deletions src/vt/objgroup/proxy/proxy_objgroup_elm.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ struct ProxyElm {
NodeType node_ = uninitialized_destination; /**< The indexed node */
};

template <>
struct ProxyElm<void> {
explicit ProxyElm(NodeType in_node);

/**
* \brief Send a message to the node indexed by this proxy to be
* delivered to the local object instance
*
* \param[in] args args to pass to the message constructor
*/
template <typename MsgT, ActiveTypedFnType<MsgT>* fn, typename... Args>
void send(Args&&... args) const;

private:
NodeType node_ = uninitialized_destination; /**< The indexed node */
};

using DefaultProxyElm = ProxyElm<void>;

}}} /* end namespace vt::objgroup::proxy */

#endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_ELM_H*/
8 changes: 8 additions & 0 deletions src/vt/objgroup/proxy/proxy_objgroup_elm.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ ObjT* ProxyElm<ObjT>::get() const {
return theObjGroup()->get<ObjT>(proxy);
}

inline ProxyElm<void>::ProxyElm(NodeType in_node) : node_{in_node} {}

template <typename MsgT, ActiveTypedFnType<MsgT>* fn, typename... Args>
void ProxyElm<void>::send(Args&&... args) const {
vt::theMsg()->sendMsg<MsgT, fn>(
node_, vt::makeMessage<MsgT>(std::forward<Args>(args)...));
}

}}} /* end namespace vt::objgroup::proxy */

#endif /*INCLUDED_VT_OBJGROUP_PROXY_PROXY_OBJGROUP_ELM_IMPL_H*/
10 changes: 6 additions & 4 deletions tests/unit/collectives/test_collectives_reduce.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ TEST_F(TestReduce, test_reduce_vec_int_msg) {
vecOfInt.push_back(3);

auto const root = 0;
auto msg = makeMessage<ReduceVecMsg<int>>(vecOfInt);
theCollective()->global()->reduce<
PlusOp<std::vector<int>>, Verify<ReduceOP::Plus>
>(root, msg.get());
auto const default_proxy = theObjGroup()->makeCollective();
default_proxy.reduce<
PlusOp<std::vector<int>>,
Verify<ReduceOP::Plus>,
ReduceVecMsg<int>
>(root, vecOfInt);
}

}}} // end namespace vt::tests::unit
1 change: 0 additions & 1 deletion tests/unit/test_parallel_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ struct TestParallelHarnessAny : TestHarnessAny<TestBase> {

TestHarnessAny<TestBase>::SetUp();

// TODO (STRZ) - do we want to use additional args here as well?
if (mpi_singleton == nullptr) {
mpi_singleton =
std::make_unique<MPISingletonMultiTest>(test_argc, test_argv);
Expand Down
18 changes: 12 additions & 6 deletions tutorial/tutorial_1d.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,26 @@ static inline void activeMessageBroadcast() {
(void)num_nodes; // don't warn about unused variable

/*
* The theMsg()->broadcastMsg(..) will send the message to every node in the
* default_proxy.broadcast(..) will send the message to every node in the
* system. Every node will include all the nodes that VT has depending on the
* MPI communicator passed in or the size attained (number of ranks) when
* executing MPI init directly in non-interoperability mode.
*
* -- Message Ownership --
* As with sendMsg, sending with broadcastMsg relinquishes ownership of
* the message. Most calls to VT that supply a message are expected
* to relinquish ownership.
* default_proxy.broadcast(..) will create message out of args passed
* into it. There's an alternative - default_proxy.broadcastMsg(..) - which
* relinquishes ownership of the message passed to:
*
* auto msg = ::vt::makeMessage<MyDataMsg>(1.0, 2.0, 3.0);
* auto const default_proxy = theObjGroup()->makeCollective();
* default_proxy.broadcastMsg<MyDataMsg, msgHandlerX>(msg);
*
* Most calls to VT that supply a message are expected to relinquish ownership.
*/

if (this_node == 0) {
auto msg = ::vt::makeMessage<MyDataMsg>(1.0,2.0,3.0);
::vt::theMsg()->broadcastMsg<MyDataMsg,msgHandlerX>(msg);
auto const default_proxy = theObjGroup()->makeCollective();
default_proxy.broadcast<MyDataMsg, msgHandlerX>(1.0, 2.0, 3.0);
}
}

Expand Down
3 changes: 2 additions & 1 deletion tutorial/tutorial_1h.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ static inline void activeMessageReduce() {
// Get a reference to the value to set it in this reduce msg
reduce_msg->getVal() = 50;

::vt::theCollective()->global()->reduce<ReduceOp,ReduceResult>(
auto const default_proxy = theObjGroup()->makeCollective();
default_proxy.reduceMsg<ReduceOp, ReduceResult>(
root_reduce_node, reduce_msg.get()
);
}
Expand Down

0 comments on commit 2c88711

Please sign in to comment.