From 489dc15c7260e155fbf22517fd5f7bcd6e1a77e5 Mon Sep 17 00:00:00 2001 From: Jakub Domagala Date: Wed, 7 Oct 2020 23:08:26 +0200 Subject: [PATCH] #1024: Prioritize send/broadcast which creates the message and then does the action instead of passing the message to these functions --- examples/collection/jacobi1d_vt.cc | 18 +++++++-------- examples/collection/jacobi2d_vt.cc | 23 ++++++------------- examples/collection/migrate_collection.cc | 21 ++++++----------- examples/collection/transpose.cc | 3 +-- src/vt/rdmahandle/manager.collection.impl.h | 7 ++---- src/vt/trace/file_spec/spec.cc | 5 ++-- .../vrt/collection/balance/baselb/baselb.cc | 3 +-- .../balance/hierarchicallb/hierlb.cc | 10 ++++---- .../balance/stats_restart_reader.cc | 6 ++--- tests/unit/collection/test_broadcast.h | 10 ++++---- .../test_collection_construct_common.h | 10 ++++---- tests/unit/collection/test_destroy.cc | 4 ++-- tests/unit/collection/test_insert.extended.cc | 6 ++--- .../unit/collection/test_lb_lite.extended.cc | 4 ++-- .../unit/collection/test_reduce_collection.cc | 17 +++++++------- ...test_callback_bcast_collection.extended.cc | 9 +++----- .../test_callback_send_collection.extended.cc | 9 +++----- tutorial/tutorial_2a.h | 6 ++--- tutorial/tutorial_2b.h | 3 +-- 19 files changed, 70 insertions(+), 104 deletions(-) diff --git a/examples/collection/jacobi1d_vt.cc b/examples/collection/jacobi1d_vt.cc index 54239d4dc0..cf7c78ea5d 100644 --- a/examples/collection/jacobi1d_vt.cc +++ b/examples/collection/jacobi1d_vt.cc @@ -131,8 +131,7 @@ struct LinearPb1DJacobi : vt::Collection { // Start a new iteration // auto proxy = getCollectionProxy(); - auto loopMsg = vt::makeMessage(); - proxy.broadcastMsg(loopMsg.get()); + proxy.broadcast(); } else if (iter > maxIter) { fmt::print("\n Maximum Number of Iterations Reached. \n\n"); @@ -259,14 +258,17 @@ struct LinearPb1DJacobi : vt::Collection { //--- Send the values to the left auto proxy = this->getCollectionProxy(); if (myIdx > 0) { - auto leftMsg = vt::makeMessage(myIdx, told_[1]); - proxy[myIdx-1].sendMsg(leftMsg.get()); + proxy[myIdx - 1].send( + myIdx, told_[1] + ); } //--- Send values to the right if (size_t(myIdx) < numObjs_ - 1) { - auto rightMsg = vt::makeMessage(myIdx, told_[numRowsPerObject_]); - proxy[myIdx+1].sendMsg(rightMsg.get()); + auto rightMsg = vt::makeMessage(); + proxy[myIdx + 1].send( + myIdx, told_[numRowsPerObject_] + ); } } @@ -372,11 +374,9 @@ int main(int argc, char** argv) { auto range = vt::Index1D(static_cast(num_objs)); auto proxy = vt::theCollection()->construct(range); - auto rootMsg = vt::makeMessage( + proxy.broadcast( num_objs, numRowsPerObject, maxIter ); - proxy.broadcastMsg(rootMsg.get()); - } vt::finalize(); diff --git a/examples/collection/jacobi2d_vt.cc b/examples/collection/jacobi2d_vt.cc index a0c4de023e..bbe112dab2 100644 --- a/examples/collection/jacobi2d_vt.cc +++ b/examples/collection/jacobi2d_vt.cc @@ -144,8 +144,7 @@ struct LinearPb2DJacobi : vt::Collection { // Start a new iteration // auto proxy = getCollectionProxy(); - auto loopMsg = vt::makeMessage(); - proxy.broadcastMsg(loopMsg.get()); + proxy.broadcast(); } else if (iter > maxIter) { fmt::print("\n Maximum Number of Iterations Reached. \n\n"); @@ -312,16 +311,14 @@ struct LinearPb2DJacobi : vt::Collection { std::vector tcopy(numRowsPerObject_ + 2, 0.0); for (size_t jy = 1; jy <= numRowsPerObject_; ++jy) tcopy[jy] = told_[1 + jy * (numRowsPerObject_ + 2)]; - auto leftX = vt::makeMessage(idx, tcopy); - proxy(x-1, y).sendMsg(leftX.get()); + proxy(x-1, y).send(idx, tcopy); } if (y > 0) { std::vector tcopy(numRowsPerObject_ + 2, 0.0); for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) tcopy[jx] = told_[jx + (numRowsPerObject_ + 2)]; - auto bottomY = vt::makeMessage(idx, tcopy); - proxy(x, y-1).sendMsg(bottomY.get()); + proxy(x, y-1).send(idx, tcopy); } if (size_t(x) < numObjsX_ - 1) { @@ -330,16 +327,14 @@ struct LinearPb2DJacobi : vt::Collection { tcopy[jy] = told_[numRowsPerObject_ + jy * (numRowsPerObject_ + 2)]; } - auto rightX = vt::makeMessage(idx, tcopy); - proxy(x+1, y).sendMsg(rightX.get()); + proxy(x+1, y).send(idx, tcopy); } if (size_t(y) < numObjsY_ - 1) { std::vector tcopy(numRowsPerObject_ + 2, 0.0); for (size_t jx = 1; jx <= numRowsPerObject_; ++jx) tcopy[jx] = told_[jx + numRowsPerObject_ * (numRowsPerObject_ + 2)]; - auto topY = vt::makeMessage(idx, tcopy); - proxy(x, y+1).sendMsg(topY.get()); + proxy(x, y+1).send(idx, tcopy); } } @@ -503,13 +498,9 @@ int main(int argc, char** argv) { static_cast(numY_objs) ); auto proxy = vt::theCollection()->construct(range); - auto rootMsg = vt::makeMessage( - numX_objs, - numY_objs, - maxIter + proxy.broadcast( + numX_objs, numY_objs, maxIter ); - proxy.broadcastMsg(rootMsg.get()); - } vt::finalize(); diff --git a/examples/collection/migrate_collection.cc b/examples/collection/migrate_collection.cc index e04b66b444..0cf14f90cd 100644 --- a/examples/collection/migrate_collection.cc +++ b/examples/collection/migrate_collection.cc @@ -106,20 +106,13 @@ int main(int argc, char** argv) { auto range = vt::Index1D(num_elms); auto proxy = vt::theCollection()->construct(range, this_node); - vt::runInEpochRooted([=]{ - auto msg = vt::makeMessage(this_node); - proxy.broadcastMsg(msg.get()); - }); - - vt::runInEpochRooted([=]{ - auto msg = vt::makeMessage(this_node); - proxy.broadcastMsg(msg.get()); - }); - - vt::runInEpochRooted([=]{ - auto msg = vt::makeMessage(this_node); - proxy.broadcastMsg(msg.get()); - }); + vt::runInEpochRooted([=] { proxy.broadcast(this_node); }); + + vt::runInEpochRooted( + [=] { proxy.broadcast(this_node); } + ); + + vt::runInEpochRooted([=] { proxy.broadcast(this_node); }); } vt::finalize(); diff --git a/examples/collection/transpose.cc b/examples/collection/transpose.cc index bae60b1bbb..890d3a83fe 100644 --- a/examples/collection/transpose.cc +++ b/examples/collection/transpose.cc @@ -260,11 +260,10 @@ vt::NodeType my_map(IndexT* idx, IndexT* max_idx, vt::NodeType num_nodes) { solver_info.have_blocks_++; } else { // It's a remote collection block - auto msg2 = vt::makeMessage>(this_node); // Here we will send "this_node" to indicate which nod it should come back // to. Eventually, I will implement a "sub_rank" in VT which can use the // sub-rank instead of the global node id. - proxy[block_id].sendMsg,&Block::dataRequest>(msg2.get()); + proxy[block_id].send,&Block::dataRequest>(this_node); } } diff --git a/src/vt/rdmahandle/manager.collection.impl.h b/src/vt/rdmahandle/manager.collection.impl.h index 830a61535b..d90c022735 100644 --- a/src/vt/rdmahandle/manager.collection.impl.h +++ b/src/vt/rdmahandle/manager.collection.impl.h @@ -159,13 +159,10 @@ Handle Manager::makeCollectionHandles( // ObjGroup and all nodes might not have a collection element mapped to // them. if (this_node == min_node_mapped and in_next_handle == no_rdma_handle) { - auto msg = makeMessage>( - collection_proxy, next_handle, uniform_size, map_han, range - ); - proxy_.template broadcastMsg< + proxy_.template broadcast< impl::InformRDMAMsg, &Manager::informCollectionRDMA - >(msg.get()); + >(collection_proxy, next_handle, uniform_size, map_han, range); } } else { auto sub_proxy_bits = iter->second; diff --git a/src/vt/trace/file_spec/spec.cc b/src/vt/trace/file_spec/spec.cc index 2b25293c9a..199b33e590 100644 --- a/src/vt/trace/file_spec/spec.cc +++ b/src/vt/trace/file_spec/spec.cc @@ -199,8 +199,9 @@ void TraceSpec::parse() { void TraceSpec::broadcastSpec() { auto root = theContext()->getNode(); - auto msg = makeMessage(spec_mod_, spec_exact_, root); - proxy_.template broadcastMsg(msg.get()); + proxy_.template broadcast( + spec_mod_, spec_exact_, root + ); } void TraceSpec::transferSpec(SpecMsg* msg) { diff --git a/src/vt/vrt/collection/balance/baselb/baselb.cc b/src/vt/vrt/collection/balance/baselb/baselb.cc index 6cad23e0ab..b10ac48603 100644 --- a/src/vt/vrt/collection/balance/baselb/baselb.cc +++ b/src/vt/vrt/collection/balance/baselb/baselb.cc @@ -216,8 +216,7 @@ void BaseLB::applyMigrations(TransferVecType const &transfers) { void BaseLB::transferSend(NodeType from, TransferVecType const& transfer) { using MsgType = TransferMsg; - auto msg = makeMessage(transfer); - proxy_[from].template sendMsg(msg); + proxy_[from].template send(transfer); } void BaseLB::transferMigrations(TransferMsg* msg) { diff --git a/src/vt/vrt/collection/balance/hierarchicallb/hierlb.cc b/src/vt/vrt/collection/balance/hierarchicallb/hierlb.cc index 1a28c5ed42..64c087fb98 100644 --- a/src/vt/vrt/collection/balance/hierarchicallb/hierlb.cc +++ b/src/vt/vrt/collection/balance/hierarchicallb/hierlb.cc @@ -328,8 +328,9 @@ void HierarchicalLB::downTreeSend( NodeType const node, NodeType const from, ObjSampleType const& excess, bool const final_child, std::size_t const& approx_size ) { - auto msg = makeMessage(from,excess,final_child); - proxy[node].template sendMsg(msg); + proxy[node].template send( + from, excess, final_child + ); } void HierarchicalLB::downTree( @@ -385,8 +386,9 @@ void HierarchicalLB::lbTreeUpSend( NodeType const node, LoadType const child_load, NodeType const child, ObjSampleType const& load, NodeType const child_size ) { - auto msg = makeMessage(child_load,child,load,child_size); - proxy[node].template sendMsg(msg); + proxy[node].template send( + child_load, child, load, child_size + ); } void HierarchicalLB::lbTreeUp( diff --git a/src/vt/vrt/collection/balance/stats_restart_reader.cc b/src/vt/vrt/collection/balance/stats_restart_reader.cc index e0b30f1bf4..ab356d5e2b 100644 --- a/src/vt/vrt/collection/balance/stats_restart_reader.cc +++ b/src/vt/vrt/collection/balance/stats_restart_reader.cc @@ -212,8 +212,7 @@ void StatsRestartReader::createMigrationInfo( // // Create a message storing the vector // - auto msg = makeMessage(myList); - proxy_[0].sendMsg(msg.get()); + proxy_[0].send(myList); // // Clear old distribution of elements // @@ -272,8 +271,7 @@ void StatsRestartReader::gatherMsgs(VecMsg *msg) { } } if (in > 0) { - auto msg2 = makeMessage(toMove); - proxy_[in].sendMsg(msg2.get()); + proxy_[in].send(toMove); } else { proc_phase_runs_LB_[phaseID] = (!migrate.empty()); auto& myList = proc_move_list_[phaseID]; diff --git a/tests/unit/collection/test_broadcast.h b/tests/unit/collection/test_broadcast.h index ad50e51af6..5c2a529c8b 100644 --- a/tests/unit/collection/test_broadcast.h +++ b/tests/unit/collection/test_broadcast.h @@ -128,16 +128,16 @@ TYPED_TEST_P(TestBroadcast, test_broadcast_1) { auto range = TestIndex(col_size); TestParamType args = ConstructTuple::construct(); auto proxy = theCollection()->construct(range); - auto msg = makeMessage(args); - proxy.template broadcastMsg< + + proxy.template broadcast< MsgType, BroadcastHandlers::handler - >(msg.get()); + >(args); - auto msg2 = makeMessage(args); + auto msg = makeMessage(args); theCollection()->broadcastMsg< MsgType,BroadcastHandlers::handler - >(proxy, msg2.get()); + >(proxy, msg.get()); } } diff --git a/tests/unit/collection/test_collection_construct_common.h b/tests/unit/collection/test_collection_construct_common.h index 83e0550d9d..a87f7766e4 100644 --- a/tests/unit/collection/test_collection_construct_common.h +++ b/tests/unit/collection/test_collection_construct_common.h @@ -151,11 +151,10 @@ TYPED_TEST_P(TestConstruct, test_construct_1) { auto rng = TestIndex(col_size); TestParamType args = ConstructTuple::construct(); auto proxy = ConstructParams::constructTup(rng,args); - auto msg = makeMessage(); - proxy.template broadcastMsg< + proxy.template broadcast< MsgType, ConstructHandlers::handler - >(msg.get()); + >(); } } @@ -170,11 +169,10 @@ TYPED_TEST_P(TestConstructDist, test_construct_distributed_1) { auto proxy = ConstructParams::constructTupCollective( rng,args ); - auto msg = makeMessage(); - proxy.template broadcastMsg< + proxy.template broadcast< MsgType, ConstructHandlers::handler - >(msg.get()); + >(); } REGISTER_TYPED_TEST_SUITE_P(TestConstruct, test_construct_1); diff --git a/tests/unit/collection/test_destroy.cc b/tests/unit/collection/test_destroy.cc index 4720719838..231f500feb 100644 --- a/tests/unit/collection/test_destroy.cc +++ b/tests/unit/collection/test_destroy.cc @@ -114,9 +114,9 @@ TEST_F(TestDestroy, test_destroy_1) { if (this_node == 0) { auto const& range = Index1D(num_nodes * num_elms_per_node); auto proxy = theCollection()->construct(range); - auto msg = makeMessage(); + // ::fmt::print("broadcasting proxy={:x}\n", proxy.getProxy()); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); // ::fmt::print("num destroyed={}\n", num_destroyed); diff --git a/tests/unit/collection/test_insert.extended.cc b/tests/unit/collection/test_insert.extended.cc index 07ccd766db..9ab7372055 100644 --- a/tests/unit/collection/test_insert.extended.cc +++ b/tests/unit/collection/test_insert.extended.cc @@ -174,8 +174,7 @@ TEST_F(TestInsert, test_insert_send_dense_node_1) { auto proxy = theCollection()->construct(range); for (auto i = 0; i < range.x(); i++) { proxy[i].insert((this_node + 1) % num_nodes); - auto msg = makeMessage(); - proxy[i].sendMsg(msg.get()); + proxy[i].send(); // ::fmt::print("sending to {}\n", i); } } @@ -200,8 +199,7 @@ TEST_F(TestInsert, test_insert_send_sparse_node_1) { auto proxy = theCollection()->construct(range); for (auto i = 0; i < range.x(); i+=16) { proxy[i].insert((this_node + 1) % num_nodes); - auto msg = makeMessage(); - proxy[i].sendMsg(msg.get()); + proxy[i].send(); } } }); diff --git a/tests/unit/collection/test_lb_lite.extended.cc b/tests/unit/collection/test_lb_lite.extended.cc index edc36f5d94..9159557d5b 100644 --- a/tests/unit/collection/test_lb_lite.extended.cc +++ b/tests/unit/collection/test_lb_lite.extended.cc @@ -169,8 +169,8 @@ static void startIter(int32_t const iter, ColProxyType proxy) { ::fmt::print( "startIter: iter={}, cur_iter={}\n", iter, iter ); - auto msg = makeMessage(iter); - proxy.broadcastMsg(msg.get()); + + proxy.broadcast(iter); } struct TestLB : TestParallelHarness { }; diff --git a/tests/unit/collection/test_reduce_collection.cc b/tests/unit/collection/test_reduce_collection.cc index 2670ceb3b7..ff2bd2ba06 100644 --- a/tests/unit/collection/test_reduce_collection.cc +++ b/tests/unit/collection/test_reduce_collection.cc @@ -59,19 +59,18 @@ TEST_P(TestReduceCollection, test_reduce_op) { auto size = (reduce_case == 5 ? collect_size * 4 : collect_size); auto const& range = Index1D(size); auto proxy = theCollection()->construct(range); - auto msg = makeMessage(my_node); switch (reduce_case) { - case 0: proxy.broadcastMsg(msg.get()); break; - case 1: proxy.broadcastMsg(msg.get()); break; - case 2: proxy.broadcastMsg(msg.get()); break; - case 3: proxy.broadcastMsg(msg.get()); break; - case 4: proxy.broadcastMsg(msg.get()); break; + case 0: proxy.broadcast(my_node); break; + case 1: proxy.broadcast(my_node); break; + case 2: proxy.broadcast(my_node); break; + case 3: proxy.broadcast(my_node); break; + case 4: proxy.broadcast(my_node); break; #if ENABLE_REDUCE_EXPR_CALLBACK - case 5: proxy.broadcastMsg(msg.get()); break; - case 6: proxy.broadcastMsg(msg.get()); break; - case 7: proxy.broadcastMsg(msg.get()); break; + case 5: proxy.broadcast(my_node); break; + case 6: proxy.broadcast(my_node); break; + case 7: proxy.broadcast(my_node); break; #endif default: vtAbort("Failure: should not be reached"); } diff --git a/tests/unit/pipe/test_callback_bcast_collection.extended.cc b/tests/unit/pipe/test_callback_bcast_collection.extended.cc index ec54fe4f3a..6a96e3c786 100644 --- a/tests/unit/pipe/test_callback_bcast_collection.extended.cc +++ b/tests/unit/pipe/test_callback_bcast_collection.extended.cc @@ -150,8 +150,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_1) { runInEpochCollective([&]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } @@ -183,8 +182,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_2) { runInEpochCollective([&]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } @@ -216,8 +214,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_3) { runInEpochCollective([&]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } diff --git a/tests/unit/pipe/test_callback_send_collection.extended.cc b/tests/unit/pipe/test_callback_send_collection.extended.cc index 20144ad869..a2458e71d5 100644 --- a/tests/unit/pipe/test_callback_send_collection.extended.cc +++ b/tests/unit/pipe/test_callback_send_collection.extended.cc @@ -157,8 +157,7 @@ TEST_F(TestCallbackSendCollection, test_callback_send_collection_1) { runInEpochCollective([this_node, proxy]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } @@ -200,8 +199,7 @@ TEST_F(TestCallbackSendCollection, test_callback_send_collection_2) { runInEpochCollective([this_node, proxy]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } @@ -235,8 +233,7 @@ TEST_F(TestCallbackSendCollection, test_callback_send_collection_3) { runInEpochCollective([this_node, proxy]{ if (this_node == 0) { - auto msg = makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } }); } diff --git a/tutorial/tutorial_2a.h b/tutorial/tutorial_2a.h index 4bf874b348..e7085c560b 100644 --- a/tutorial/tutorial_2a.h +++ b/tutorial/tutorial_2a.h @@ -93,12 +93,10 @@ static inline void collection() { // Broadcast a message to the entire collection. The msgHandler will be // invoked on every element to the collection - auto msg = ::vt::makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); // Send a message to the 5th element of the collection - auto msg2 = ::vt::makeMessage(); - proxy[5].sendMsg(msg2.get()); + proxy[5].send(); } } /// [Tutorial2A] diff --git a/tutorial/tutorial_2b.h b/tutorial/tutorial_2b.h index f04dcd85eb..50bad6fa95 100644 --- a/tutorial/tutorial_2b.h +++ b/tutorial/tutorial_2b.h @@ -123,8 +123,7 @@ static inline void collectionReduce() { // Broadcast a message to the entire collection. The reduceHandler will be // invoked on every element to the collection - auto msg = ::vt::makeMessage(); - proxy.broadcastMsg(msg.get()); + proxy.broadcast(); } } /// [Tutorial2B]