Skip to content

Commit

Permalink
#1024: Prioritize send/broadcast which creates the message and then d…
Browse files Browse the repository at this point in the history
…oes the action instead of passing the message to these functions
  • Loading branch information
JacobDomagala authored and Braden Mailloux committed Oct 15, 2020
1 parent 67c1d94 commit 489dc15
Show file tree
Hide file tree
Showing 19 changed files with 70 additions and 104 deletions.
18 changes: 9 additions & 9 deletions examples/collection/jacobi1d_vt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ struct LinearPb1DJacobi : vt::Collection<LinearPb1DJacobi,vt::Index1D> {
// Start a new iteration
//
auto proxy = getCollectionProxy();
auto loopMsg = vt::makeMessage<BlankMsg>();
proxy.broadcastMsg<BlankMsg, &LinearPb1DJacobi::sendInfo>(loopMsg.get());
proxy.broadcast<BlankMsg, &LinearPb1DJacobi::sendInfo>();
}
else if (iter > maxIter) {
fmt::print("\n Maximum Number of Iterations Reached. \n\n");
Expand Down Expand Up @@ -259,14 +258,17 @@ struct LinearPb1DJacobi : vt::Collection<LinearPb1DJacobi,vt::Index1D> {
//--- Send the values to the left
auto proxy = this->getCollectionProxy();
if (myIdx > 0) {
auto leftMsg = vt::makeMessage<VecMsg>(myIdx, told_[1]);
proxy[myIdx-1].sendMsg<VecMsg, &LinearPb1DJacobi::exchange>(leftMsg.get());
proxy[myIdx - 1].send<VecMsg, &LinearPb1DJacobi::exchange>(
myIdx, told_[1]
);
}

//--- Send values to the right
if (size_t(myIdx) < numObjs_ - 1) {
auto rightMsg = vt::makeMessage<VecMsg>(myIdx, told_[numRowsPerObject_]);
proxy[myIdx+1].sendMsg<VecMsg, &LinearPb1DJacobi::exchange>(rightMsg.get());
auto rightMsg = vt::makeMessage<VecMsg>();
proxy[myIdx + 1].send<VecMsg, &LinearPb1DJacobi::exchange>(
myIdx, told_[numRowsPerObject_]
);
}
}

Expand Down Expand Up @@ -372,11 +374,9 @@ int main(int argc, char** argv) {
auto range = vt::Index1D(static_cast<BaseIndexType>(num_objs));

auto proxy = vt::theCollection()->construct<LinearPb1DJacobi>(range);
auto rootMsg = vt::makeMessage<LinearPb1DJacobi::LPMsg>(
proxy.broadcast<LinearPb1DJacobi::LPMsg, &LinearPb1DJacobi::solve>(
num_objs, numRowsPerObject, maxIter
);
proxy.broadcastMsg<LinearPb1DJacobi::LPMsg,&LinearPb1DJacobi::solve>(rootMsg.get());

}

vt::finalize();
Expand Down
23 changes: 7 additions & 16 deletions examples/collection/jacobi2d_vt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ struct LinearPb2DJacobi : vt::Collection<LinearPb2DJacobi,vt::Index2D> {
// Start a new iteration
//
auto proxy = getCollectionProxy();
auto loopMsg = vt::makeMessage<BlankMsg>();
proxy.broadcastMsg<BlankMsg, &LinearPb2DJacobi::sendInfo>(loopMsg.get());
proxy.broadcast<BlankMsg, &LinearPb2DJacobi::sendInfo>();
}
else if (iter > maxIter) {
fmt::print("\n Maximum Number of Iterations Reached. \n\n");
Expand Down Expand Up @@ -312,16 +311,14 @@ struct LinearPb2DJacobi : vt::Collection<LinearPb2DJacobi,vt::Index2D> {
std::vector<double> tcopy(numRowsPerObject_ + 2, 0.0);
for (size_t jy = 1; jy <= numRowsPerObject_; ++jy)
tcopy[jy] = told_[1 + jy * (numRowsPerObject_ + 2)];
auto leftX = vt::makeMessage<VecMsg>(idx, tcopy);
proxy(x-1, y).sendMsg<VecMsg, &LinearPb2DJacobi::exchange>(leftX.get());
proxy(x-1, y).send<VecMsg, &LinearPb2DJacobi::exchange>(idx, tcopy);
}

if (y > 0) {
std::vector<double> tcopy(numRowsPerObject_ + 2, 0.0);
for (size_t jx = 1; jx <= numRowsPerObject_; ++jx)
tcopy[jx] = told_[jx + (numRowsPerObject_ + 2)];
auto bottomY = vt::makeMessage<VecMsg>(idx, tcopy);
proxy(x, y-1).sendMsg<VecMsg, &LinearPb2DJacobi::exchange>(bottomY.get());
proxy(x, y-1).send<VecMsg, &LinearPb2DJacobi::exchange>(idx, tcopy);
}

if (size_t(x) < numObjsX_ - 1) {
Expand All @@ -330,16 +327,14 @@ struct LinearPb2DJacobi : vt::Collection<LinearPb2DJacobi,vt::Index2D> {
tcopy[jy] = told_[numRowsPerObject_ +
jy * (numRowsPerObject_ + 2)];
}
auto rightX = vt::makeMessage<VecMsg>(idx, tcopy);
proxy(x+1, y).sendMsg<VecMsg, &LinearPb2DJacobi::exchange>(rightX.get());
proxy(x+1, y).send<VecMsg, &LinearPb2DJacobi::exchange>(idx, tcopy);
}

if (size_t(y) < numObjsY_ - 1) {
std::vector<double> tcopy(numRowsPerObject_ + 2, 0.0);
for (size_t jx = 1; jx <= numRowsPerObject_; ++jx)
tcopy[jx] = told_[jx + numRowsPerObject_ * (numRowsPerObject_ + 2)];
auto topY = vt::makeMessage<VecMsg>(idx, tcopy);
proxy(x, y+1).sendMsg<VecMsg, &LinearPb2DJacobi::exchange>(topY.get());
proxy(x, y+1).send<VecMsg, &LinearPb2DJacobi::exchange>(idx, tcopy);
}

}
Expand Down Expand Up @@ -503,13 +498,9 @@ int main(int argc, char** argv) {
static_cast<BaseIndexType>(numY_objs)
);
auto proxy = vt::theCollection()->construct<LinearPb2DJacobi>(range);
auto rootMsg = vt::makeMessage<LinearPb2DJacobi::LPMsg>(
numX_objs,
numY_objs,
maxIter
proxy.broadcast<LinearPb2DJacobi::LPMsg, &LinearPb2DJacobi::solve>(
numX_objs, numY_objs, maxIter
);
proxy.broadcastMsg<LinearPb2DJacobi::LPMsg,&LinearPb2DJacobi::solve>(rootMsg.get());

}

vt::finalize();
Expand Down
21 changes: 7 additions & 14 deletions examples/collection/migrate_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,13 @@ int main(int argc, char** argv) {
auto range = vt::Index1D(num_elms);
auto proxy = vt::theCollection()->construct<Hello>(range, this_node);

vt::runInEpochRooted([=]{
auto msg = vt::makeMessage<ColMsg>(this_node);
proxy.broadcastMsg<ColMsg, doWork>(msg.get());
});

vt::runInEpochRooted([=]{
auto msg = vt::makeMessage<ColMsg>(this_node);
proxy.broadcastMsg<ColMsg, migrateToNext>(msg.get());
});

vt::runInEpochRooted([=]{
auto msg = vt::makeMessage<ColMsg>(this_node);
proxy.broadcastMsg<ColMsg, doWork>(msg.get());
});
vt::runInEpochRooted([=] { proxy.broadcast<ColMsg, doWork>(this_node); });

vt::runInEpochRooted(
[=] { proxy.broadcast<ColMsg, migrateToNext>(this_node); }
);

vt::runInEpochRooted([=] { proxy.broadcast<ColMsg, doWork>(this_node); });
}

vt::finalize();
Expand Down
3 changes: 1 addition & 2 deletions examples/collection/transpose.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,10 @@ vt::NodeType my_map(IndexT* idx, IndexT* max_idx, vt::NodeType num_nodes) {
solver_info.have_blocks_++;
} else {
// It's a remote collection block
auto msg2 = vt::makeMessage<RequestDataMsg<Block>>(this_node);
// Here we will send "this_node" to indicate which nod it should come back
// to. Eventually, I will implement a "sub_rank" in VT which can use the
// sub-rank instead of the global node id.
proxy[block_id].sendMsg<RequestDataMsg<Block>,&Block::dataRequest>(msg2.get());
proxy[block_id].send<RequestDataMsg<Block>,&Block::dataRequest>(this_node);
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/vt/rdmahandle/manager.collection.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,10 @@ Handle<T, E, IndexT> Manager::makeCollectionHandles(
// ObjGroup and all nodes might not have a collection element mapped to
// them.
if (this_node == min_node_mapped and in_next_handle == no_rdma_handle) {
auto msg = makeMessage<impl::InformRDMAMsg<ProxyT,IndexT>>(
collection_proxy, next_handle, uniform_size, map_han, range
);
proxy_.template broadcastMsg<
proxy_.template broadcast<
impl::InformRDMAMsg<ProxyT,IndexT>,
&Manager::informCollectionRDMA<T,E,ProxyT,ColT>
>(msg.get());
>(collection_proxy, next_handle, uniform_size, map_han, range);
}
} else {
auto sub_proxy_bits = iter->second;
Expand Down
5 changes: 3 additions & 2 deletions src/vt/trace/file_spec/spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ void TraceSpec::parse() {

void TraceSpec::broadcastSpec() {
auto root = theContext()->getNode();
auto msg = makeMessage<SpecMsg>(spec_mod_, spec_exact_, root);
proxy_.template broadcastMsg<SpecMsg, &TraceSpec::transferSpec>(msg.get());
proxy_.template broadcast<SpecMsg, &TraceSpec::transferSpec>(
spec_mod_, spec_exact_, root
);
}

void TraceSpec::transferSpec(SpecMsg* msg) {
Expand Down
3 changes: 1 addition & 2 deletions src/vt/vrt/collection/balance/baselb/baselb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ void BaseLB::applyMigrations(TransferVecType const &transfers) {

void BaseLB::transferSend(NodeType from, TransferVecType const& transfer) {
using MsgType = TransferMsg<TransferVecType>;
auto msg = makeMessage<MsgType>(transfer);
proxy_[from].template sendMsg<MsgType,&BaseLB::transferMigrations>(msg);
proxy_[from].template send<MsgType,&BaseLB::transferMigrations>(transfer);
}

void BaseLB::transferMigrations(TransferMsg<TransferVecType>* msg) {
Expand Down
10 changes: 6 additions & 4 deletions src/vt/vrt/collection/balance/hierarchicallb/hierlb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ void HierarchicalLB::downTreeSend(
NodeType const node, NodeType const from, ObjSampleType const& excess,
bool const final_child, std::size_t const& approx_size
) {
auto msg = makeMessage<LBTreeDownMsg>(from,excess,final_child);
proxy[node].template sendMsg<LBTreeDownMsg,&HierarchicalLB::downTreeHandler>(msg);
proxy[node].template send<LBTreeDownMsg, &HierarchicalLB::downTreeHandler>(
from, excess, final_child
);
}

void HierarchicalLB::downTree(
Expand Down Expand Up @@ -385,8 +386,9 @@ void HierarchicalLB::lbTreeUpSend(
NodeType const node, LoadType const child_load, NodeType const child,
ObjSampleType const& load, NodeType const child_size
) {
auto msg = makeMessage<LBTreeUpMsg>(child_load,child,load,child_size);
proxy[node].template sendMsg<LBTreeUpMsg,&HierarchicalLB::lbTreeUpHandler>(msg);
proxy[node].template send<LBTreeUpMsg, &HierarchicalLB::lbTreeUpHandler>(
child_load, child, load, child_size
);
}

void HierarchicalLB::lbTreeUp(
Expand Down
6 changes: 2 additions & 4 deletions src/vt/vrt/collection/balance/stats_restart_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ void StatsRestartReader::createMigrationInfo(
//
// Create a message storing the vector
//
auto msg = makeMessage<VecMsg>(myList);
proxy_[0].sendMsg<VecMsg, &StatsRestartReader::gatherMsgs>(msg.get());
proxy_[0].send<VecMsg, &StatsRestartReader::gatherMsgs>(myList);
//
// Clear old distribution of elements
//
Expand Down Expand Up @@ -272,8 +271,7 @@ void StatsRestartReader::gatherMsgs(VecMsg *msg) {
}
}
if (in > 0) {
auto msg2 = makeMessage<VecMsg>(toMove);
proxy_[in].sendMsg<VecMsg, &StatsRestartReader::scatterMsgs>(msg2.get());
proxy_[in].send<VecMsg, &StatsRestartReader::scatterMsgs>(toMove);
} else {
proc_phase_runs_LB_[phaseID] = (!migrate.empty());
auto& myList = proc_move_list_[phaseID];
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/collection/test_broadcast.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ TYPED_TEST_P(TestBroadcast, test_broadcast_1) {
auto range = TestIndex(col_size);
TestParamType args = ConstructTuple<TestParamType>::construct();
auto proxy = theCollection()->construct<ColType>(range);
auto msg = makeMessage<MsgType>(args);
proxy.template broadcastMsg<

proxy.template broadcast<
MsgType,
BroadcastHandlers<ColType>::handler
>(msg.get());
>(args);

auto msg2 = makeMessage<MsgType>(args);
auto msg = makeMessage<MsgType>(args);
theCollection()->broadcastMsg<
MsgType,BroadcastHandlers<ColType>::handler
>(proxy, msg2.get());
>(proxy, msg.get());
}
}

Expand Down
10 changes: 4 additions & 6 deletions tests/unit/collection/test_collection_construct_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,10 @@ TYPED_TEST_P(TestConstruct, test_construct_1) {
auto rng = TestIndex(col_size);
TestParamType args = ConstructTuple<TestParamType>::construct();
auto proxy = ConstructParams<ColType,TestParamType>::constructTup(rng,args);
auto msg = makeMessage<MsgType>();
proxy.template broadcastMsg<
proxy.template broadcast<
MsgType,
ConstructHandlers::handler<ColType,MsgType>
>(msg.get());
>();
}
}

Expand All @@ -170,11 +169,10 @@ TYPED_TEST_P(TestConstructDist, test_construct_distributed_1) {
auto proxy = ConstructParams<ColType,TestParamType>::constructTupCollective(
rng,args
);
auto msg = makeMessage<MsgType>();
proxy.template broadcastMsg<
proxy.template broadcast<
MsgType,
ConstructHandlers::handler<ColType,MsgType>
>(msg.get());
>();
}

REGISTER_TYPED_TEST_SUITE_P(TestConstruct, test_construct_1);
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/collection/test_destroy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ TEST_F(TestDestroy, test_destroy_1) {
if (this_node == 0) {
auto const& range = Index1D(num_nodes * num_elms_per_node);
auto proxy = theCollection()->construct<DestroyTest>(range);
auto msg = makeMessage<WorkMsg>();

// ::fmt::print("broadcasting proxy={:x}\n", proxy.getProxy());
proxy.broadcastMsg<WorkMsg,DestroyTest::work>(msg.get());
proxy.broadcast<WorkMsg,DestroyTest::work>();
}
});
// ::fmt::print("num destroyed={}\n", num_destroyed);
Expand Down
6 changes: 2 additions & 4 deletions tests/unit/collection/test_insert.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ TEST_F(TestInsert, test_insert_send_dense_node_1) {
auto proxy = theCollection()->construct<InsertTest>(range);
for (auto i = 0; i < range.x(); i++) {
proxy[i].insert((this_node + 1) % num_nodes);
auto msg = makeMessage<WorkMsg>();
proxy[i].sendMsg<WorkMsg,&InsertTest::work>(msg.get());
proxy[i].send<WorkMsg,&InsertTest::work>();
// ::fmt::print("sending to {}\n", i);
}
}
Expand All @@ -200,8 +199,7 @@ TEST_F(TestInsert, test_insert_send_sparse_node_1) {
auto proxy = theCollection()->construct<InsertTest>(range);
for (auto i = 0; i < range.x(); i+=16) {
proxy[i].insert((this_node + 1) % num_nodes);
auto msg = makeMessage<WorkMsg>();
proxy[i].sendMsg<WorkMsg,&InsertTest::work>(msg.get());
proxy[i].send<WorkMsg,&InsertTest::work>();
}
}
});
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/collection/test_lb_lite.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ static void startIter(int32_t const iter, ColProxyType proxy) {
::fmt::print(
"startIter: iter={}, cur_iter={}\n", iter, iter
);
auto msg = makeMessage<IterMsg>(iter);
proxy.broadcastMsg<IterMsg,LBTest::iterWork>(msg.get());

proxy.broadcast<IterMsg,LBTest::iterWork>(iter);
}

struct TestLB : TestParallelHarness { };
Expand Down
17 changes: 8 additions & 9 deletions tests/unit/collection/test_reduce_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,18 @@ TEST_P(TestReduceCollection, test_reduce_op) {
auto size = (reduce_case == 5 ? collect_size * 4 : collect_size);
auto const& range = Index1D(size);
auto proxy = theCollection()->construct<MyCol>(range);
auto msg = makeMessage<ColMsg>(my_node);

switch (reduce_case) {
case 0: proxy.broadcastMsg<ColMsg, colHanBasic>(msg.get()); break;
case 1: proxy.broadcastMsg<ColMsg, colHanVec>(msg.get()); break;
case 2: proxy.broadcastMsg<ColMsg, colHanVecProxy>(msg.get()); break;
case 3: proxy.broadcastMsg<ColMsg, colHanVecProxyCB>(msg.get()); break;
case 4: proxy.broadcastMsg<ColMsg, colHanNoneCB>(msg.get()); break;
case 0: proxy.broadcast<ColMsg, colHanBasic>(my_node); break;
case 1: proxy.broadcast<ColMsg, colHanVec>(my_node); break;
case 2: proxy.broadcast<ColMsg, colHanVecProxy>(my_node); break;
case 3: proxy.broadcast<ColMsg, colHanVecProxyCB>(my_node); break;
case 4: proxy.broadcast<ColMsg, colHanNoneCB>(my_node); break;

#if ENABLE_REDUCE_EXPR_CALLBACK
case 5: proxy.broadcastMsg<ColMsg, colHanPartial>(msg.get()); break;
case 6: proxy.broadcastMsg<ColMsg, colHanPartialMulti>(msg.get()); break;
case 7: proxy.broadcastMsg<ColMsg, colHanPartialProxy>(msg.get()); break;
case 5: proxy.broadcast<ColMsg, colHanPartial>(my_node); break;
case 6: proxy.broadcast<ColMsg, colHanPartialMulti>(my_node); break;
case 7: proxy.broadcast<ColMsg, colHanPartialProxy>(my_node); break;
#endif
default: vtAbort("Failure: should not be reached");
}
Expand Down
9 changes: 3 additions & 6 deletions tests/unit/pipe/test_callback_bcast_collection.extended.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_1) {

runInEpochCollective([&]{
if (this_node == 0) {
auto msg = makeMessage<TestColMsg>();
proxy.broadcastMsg<TestColMsg, &TestCol::check>(msg.get());
proxy.broadcast<TestColMsg, &TestCol::check>();
}
});
}
Expand Down Expand Up @@ -183,8 +182,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_2) {

runInEpochCollective([&]{
if (this_node == 0) {
auto msg = makeMessage<TestColMsg>();
proxy.broadcastMsg<TestColMsg, &TestCol::check>(msg.get());
proxy.broadcast<TestColMsg, &TestCol::check>();
}
});
}
Expand Down Expand Up @@ -216,8 +214,7 @@ TEST_F(TestCallbackBcastCollection, test_callback_bcast_collection_3) {

runInEpochCollective([&]{
if (this_node == 0) {
auto msg = makeMessage<TestColMsg>();
proxy.broadcastMsg<TestColMsg, &TestCol::check>(msg.get());
proxy.broadcast<TestColMsg, &TestCol::check>();
}
});
}
Expand Down
Loading

0 comments on commit 489dc15

Please sign in to comment.