MTL OFI: add support for FI_REMOTE_CQ_DATA. #5004

Merged
merged 1 commit into from
Jun 15, 2018
69 changes: 69 additions & 0 deletions ompi/mca/mtl/ofi/README
@@ -0,0 +1,69 @@
OFI MTL

The OFI MTL supports the Libfabric (a.k.a. Open Fabrics Interfaces, OFI,
https://ofiwg.github.io/libfabric/) tagged APIs (fi_tagged(3)). At
initialization time, the MTL queries libfabric for providers supporting tag matching
(fi_getinfo(3)). Libfabric returns a list of providers that satisfy the requested
capabilities, with the most performant one at the top of the list.
The user may modify the OFI provider selection with the MCA parameters
mtl_ofi_provider_include or mtl_ofi_provider_exclude.

PROGRESS:
The MTL registers a progress function with opal_progress. There is currently
no support for asynchronous progress. The progress function reads multiple events
from the OFI provider Completion Queue (CQ) per iteration (100 by default; this can
be changed with the MCA parameter mtl_ofi_progress_event_cnt) and iterates until the
completion queue is drained.
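
The batched drain loop described above can be sketched as follows. This is a
minimal illustration only: struct mock_cq, mock_cq_read() and progress_sketch()
are hypothetical stand-ins invented for this example and are not part of
libfabric or the OFI MTL, which reads from a real provider CQ.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a provider completion queue: just a counter
 * of pending events. The real MTL reads from a libfabric CQ. */
struct mock_cq {
    size_t pending;
};

/* Hypothetical stand-in for a CQ read: removes up to `count` events and
 * returns how many were removed (0 when the queue is empty). */
static size_t mock_cq_read(struct mock_cq *cq, size_t count)
{
    size_t n = cq->pending < count ? cq->pending : count;
    cq->pending -= n;
    return n;
}

/* Drain the queue in batches of at most `event_cnt` events and return the
 * total number of events handled during this progress call. */
static size_t progress_sketch(struct mock_cq *cq, size_t event_cnt)
{
    size_t total = 0;
    size_t n;

    assert(event_cnt > 0);
    while ((n = mock_cq_read(cq, event_cnt)) > 0) {
        /* The real code would run each event's completion callback here. */
        total += n;
    }
    return total;
}
```

With 250 pending events and a batch size of 100, the loop performs three
non-empty reads (100, 100, 50) and stops at the first empty read.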

COMPLETIONS:
Each operation uses a request of type ompi_mtl_ofi_request_t, which includes a
reference to an operation-specific completion callback, an MPI request, and a
context. The context (fi_context) is used to map completion events to MPI requests
when reading the CQ.
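
One common way to map a context back to its request is to embed the context in
the request and subtract its offset. The sketch below assumes that approach;
struct ctx_sketch, struct request_sketch and the helper names are hypothetical
and only mirror the fields listed above, not the actual ompi_mtl_ofi_request_t
layout.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for libfabric's struct fi_context (opaque provider scratch space). */
struct ctx_sketch {
    void *internal[4];
};

struct request_sketch;
typedef int (*event_cb_t)(struct request_sketch *req);

/* Hypothetical request layout: callback, MPI request handle, and context. */
struct request_sketch {
    event_cb_t event_callback;
    void *mpi_request;
    struct ctx_sketch ctx;
    int completed;
};

/* Recover the enclosing request from the context pointer reported by the CQ. */
static struct request_sketch *request_from_ctx(struct ctx_sketch *ctx)
{
    return (struct request_sketch *)
        ((char *)ctx - offsetof(struct request_sketch, ctx));
}

/* Example operation-specific callback: flag the request as complete. */
static int mark_complete(struct request_sketch *req)
{
    req->completed = 1;
    return 0;
}

/* What a progress loop would do for one completion event. */
static int dispatch_completion(struct ctx_sketch *ctx_from_cq)
{
    struct request_sketch *req = request_from_ctx(ctx_from_cq);
    return req->event_callback(req);
}
```

The address of the embedded context is what gets passed to the provider when
the operation is posted, so the same pointer comes back in the completion event.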

OFI TAG:
MPI needs to send 96 bits of information per message (32 bits communicator id,
32 bits source rank, 32 bits MPI tag) but OFI only offers 64-bit tags. In
addition, the OFI MTL uses 4 bits of the OFI tag for the synchronous send protocol.
Therefore, only 60 bits of the OFI tag are available for message usage. The
OFI MTL offers the mtl_ofi_tag_mode MCA parameter with 4 modes to address this:

"auto" (Default):
After the OFI provider is selected, a runtime check is performed to assess
FI_REMOTE_CQ_DATA and FI_DIRECTED_RECV support (see fi_tagged(3), fi_msg(3)
and fi_getinfo(3)). If both are supported, "ofi_tag_full" is used. If not,
the MTL falls back to "ofi_tag_1".
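
The selection rule can be sketched as a small decision function. The enum and
function names below are hypothetical and chosen for this example; the two
booleans stand for the result of the runtime capability check. The explicit
"ofi_tag_full" case follows the abort behavior described under that mode.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical names for the four modes plus an error outcome. */
enum tag_mode_sketch { TAG_AUTO, TAG_1, TAG_2, TAG_FULL, TAG_ERROR };

/* Resolve the requested mode against the provider capabilities. */
static enum tag_mode_sketch
resolve_tag_mode(enum tag_mode_sketch requested,
                 bool has_remote_cq_data, bool has_directed_recv)
{
    bool full_ok = has_remote_cq_data && has_directed_recv;

    switch (requested) {
    case TAG_AUTO:
        /* Prefer the full layout, otherwise fall back to ofi_tag_1. */
        return full_ok ? TAG_FULL : TAG_1;
    case TAG_FULL:
        /* Explicitly requested but unsupported: the caller should abort. */
        return full_ok ? TAG_FULL : TAG_ERROR;
    case TAG_1:
    case TAG_2:
        return requested;
    default:
        return TAG_ERROR;
    }
}
```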

"ofi_tag_1":
For providers that do not support FI_REMOTE_CQ_DATA, the OFI MTL will
trim the fields (Communicator ID, Source Rank, MPI tag) to make them fit the 60
bits available in the OFI tag. There are two options available with different
numbers of bits for the Communicator ID and MPI tag fields. This tag distribution
offers: 12 bits for Communicator ID (max Communicator ID 4,095) subject to
provider reserved bits (see mem_tag_format below), 16 bits for Source Rank (max
Source Rank 65,535), 32 bits for MPI tag (max MPI tag is INT_MAX).
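
As an illustration of the 12/16/32 split, the sketch below packs the three
fields into a 64-bit tag and extracts them again. The field order (communicator
ID in the top bits, then source rank, then the 4 protocol bits, then the MPI
tag) is an assumption made for this example, and the sk_* names are
hypothetical; the authoritative layout is the one defined in the MTL headers.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed bit positions, least significant first:
 *   bits  0..31  MPI tag           (32 bits)
 *   bits 32..35  protocol bits     (4 bits, sync send protocol)
 *   bits 36..51  source rank       (16 bits)
 *   bits 52..63  communicator ID   (12 bits) */
enum {
    SK_TAG_BITS   = 32,
    SK_PROTO_BITS = 4,
    SK_SRC_BITS   = 16,
    SK_CID_BITS   = 12,
    SK_SRC_SHIFT  = SK_TAG_BITS + SK_PROTO_BITS,
    SK_CID_SHIFT  = SK_TAG_BITS + SK_PROTO_BITS + SK_SRC_BITS
};

/* Pack the fields; protocol bits are left at zero (plain send). */
static uint64_t sk_pack(uint32_t cid, uint32_t src, uint32_t tag)
{
    assert(cid < (1u << SK_CID_BITS));
    assert(src < (1u << SK_SRC_BITS));
    return ((uint64_t)cid << SK_CID_SHIFT)
         | ((uint64_t)src << SK_SRC_SHIFT)
         | (uint64_t)tag;
}

static uint32_t sk_get_cid(uint64_t bits)
{
    return (uint32_t)(bits >> SK_CID_SHIFT);
}

static uint32_t sk_get_source(uint64_t bits)
{
    return (uint32_t)((bits >> SK_SRC_SHIFT) & ((1u << SK_SRC_BITS) - 1));
}

static uint32_t sk_get_tag(uint64_t bits)
{
    return (uint32_t)(bits & 0xFFFFFFFFu);
}
```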

"ofi_tag_2":
Same as "ofi_tag_1" but offering a different OFI tag distribution for
applications that may require a greater number of supported Communicators at the
expense of fewer MPI tag bits. This tag distribution offers: 24 bits for
Communicator ID (max Communicator ID 16,777,215. See mem_tag_format below), 16
bits for Source Rank (max Source Rank 65,535), 20 bits for MPI tag (max MPI tag
524,287).

"ofi_tag_full":
For executions that cannot accept trimming the source rank or MPI tag, this mode
sends the source rank for each message in the CQ DATA. The Source Rank is made
available at the remote process CQ (FI_CQ_FORMAT_TAGGED is used, see fi_cq(3)) at
the completion of the matching receive operation. Since the minimum size for
FI_REMOTE_CQ_DATA is 32 bits, the Source Rank fits with no limitations. The OFI tag
is used for the Communicator id (28 bits, max Communicator ID 268,435,455. See
mem_tag_format below), and the MPI tag (max MPI tag is INT_MAX). If this mode is
selected by the user and FI_REMOTE_CQ_DATA or FI_DIRECTED_RECV is not supported,
the execution will abort.
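
The limits quoted for the three layouts follow directly from the bit counts.
The sketch below recomputes them; the struct and function names are
hypothetical. It assumes, in line with the max tag computation in mtl_ofi.c,
that one bit of the MPI tag field is given up so that the maximum tag is
2^(tag_bits - 1) - 1.

```c
#include <assert.h>
#include <stdint.h>

/* Bits of the 60 available tag bits assigned to each field. */
struct tag_layout_sketch {
    int cid_bits;
    int src_bits;   /* 0 when the source rank travels in the CQ data */
    int tag_bits;
};

static const struct tag_layout_sketch LAYOUT_TAG_1    = { 12, 16, 32 };
static const struct tag_layout_sketch LAYOUT_TAG_2    = { 24, 16, 20 };
static const struct tag_layout_sketch LAYOUT_TAG_FULL = { 28,  0, 32 };

/* Largest communicator ID that fits in the field. */
static int64_t max_cid(const struct tag_layout_sketch *l)
{
    return ((int64_t)1 << l->cid_bits) - 1;
}

/* Largest MPI tag: one bit of the field is not usable for positive tags. */
static int64_t max_tag(const struct tag_layout_sketch *l)
{
    assert(l->tag_bits >= 1);
    return ((int64_t)1 << (l->tag_bits - 1)) - 1;
}

/* Total bits consumed; must equal 60 (64 minus the 4 protocol bits). */
static int bits_used(const struct tag_layout_sketch *l)
{
    return l->cid_bits + l->src_bits + l->tag_bits;
}
```

Each layout uses exactly 60 bits, and the computed maxima match the values
listed for "ofi_tag_1", "ofi_tag_2" and "ofi_tag_full".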

mem_tag_format (fi_endpoint(3))
Some providers reserve the higher-order bits of the OFI tag for internal purposes.
This is signaled in mem_tag_format (see fi_endpoint(3)) by setting the higher-order
bits to zero. In such cases, the OFI MTL reduces the number of supported communicator
ids by reducing the bits available for the communicator ID field in the OFI tag.
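
A possible way to derive the reduced communicator ID width is to count the
leading zero bits of mem_tag_format. The sketch below assumes the communicator
ID field occupies the top of the tag, so every reserved bit is taken from it;
both function names are hypothetical and the MTL's actual computation may
differ.

```c
#include <assert.h>
#include <stdint.h>

/* Count the high-order zero bits of mem_tag_format, i.e. the bits the
 * provider keeps for itself. Returns 64 if the format is all zeros. */
static int reserved_high_bits(uint64_t mem_tag_format)
{
    int n = 0;
    uint64_t probe = 1ULL << 63;

    while (n < 64 && (mem_tag_format & probe) == 0) {
        n++;
        probe >>= 1;
    }
    return n;
}

/* Communicator ID bits left after removing the provider-reserved bits. */
static int usable_cid_bits(int cid_bits, uint64_t mem_tag_format)
{
    int reserved = reserved_high_bits(mem_tag_format);
    return reserved >= cid_bits ? 0 : cid_bits - reserved;
}
```

For example, a provider reporting 0x00FFFFFFFFFFFFFF reserves 8 bits, which
would leave 4 of the 12 communicator ID bits in the "ofi_tag_1" layout.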

6 changes: 3 additions & 3 deletions ompi/mca/mtl/ofi/mtl_ofi.c
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved
*
* $COPYRIGHT$
*
@@ -14,8 +14,8 @@ OMPI_DECLSPEC extern mca_mtl_ofi_component_t mca_mtl_ofi_component;

mca_mtl_ofi_module_t ompi_mtl_ofi = {
{
8191, /* max cid - 2^13 - 1 */
(1UL << 30), /* max tag value - must allow negatives */
(int)((1ULL << MTL_OFI_CID_BIT_COUNT_1) - 1), /* max cid */
(int)((1ULL << (MTL_OFI_TAG_BIT_COUNT_1 - 1)) - 1) ,/* max tag value */
0, /* request reserve space */
0, /* flags */

145 changes: 99 additions & 46 deletions ompi/mca/mtl/ofi/mtl_ofi.h
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved
* Copyright (c) 2017 Los Alamos National Security, LLC. All rights
* reserved.
*
@@ -244,6 +244,7 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */
fi_addr_t src_addr = 0;

ompi_proc = ompi_comm_peer_lookup(comm, dest);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
@@ -255,6 +256,15 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
ofi_req->length = length;
ofi_req->status.MPI_ERROR = OMPI_SUCCESS;

if (ompi_mtl_ofi.fi_cq_data) {
match_bits = mtl_ofi_create_send_tag_CQD(comm->c_contextid, tag);
src_addr = endpoint->peer_fiaddr;
} else {
match_bits = mtl_ofi_create_send_tag(comm->c_contextid,
comm->c_my_rank, tag);
/* src_addr is ignored when FI_DIRECTED_RECV is not supported */
}

if (OPAL_UNLIKELY(MCA_PML_BASE_SEND_SYNCHRONOUS == mode)) {
ack_req = malloc(sizeof(ompi_mtl_ofi_request_t));
assert(ack_req);
@@ -263,14 +273,15 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
ack_req->error_callback = ompi_mtl_ofi_send_ack_error_callback;

ofi_req->completion_count = 2;
MTL_OFI_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank, tag, MTL_OFI_SYNC_SEND);

MTL_OFI_SET_SYNC_SEND(match_bits);

MTL_OFI_RETRY_UNTIL_DONE(fi_trecv(ompi_mtl_ofi.ep,
NULL,
0,
NULL,
endpoint->peer_fiaddr,
match_bits | MTL_OFI_SYNC_SEND_ACK,
src_addr,
match_bits | ompi_mtl_ofi.sync_send_ack,
0, /* Exact match, no ignore bits */
(void *) &ack_req->ctx));
if (OPAL_UNLIKELY(0 > ret)) {
@@ -282,20 +293,30 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
}
} else {
ofi_req->completion_count = 1;
MTL_OFI_SET_SEND_BITS(match_bits, comm->c_contextid,
comm->c_my_rank, tag, 0);
}

if (ompi_mtl_ofi.max_inject_size >= length) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ep,
if (ompi_mtl_ofi.fi_cq_data) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tinjectdata(ompi_mtl_ofi.ep,
start,
length,
comm->c_my_rank,
endpoint->peer_fiaddr,
match_bits));
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tinject(ompi_mtl_ofi.ep,
start,
length,
endpoint->peer_fiaddr,
match_bits));
}

if (OPAL_UNLIKELY(0 > ret)) {
char *fi_api = ompi_mtl_ofi.fi_cq_data ? "fi_tinjectdata" : "fi_tinject";
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: fi_tinject failed: %s(%zd)",
__FILE__, __LINE__, fi_strerror(-ret), ret);
"%s:%d: %s failed: %s(%zd)",
__FILE__, __LINE__,fi_api, fi_strerror(-ret), ret);

if (ack_req) {
fi_cancel((fid_t)ompi_mtl_ofi.ep, &ack_req->ctx);
free(ack_req);
@@ -305,17 +326,29 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,

ofi_req->event_callback(NULL,ofi_req);
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ep,
if (ompi_mtl_ofi.fi_cq_data) {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsenddata(ompi_mtl_ofi.ep,
start,
length,
NULL,
comm->c_my_rank,
endpoint->peer_fiaddr,
match_bits,
(void *) &ofi_req->ctx));
} else {
MTL_OFI_RETRY_UNTIL_DONE(fi_tsend(ompi_mtl_ofi.ep,
start,
length,
NULL,
endpoint->peer_fiaddr,
match_bits,
(void *) &ofi_req->ctx));
}
if (OPAL_UNLIKELY(0 > ret)) {
char *fi_api = ompi_mtl_ofi.fi_cq_data ? "fi_tsenddata" : "fi_tsend";
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: fi_tsend failed: %s(%zd)",
__FILE__, __LINE__, fi_strerror(-ret), ret);
"%s:%d: %s failed: %s(%zd)",
__FILE__, __LINE__,fi_api, fi_strerror(-ret), ret);
return ompi_mtl_ofi_get_error(ret);
}
}
@@ -415,7 +448,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
ssize_t ret;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
int src;
int src = mtl_ofi_get_source(wc);
ompi_status_public_t *status = NULL;

assert(ofi_req->super.ompi_req);
@@ -427,7 +460,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
*/
ofi_req->req_started = true;

status->MPI_SOURCE = MTL_OFI_GET_SOURCE(wc->tag);
status->MPI_SOURCE = src;
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
status->_ucount = wc->len;

@@ -474,7 +507,6 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
* we need to extract the source's actual address.
*/
if (ompi_mtl_ofi.any_addr == ofi_req->remote_addr) {
src = MTL_OFI_GET_SOURCE(wc->tag);
ompi_proc = ompi_comm_peer_lookup(ofi_req->comm, src);
endpoint = ompi_mtl_ofi_get_endpoint(ofi_req->mtl, ompi_proc);
ofi_req->remote_addr = endpoint->peer_fiaddr;
@@ -484,7 +516,7 @@ ompi_mtl_ofi_recv_callback(struct fi_cq_tagged_entry *wc,
0,
NULL,
ofi_req->remote_addr,
wc->tag | MTL_OFI_SYNC_SEND_ACK,
wc->tag | ompi_mtl_ofi.sync_send_ack,
(void *) &ofi_req->ctx));
if (OPAL_UNLIKELY(0 > ret)) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
@@ -510,7 +542,7 @@ ompi_mtl_ofi_recv_error_callback(struct fi_cq_err_entry *error,
assert(ofi_req->super.ompi_req);
status = &ofi_req->super.ompi_req->req_status;
status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
status->MPI_SOURCE = MTL_OFI_GET_SOURCE(ofi_req->match_bits);
status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);

switch (error->err) {
case FI_ETRUNC:
@@ -538,24 +570,30 @@ ompi_mtl_ofi_irecv(struct mca_mtl_base_module_t *mtl,
int ompi_ret = OMPI_SUCCESS;
ssize_t ret;
uint64_t match_bits, mask_bits;
fi_addr_t remote_addr;
fi_addr_t remote_addr = ompi_mtl_ofi.any_addr;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t*) mtl_request;
void *start;
size_t length;
bool free_after;

if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup(comm, src);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_addr = endpoint->peer_fiaddr;

if (ompi_mtl_ofi.fi_cq_data) {
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup(comm, src);
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_addr = endpoint->peer_fiaddr;
}

mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
tag);
} else {
remote_addr = ompi_mtl_ofi.any_addr;
mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
tag);
/* src_addr is ignored when FI_DIRECTED_RECV is not used */
}

MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);

ompi_ret = ompi_mtl_datatype_recv_buf(convertor,
&start,
&length,
@@ -606,7 +644,7 @@ ompi_mtl_ofi_mrecv_callback(struct fi_cq_tagged_entry *wc,
{
struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
status->MPI_SOURCE = MTL_OFI_GET_SOURCE(wc->tag);
status->MPI_SOURCE = mtl_ofi_get_source(wc);
status->MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
status->MPI_ERROR = MPI_SUCCESS;
status->_ucount = wc->len;
@@ -628,7 +666,7 @@ ompi_mtl_ofi_mrecv_error_callback(struct fi_cq_err_entry *error,
struct mca_mtl_request_t *mrecv_req = ofi_req->mrecv_req;
ompi_status_public_t *status = &mrecv_req->ompi_req->req_status;
status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits);
status->MPI_SOURCE = MTL_OFI_GET_SOURCE(ofi_req->match_bits);
status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error);

switch (error->err) {
case FI_ETRUNC:
@@ -716,7 +754,7 @@ ompi_mtl_ofi_probe_callback(struct fi_cq_tagged_entry *wc,
{
ofi_req->match_state = 1;
ofi_req->match_bits = wc->tag;
ofi_req->status.MPI_SOURCE = MTL_OFI_GET_SOURCE(wc->tag);
ofi_req->status.MPI_SOURCE = mtl_ofi_get_source(wc);
ofi_req->status.MPI_TAG = MTL_OFI_GET_TAG(wc->tag);
ofi_req->status.MPI_ERROR = MPI_SUCCESS;
ofi_req->status._ucount = wc->len;
@@ -749,22 +787,28 @@ ompi_mtl_ofi_iprobe(struct mca_mtl_base_module_t *mtl,
struct ompi_mtl_ofi_request_t ofi_req;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
fi_addr_t remote_proc = 0;
fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
uint64_t match_bits, mask_bits;
ssize_t ret;
struct fi_msg_tagged msg;
uint64_t msgflags = FI_PEEK;

/**
* If the source is known, use its peer_fiaddr.
*/
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = endpoint->peer_fiaddr;
}
if (ompi_mtl_ofi.fi_cq_data) {
/* If the source is known, use its peer_fiaddr. */
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = endpoint->peer_fiaddr;
}

MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);
mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
tag);
}
else {
mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
tag);
/* src_addr is ignored when FI_DIRECTED_RECV is not used */
}

/**
* fi_trecvmsg with FI_PEEK:
@@ -829,7 +873,7 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
struct ompi_mtl_ofi_request_t *ofi_req;
ompi_proc_t *ompi_proc = NULL;
mca_mtl_ofi_endpoint_t *endpoint = NULL;
fi_addr_t remote_proc = 0;
fi_addr_t remote_proc = ompi_mtl_ofi.any_addr;
uint64_t match_bits, mask_bits;
ssize_t ret;
struct fi_msg_tagged msg;
@@ -843,13 +887,22 @@ ompi_mtl_ofi_improbe(struct mca_mtl_base_module_t *mtl,
/**
* If the source is known, use its peer_fiaddr.
*/
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = endpoint->peer_fiaddr;
}

MTL_OFI_SET_RECV_BITS(match_bits, mask_bits, comm->c_contextid, src, tag);
if (ompi_mtl_ofi.fi_cq_data) {
if (MPI_ANY_SOURCE != src) {
ompi_proc = ompi_comm_peer_lookup( comm, src );
endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc);
remote_proc = endpoint->peer_fiaddr;
}

mtl_ofi_create_recv_tag_CQD(&match_bits, &mask_bits, comm->c_contextid,
tag);
}
else {
/* src_addr is ignored when FI_DIRECTED_RECV is not used */
mtl_ofi_create_recv_tag(&match_bits, &mask_bits, comm->c_contextid, src,
tag);
}

/**
* fi_trecvmsg with FI_PEEK and FI_CLAIM: