Skip to content

Commit

Permalink
#962 send- update non-core to use MsgPtr& API
Browse files Browse the repository at this point in the history
- Examples / tutorials / tests.
  • Loading branch information
pnstickne committed Aug 14, 2020
1 parent 40c86aa commit 7ee0c10
Show file tree
Hide file tree
Showing 56 changed files with 145 additions and 129 deletions.
4 changes: 2 additions & 2 deletions docs/md/active-messenger.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ int main(int argc, char** argv) {
vec_to_send.push_back(54.);

auto msg = vt::makeMessage<MyMsg>(10, vec_to_send);
vt::theMsg()->sendMsg<MyMsg, myHandler>(1, msg.get()); // send to node 1
vt::theMsg()->sendMsg<MyMsg, myHandler>(1, msg); // send to node 1

auto msg2 = vt::makeMessage<MyMsg>(11, vec_to_send);
vt::theMsg()->sendMsg<MyFunctor>(1, msg2.get()); // send to node 1
vt::theMsg()->sendMsg<MyFunctor>(1, msg2); // send to node 1
});
}

Expand Down
2 changes: 1 addition & 1 deletion docs/md/vt.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ management.

if (this_node == 0) {
auto msg = vt::makeMessage<HelloMsg>(this_node);
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg.get());
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);
done = true;
}

Expand Down
2 changes: 1 addition & 1 deletion examples/callback/callback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void colHan(TestMsg* msg, MyCol* col) {

void bounceCallback(vt::Callback<TestMsg> cb) {
auto msg = vt::makeMessage<HelloMsg>(cb);
vt::theMsg()->sendMsg<HelloMsg, hello_world>(1, msg.get());
vt::theMsg()->sendMsg<HelloMsg, hello_world>(1, msg);
}

int main(int argc, char** argv) {
Expand Down
2 changes: 1 addition & 1 deletion examples/callback/callback_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ int main(int argc, char** argv) {
// Make a callback that triggers the callback with a context
auto cb = vt::theCB()->makeFunc<DataMsg,MyContext>(&my_global_ctx, callbackFn);
auto msg = vt::makeMessage<CallbackMsg>(cb);
vt::theMsg()->sendMsg<CallbackMsg,handler>(1, msg.get());
vt::theMsg()->sendMsg<CallbackMsg,handler>(1, msg);
}

vt::finalize();
Expand Down
6 changes: 3 additions & 3 deletions examples/collection/transpose.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ struct Block : vt::Collection<Block, vt::Index1D> {
auto const from_idx = getIndex().x();
auto data_msg = vt::makeMessage<DataMsg>(data_,from_idx);
vt::theMsg()->sendMsg<DataMsg,SubSolveInfo::solveDataIncoming>(
requesting_node, data_msg.get()
requesting_node, data_msg
);
}

void doneInit(InitMsg* msg) {
if (getIndex().x() == 0) {
auto proxy = this->getCollectionProxy();
auto proxy_msg = vt::makeMessage<ProxyMsg>(proxy.getProxy());
vt::theMsg()->broadcastMsg<SetupGroup,ProxyMsg>(proxy_msg.get());
vt::theMsg()->broadcastMsg<SetupGroup,ProxyMsg>(proxy_msg);
// Invoke it locally: broadcast sends to all other nodes
auto proxy_msg_local = vt::makeMessage<ProxyMsg>(proxy.getProxy());
SetupGroup()(proxy_msg_local.get());
Expand Down Expand Up @@ -319,7 +319,7 @@ static void solveGroupSetup(vt::NodeType this_node, vt::VirtualProxyType coll_pr
if (this_node == 1) {
auto msg = vt::makeMessage<SubSolveMsg>(coll_proxy);
vt::envelopeSetGroup(msg->env, group_id);
vt::theMsg()->broadcastMsg<SubSolveMsg,SubSolveInfo::subSolveHandler>(msg.get());
vt::theMsg()->broadcastMsg<SubSolveMsg,SubSolveInfo::subSolveHandler>(msg);
}
}, true
);
Expand Down
2 changes: 1 addition & 1 deletion examples/group/group_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ int main(int argc, char** argv) {
if (this_node == 1) {
auto msg = vt::makeMessage<HelloGroupMsg>();
vt::envelopeSetGroup(msg->env, group);
vt::theMsg()->broadcastMsg<HelloGroupMsg, hello_group_handler>(msg.get());
vt::theMsg()->broadcastMsg<HelloGroupMsg, hello_group_handler>(msg);
}
}
);
Expand Down
4 changes: 2 additions & 2 deletions examples/group/group_rooted.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ int main(int argc, char** argv) {

if (this_node == 0) {
auto msg = vt::makeMessage<HelloMsg>(this_node);
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg.get());
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);

using RangeType = vt::group::region::Range;
auto list = std::make_unique<RangeType>(num_nodes / 2, num_nodes);

vt::theGroup()->newGroup(std::move(list), [=](vt::GroupType group){
auto gmsg = vt::makeMessage<HelloMsg>(this_node);
vt::envelopeSetGroup(gmsg->env, group);
vt::theMsg()->broadcastMsg<HelloMsg, hello_group_handler>(gmsg.get());
vt::theMsg()->broadcastMsg<HelloMsg, hello_group_handler>(gmsg);
});
}

Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world/hello_world.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int main(int argc, char** argv) {

if (this_node == 0) {
auto msg = vt::makeMessage<HelloMsg>(this_node);
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg.get());
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);
}

vt::finalize();
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world/hello_world_functor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int main(int argc, char** argv) {

if (this_node == 0) {
auto msg = vt::makeMessage<HelloMsg>(this_node);
vt::theMsg()->broadcastMsg<HelloWorld,HelloMsg>(msg.get());
vt::theMsg()->broadcastMsg<HelloWorld,HelloMsg>(msg);
}

vt::finalize();
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world/hello_world_virtual_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ int main(int argc, char** argv) {

// send out the proxy to all the nodes
auto msg = vt::makeMessage<HelloMsg>(proxy);
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg.get());
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);
}

vt::finalize();
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world/ring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static void sendToNext() {
vt::NodeType next_node = this_node + 1 >= num_nodes ? 0 : this_node + 1;

auto msg = vt::makeMessage<RingMsg>(this_node);
vt::theMsg()->sendMsg<RingMsg, ring>(next_node, msg.get());
vt::theMsg()->sendMsg<RingMsg, ring>(next_node, msg);
}

int main(int argc, char** argv) {
Expand Down
2 changes: 1 addition & 1 deletion examples/rdma/rdma_simple_get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int main(int argc, char** argv) {

auto msg = vt::makeMessage<HandleMsg>(this_node);
msg->han = my_handle;
vt::theMsg()->broadcastMsg<HandleMsg, tell_handle>(msg.get());
vt::theMsg()->broadcastMsg<HandleMsg, tell_handle>(msg);
}

vt::finalize();
Expand Down
2 changes: 1 addition & 1 deletion examples/rdma/rdma_simple_get_direct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int main(int argc, char** argv) {

auto msg = vt::makeMessage<HandleMsg>(this_node);
msg->han = my_handle;
vt::theMsg()->broadcastMsg<HandleMsg, tellHandle>(msg.get());
vt::theMsg()->broadcastMsg<HandleMsg, tellHandle>(msg);
}

vt::finalize();
Expand Down
4 changes: 2 additions & 2 deletions examples/rdma/rdma_simple_put.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static void put_data_fn(HandleMsg* msg) {
fmt::print("{}: after put: sending msg back to 0\n", this_node);
auto msg2 = vt::makeMessage<HandleMsg>(this_node);
msg2->han = handle;
vt::theMsg()->sendMsg<HandleMsg,read_data_fn>(0, msg2.get());
vt::theMsg()->sendMsg<HandleMsg,read_data_fn>(0, msg2);
}
);
}
Expand Down Expand Up @@ -145,7 +145,7 @@ int main(int argc, char** argv) {

auto msg = vt::makeMessage<HandleMsg>(this_node);
msg->han = my_handle;
vt::theMsg()->broadcastMsg<HandleMsg,put_data_fn>(msg.get());
vt::theMsg()->broadcastMsg<HandleMsg,put_data_fn>(msg);
}

vt::finalize();
Expand Down
6 changes: 3 additions & 3 deletions examples/rdma/rdma_simple_put_direct.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static void putDataFn(HandleMsg* msg) {
);

auto back = vt::makeMessage<HandleMsg>(han);
vt::theMsg()->sendMsg<HandleMsg, readDataFn>(0, back.get());
vt::theMsg()->sendMsg<HandleMsg, readDataFn>(0, back);
});
}
}
Expand Down Expand Up @@ -118,8 +118,8 @@ int main(int argc, char** argv) {

auto msg1 = vt::makeMessage<HandleMsg>(my_handle);
auto msg2 = vt::makeMessage<HandleMsg>(my_handle);
vt::theMsg()->sendMsg<HandleMsg, putDataFn>(1, msg1.get());
vt::theMsg()->sendMsg<HandleMsg, putDataFn>(2, msg2.get());
vt::theMsg()->sendMsg<HandleMsg, putDataFn>(1, msg1);
vt::theMsg()->sendMsg<HandleMsg, putDataFn>(2, msg2);
}

vt::finalize();
Expand Down
4 changes: 2 additions & 2 deletions examples/termination/termination_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void test_handler(TestMsg* msg) {
num--;
if (num > 0) {
auto msg_send = vt::makeMessage<TestMsg>();
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg_send.get());
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg_send);
}
}

Expand All @@ -90,7 +90,7 @@ int main(int argc, char** argv) {
{
auto msg = vt::makeMessage<TestMsg>();
vt::envelopeSetEpoch(msg->env, epoch);
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg.get());
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg);
}

vt::theTerm()->finishedEpoch(epoch);
Expand Down
4 changes: 2 additions & 2 deletions examples/termination/termination_rooted.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void test_handler(TestMsg* msg) {
num--;
if (num > 0) {
auto msg_send = vt::makeMessage<TestMsg>();
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg_send.get());
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg_send);
}
}

Expand All @@ -89,7 +89,7 @@ int main(int argc, char** argv) {

auto msg = vt::makeMessage<TestMsg>();
vt::envelopeSetEpoch(msg->env, epoch);
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg.get());
vt::theMsg()->sendMsg<TestMsg, test_handler>(nextNode(), msg);
vt::theTerm()->finishedEpoch(epoch);
}

Expand Down
4 changes: 2 additions & 2 deletions tests/perf/comm_cost_curve.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void handler(PingMsg*) {
count++;
if (count == pings) {
auto msg = vt::makeMessage<PingMsg>(1);
vt::theMsg()->sendMsg<PingMsg,done>(0, msg.get());
vt::theMsg()->sendMsg<PingMsg,done>(0, msg);
count = 0;
}
}
Expand All @@ -89,7 +89,7 @@ void sender() {
auto start = vt::timing::Timing::getCurrentTime();
for (int i = 0; i < pings; i++) {
auto msg = vt::makeMessage<PingMsg>(bytes);
vt::theMsg()->sendMsg<PingMsg,handler>(1, msg.get());
vt::theMsg()->sendMsg<PingMsg,handler>(1, msg);
}
while (not is_done) vt::runScheduler();
is_done = false;
Expand Down
8 changes: 4 additions & 4 deletions tests/perf/ping_pong.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static void finishedPing(FinishedPingMsg<num_bytes>* msg) {
if (num_bytes != max_bytes) {
auto pmsg = makeMessage<PingMsg<num_bytes * 2>>();
theMsg()->sendMsg<PingMsg<num_bytes * 2>, pingPong<num_bytes * 2>>(
pong_node, pmsg.get()
pong_node, pmsg
);
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ static void pingPong(PingMsg<num_bytes>* in_msg) {
if (cnt >= num_pings) {
auto msg = makeMessage<FinishedPingMsg<num_bytes>>(num_bytes);
theMsg()->sendMsg<FinishedPingMsg<num_bytes>, finishedPing<num_bytes>>(
0, msg.get()
0, msg
);
} else {
NodeType const next =
Expand All @@ -146,7 +146,7 @@ static void pingPong(PingMsg<num_bytes>* in_msg) {
);
#else
auto m = makeMessage<PingMsg<num_bytes>>(cnt + 1);
theMsg()->sendMsg<PingMsg<num_bytes>, pingPong<num_bytes>>(next, m.get());
theMsg()->sendMsg<PingMsg<num_bytes>, pingPong<num_bytes>>(next, m);
#endif
}
}
Expand All @@ -169,7 +169,7 @@ int main(int argc, char** argv) {

if (my_node == 0) {
auto m = makeMessage<PingMsg<min_bytes>>();
theMsg()->sendMsg<PingMsg<min_bytes>, pingPong<min_bytes>>(pong_node, m.get());
theMsg()->sendMsg<PingMsg<min_bytes>, pingPong<min_bytes>>(pong_node, m);
}

while (!rt->isTerminated()) {
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/active/test_active_bcast_put.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ TEST_P(TestActiveBroadcastPut, test_type_safe_active_fn_bcast2) {
for (int i = 0; i < num_msg_sent; i++) {
auto msg = makeMessage<PutTestMessage>();
msg->setPut(&put_payload[0], put_size * sizeof(int));
theMsg()->broadcastMsg<PutTestMessage, test_handler>(msg.get());
theMsg()->broadcastMsg<PutTestMessage, test_handler>(msg);
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/active/test_active_broadcast.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ TEST_P(TestActiveBroadcast, test_type_safe_active_fn_bcast2) {
if (my_node == root) {
for (int i = 0; i < num_msg_sent; i++) {
auto msg = makeMessage<TestMsg>();
theMsg()->broadcastMsg<TestMsg, test_handler>(msg.get());
theMsg()->broadcastMsg<TestMsg, test_handler>(msg);
}
}
});
Expand Down
10 changes: 3 additions & 7 deletions tests/unit/active/test_active_send.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ TEST_F(TestActiveSend, test_type_safe_active_fn_send) {
fmt::print("{}: sendMsg: i={}\n", my_node, i);
#endif
auto msg = makeMessage<TestMsg>();
theMsg()->sendMsg<TestMsg, test_handler>(1, msg.get());
theMsg()->sendMsg<TestMsg, test_handler>(to_node, msg);
}
} else if (my_node == to_node) {
theTerm()->addAction([=]{
Expand Down Expand Up @@ -168,9 +168,7 @@ TEST_F(TestActiveSend, test_type_safe_active_fn_send_small_put) {
#if DEBUG_TEST_HARNESS_PRINT
fmt::print("{}: sendMsg: (put) i={}\n", my_node, i);
#endif
theMsg()->sendMsg<PutTestMessage, test_handler_2>(
1, msg.get()
);
theMsg()->sendMsg<PutTestMessage, test_handler_2>(to_node, msg);
}
}

Expand All @@ -196,9 +194,7 @@ TEST_F(TestActiveSend, test_type_safe_active_fn_send_large_put) {
#if DEBUG_TEST_HARNESS_PRINT
fmt::print("{}: sendMsg: (put) i={}\n", my_node, i);
#endif
theMsg()->sendMsg<PutTestMessage, test_handler_3>(
1, msg.get()
);
theMsg()->sendMsg<PutTestMessage, test_handler_3>(to_node, msg);
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/active/test_active_send_put.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ TEST_P(TestActiveSendPut, test_active_fn_send_put_param) {
#if DEBUG_TEST_HARNESS_PRINT
fmt::print("{}: sendMsg: (put) i={}\n", my_node, i);
#endif
theMsg()->sendMsg<PutTestMessage, test_handler>(1, msg.get());
theMsg()->sendMsg<PutTestMessage, test_handler>(1, msg);
}

// Spin here so test_vec does not go out of scope before the send completes
Expand Down
8 changes: 5 additions & 3 deletions tests/unit/active/test_pending_send.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct TestPendingSend : TestParallelHarness {
auto const num_nodes = theContext()->getNumNodes();
auto prev = this_node - 1 >= 0 ? this_node - 1 : num_nodes - 1;
auto msg = vt::makeMessage<TestMsg>();
theMsg()->sendMsg<TestMsg, handlerPong>(prev, msg.get());
theMsg()->sendMsg<TestMsg, handlerPong>(prev, msg);
}

static bool delivered;
Expand All @@ -81,13 +81,15 @@ TEST_F(TestPendingSend, test_pending_send_hold) {
theMsg()->pushEpoch(ep);

auto next = this_node + 1 < num_nodes ? this_node + 1 : 0;

auto msg = vt::makeMessage<TestMsg>();
auto msg_hold = promoteMsg(msg.get());
pending.emplace_back(
theMsg()->sendMsg<TestMsg, handlerPing>(next, msg.get())
theMsg()->sendMsg<TestMsg, handlerPing>(next, msg)
);

// Must be stamped with the current epoch
EXPECT_EQ(envelopeGetEpoch(msg->env), ep);
EXPECT_EQ(envelopeGetEpoch(msg_hold->env), ep);

theMsg()->popEpoch(ep);
theTerm()->addAction(ep, [&done] { done = true; });
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/collectives/test_mpi_collective.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ TEST_F(TestMPICollective, test_mpi_collective_4) {
// Broadcast out node 0's order to confirm with all other nodes
if (this_node == 0) {
auto msg = makeMessage<OrderMsg>(run_order);
theMsg()->broadcastMsg<OrderMsg,orderHan>(msg.get());
theMsg()->broadcastMsg<OrderMsg,orderHan>(msg);
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/group/test_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST_F(TestGroup, test_group_range_construct_1) {
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg);
}
);
}
Expand Down Expand Up @@ -113,7 +113,7 @@ TEST_F(TestGroup, test_group_range_construct_2) {
fmt::print("Group is created: group={:x}\n", group);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg);
}
);
}
Expand Down Expand Up @@ -141,7 +141,7 @@ TEST_F(TestGroup, test_group_collective_construct_1) {
EXPECT_EQ(is_default_group, false);
auto msg = makeMessage<TestMsg>();
envelopeSetGroup(msg->env, group);
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg.get());
theMsg()->broadcastMsg<TestMsg,groupHandler>(msg);
}
);
});
Expand Down
Loading

0 comments on commit 7ee0c10

Please sign in to comment.