Skip to content

Commit

Permalink
#1111: active: return event and fix managed msg bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 16, 2020
1 parent 0973c7b commit ea5ca54
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
34 changes: 22 additions & 12 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,27 @@ void ActiveMessenger::handleChunkedMultiMsg(MultiMsg* msg) {
recvDataDirect(nchunks, buf, tag, sender, size, 0, nullptr, fn);
}

void ActiveMessenger::sendMsgMPI(
EventType ActiveMessenger::sendMsgMPI(
NodeType const& dest, MsgSharedPtr<BaseMsgType> const& base,
MsgSizeType const& msg_size, TagType const& send_tag
) {
BaseMsgType* base_typed_msg = base.get();

char* untyped_msg = reinterpret_cast<char*>(base_typed_msg);

vt_debug_print(
active, node,
"sendMsgMPI: dest={}, msg_size={}, send_tag={}\n",
dest, msg_size, send_tag
);

if (static_cast<std::size_t>(msg_size) < max_per_send) {
auto const event_id = theEvent()->createMPIEvent(this_node_);
auto& holder = theEvent()->getEventHolder(event_id);
auto mpi_event = holder.get_event();

mpi_event->setManagedMessage(base.to<ShortMessage>());

int small_msg_size = static_cast<int>(msg_size);
{
VT_ALLOW_MPI_CALLS;
Expand All @@ -335,6 +343,8 @@ void ActiveMessenger::sendMsgMPI(
}
#endif
}

return event_id;
} else {
vt_debug_print(
active, node,
Expand All @@ -352,6 +362,8 @@ void ActiveMessenger::sendMsgMPI(

auto m = makeMessage<MultiMsg>(info, this_node, msg_size);
sendMsg<MultiMsg, chunkedMultiMsg>(dest, m);

return event_id;
}
}

Expand Down Expand Up @@ -381,17 +393,15 @@ EventType ActiveMessenger::sendMsgBytes(
dest >= theContext()->getNumNodes() || dest < 0, "Invalid destination: {}"
);

{
if (is_bcast) {
bcastsSentCount.increment(1);
}
if (is_term) {
tdSentCount.increment(1);
}
amSentCounterGauge.incrementUpdate(msg_size, 1);

sendMsgMPI(dest, base, msg_size, send_tag);
if (is_bcast) {
bcastsSentCount.increment(1);
}
if (is_term) {
tdSentCount.increment(1);
}
amSentCounterGauge.incrementUpdate(msg_size, 1);

EventType const event_id = sendMsgMPI(dest, base, msg_size, send_tag);

if (not is_term) {
theTerm()->produce(epoch,1,dest);
Expand All @@ -402,7 +412,7 @@ EventType ActiveMessenger::sendMsgBytes(
l->send(dest, msg_size, is_bcast);
}

return no_event;
return event_id;
}

#if vt_check_enabled(trace_enabled)
Expand Down
4 changes: 3 additions & 1 deletion src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -1496,8 +1496,10 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
* \param[in] base the message base pointer
* \param[in] msg_size the size of the message
* \param[in] send_tag the send tag on the message
*
* \return the event to test/wait for completion
*/
void sendMsgMPI(
EventType sendMsgMPI(
NodeType const& dest, MsgSharedPtr<BaseMsgType> const& base,
MsgSizeType const& msg_size, TagType const& send_tag
);
Expand Down

0 comments on commit ea5ca54

Please sign in to comment.