diff --git a/fabtests/functional/rdm_tagged_peek.c b/fabtests/functional/rdm_tagged_peek.c index c5b4375c759..c2f727b2485 100644 --- a/fabtests/functional/rdm_tagged_peek.c +++ b/fabtests/functional/rdm_tagged_peek.c @@ -38,6 +38,10 @@ #include + +#define BASE_TAG 0x900d +#define SEND_CNT 8 + static struct fi_context fi_context; static int wait_for_send_comp(int count) @@ -103,91 +107,177 @@ static int trecv_op(uint64_t tag, uint64_t flags, bool ignore_nomsg) return ret; } -static int run(void) +static int test_bad(void) { - int i, ret; + int ret; - ret = ft_init_fabric(); - if (ret) + printf("Peek for a bad msg\n"); + ret = trecv_op(0xbad, FI_PEEK, false); + if (ret != -FI_ENOMSG) { + FT_PRINTERR("FI_PEEK - bad msg", ret); return ret; + } - if (opts.dst_addr) { - printf("Searching for a bad msg\n"); - ret = trecv_op(0xbad, FI_PEEK, false); - if (ret != -FI_ENOMSG) { - FT_PRINTERR("FI_PEEK", ret); - return ret; - } + printf("Peek w/ claim for a bad msg\n"); + ret = trecv_op(0xbad, FI_PEEK | FI_CLAIM, false); + if (ret != -FI_ENOMSG) { + FT_PRINTERR("FI_PEEK - claim bad msg", ret); + return ret; + } - printf("Searching for a bad msg with claim\n"); - ret = trecv_op(0xbad, FI_PEEK | FI_CLAIM, false); - if (ret != -FI_ENOMSG) { - FT_PRINTERR("FI_PEEK", ret); - return ret; - } + return 0; +} - printf("Searching for first msg\n"); - ret = trecv_op(0x900d, FI_PEEK, true); - if (ret != 1) { - FT_PRINTERR("FI_PEEK", ret); - return ret; - } +static int test_peek(void) +{ + int ret; - printf("Receiving first msg\n"); - ret = trecv_op(0x900d, 0, false); - if (ret != 1) { - FT_PRINTERR("Receive after peek", ret); - return ret; - } + printf("Peek msg 1\n"); + ret = trecv_op(BASE_TAG + 1, FI_PEEK, true); + if (ret != 1) { + FT_PRINTERR("FI_PEEK", ret); + return ret; + } - printf("Searching for second msg to claim\n"); - ret = trecv_op(0x900d + 1, FI_PEEK | FI_CLAIM, true); - if (ret != 1) { - FT_PRINTERR("FI_PEEK | FI_CLAIM", ret); - return ret; - } + printf("Receive msg 1\n"); 
+ ret = trecv_op(BASE_TAG + 1, 0, false); + if (ret != 1) { + FT_PRINTERR("Receive after peek", ret); + return ret; + } - printf("Receiving second msg\n"); - ret = trecv_op(0x900d + 1, FI_CLAIM, false); - if (ret != 1) { - FT_PRINTERR("FI_CLAIM", ret); - return ret; - } + return 0; +} - printf("Searching for third msg to peek and discard\n"); - ret = trecv_op(0x900d + 2, FI_PEEK | FI_DISCARD, true); - if (ret != 1) { - FT_PRINTERR("FI_PEEK | FI_DISCARD", ret); - return ret; - } +static int test_claim(void) +{ + int ret; - printf("Checking to see if third msg was discarded\n"); - ret = trecv_op(0x900d + 2, FI_PEEK, false); - if (ret != -FI_ENOMSG) { - FT_PRINTERR("FI_PEEK", ret); - return ret; - } + printf("Peek w/ claim msg 2\n"); + ret = trecv_op(BASE_TAG + 2, FI_PEEK | FI_CLAIM, true); + if (ret != 1) { + FT_PRINTERR("FI_PEEK | FI_CLAIM", ret); + return ret; + } + + printf("Receive claimed msg 2\n"); + ret = trecv_op(BASE_TAG + 2, FI_CLAIM, false); + if (ret != 1) { + FT_PRINTERR("FI_CLAIM", ret); + return ret; + } + + return 0; +} + +static int test_discard(void) +{ + int ret; + + printf("Peek & discard msg 3\n"); + ret = trecv_op(BASE_TAG + 3, FI_PEEK | FI_DISCARD, true); + if (ret != 1) { + FT_PRINTERR("FI_PEEK | FI_DISCARD", ret); + return ret; + } + + printf("Checking to see if msg 3 was discarded\n"); + ret = trecv_op(BASE_TAG + 3, FI_PEEK, false); + if (ret != -FI_ENOMSG) { + FT_PRINTERR("FI_PEEK", ret); + return ret; + } - printf("Searching for fourth msg to claim and discard\n"); - ret = trecv_op(0x900d + 3, FI_PEEK | FI_CLAIM, true); + printf("Peek w/ claim msg 4\n"); + ret = trecv_op(BASE_TAG + 4, FI_PEEK | FI_CLAIM, true); + if (ret != 1) { + FT_PRINTERR("FI_DISCARD", ret); + return ret; + } + + printf("Claim and discard msg 4\n"); + ret = trecv_op(BASE_TAG + 4, FI_CLAIM | FI_DISCARD, false); + if (ret != 1) { + FT_PRINTERR("FI_CLAIM", ret); + return ret; + } + + return 0; +} + +static int test_ooo(void) +{ + int i, ret; + + for (i = SEND_CNT; i >= 5; 
i--) { + printf("Receive msg %d\n", i); + ret = trecv_op(BASE_TAG + i, 0, false); if (ret != 1) { - FT_PRINTERR("FI_DISCARD", ret); + FT_PRINTERR("trecv", ret); return ret; } + } - printf("Discarding fourth msg\n"); - ret = trecv_op(0x900d + 3, FI_CLAIM | FI_DISCARD, false); - if (ret != 1) { - FT_PRINTERR("FI_CLAIM", ret); + return 0; +} + +static int do_recvs(void) +{ + int ret; + + ret = test_bad(); + if (ret) + return ret; + + ret = test_peek(); + if (ret) + return ret; + + ret = test_claim(); + if (ret) + return ret; + + ret = test_discard(); + if (ret) + return ret; + + ret = test_ooo(); + if (ret) + return ret; + + return 0; +} + +static int do_sends(void) +{ + int i, ret; + + printf("Sending %d tagged messages\n", SEND_CNT); + for(i = 1; i <= SEND_CNT; i++) { + ret = fi_tsend(ep, tx_buf, tx_size, mr_desc, + remote_fi_addr, BASE_TAG + i, + &tx_ctx_arr[i - 1].context); /* tags run 1..SEND_CNT but context slots are 0-based; assumes tx_ctx_arr has opts.window_size (SEND_CNT) entries -- TODO confirm */ + if (ret) return ret; - } + } - printf("Retrieving fifth message\n"); - ret = trecv_op(0x900d + 4, 0, false); - if (ret != 1) { - FT_PRINTERR("Receive after peek", ret); + printf("Waiting for messages to complete\n"); + ret = wait_for_send_comp(SEND_CNT); + return ret; +} + +static int run(void) +{ + int ret; + + ret = ft_init_fabric(); + if (ret) + return ret; + + if (opts.dst_addr) { + ret = do_recvs(); + if (ret) return ret; - } /* sync with sender before ft_finalize, since we sent * and received messages outside of the sequence numbers @@ -202,16 +292,7 @@ static int run(void) if (ret) return ret; } else { - printf("Sending five tagged messages\n"); - for(i = 0; i < 5; i++) { - ret = fi_tsend(ep, tx_buf, tx_size, mr_desc, - remote_fi_addr, 0x900d+i, - &tx_ctx_arr[i].context); - if (ret) - return ret; - } - printf("Waiting for messages to complete\n"); - ret = wait_for_send_comp(5); + ret = do_sends(); if (ret) return ret; @@ -232,8 +313,9 @@ int main(int argc, char **argv) opts = INIT_OPTS; opts.options |= FT_OPT_SIZE; + opts.transfer_size = 64; /* Don't expect receiver buffering */ 
opts.comp_method = FT_COMP_SREAD; - opts.window_size = 5; + opts.window_size = SEND_CNT; hints = fi_allocinfo(); if (!hints) { diff --git a/include/ofi_mem.h b/include/ofi_mem.h index a3e8155dfb4..c36bcc982d9 100644 --- a/include/ofi_mem.h +++ b/include/ofi_mem.h @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -97,6 +98,74 @@ static inline int ofi_str_dup(const char *src, char **dst) return 0; }
+/* Append one byte to *buf as two hex digits, preceded by a single space when
+ * 'space' is non-zero, and advance *buf past the characters written.  The
+ * caller must guarantee room for 3 characters plus a terminating NUL.
+ */
+static inline void add_char(char **buf, uint8_t c, uint8_t space) {
+	if (space) {
+		sprintf(*buf, " ");
+		*buf += 1;
+	}
+	sprintf(*buf, "%02x", c);
+	*buf += 2;
+}
+
+/* Log 'size' bytes starting at 'src' as a hex dump, 32 bytes per log line:
+ * the row offset, the bytes in hex (in groups of 8), then the same bytes as
+ * text with non-printable values shown as '.'.  'func' and 'line' identify
+ * the call site and are handed to fi_log() unchanged.  Nothing is logged when
+ * src is NULL, size is 0, or fi_log_enabled() reports logging is off.
+ */
+static inline void ofi_mem_dump(const struct fi_provider *prov,
+		enum fi_log_level level, enum fi_log_subsys subsys,
+		const char *func, int line, const void *src, size_t size)
+{
+	enum { width = 32, group = 8 };
+	/* worst case per row: 17 (offset) + 4 (group gaps) + 64 (hex) +
+	 * 1 (separator) + 32 (text) + 1 (NUL) = 119 bytes
+	 */
+	char temp[128];
+	const uint8_t *data = src;
+	size_t base, cnt, i;
+	char *pos;
+
+	/* skip all formatting work unless this message would be logged */
+	if (!data || !fi_log_enabled(prov, level, subsys))
+		return;
+
+	for (base = 0; base < size; base += width) {
+		cnt = (size - base < width) ? size - base : width;
+		pos = temp + sprintf(temp, "%08zx ", base);
+
+		for (i = 0; i < width; i++) {
+			if ((i % group) == 0)
+				*pos++ = ' ';
+			if (i < cnt) {
+				add_char(&pos, data[base + i], 0);
+			} else {
+				/* blank-fill a short last row instead of
+				 * reading past the end of the buffer
+				 */
+				*pos++ = ' ';
+				*pos++ = ' ';
+			}
+		}
+
+		*pos++ = ' ';
+		for (i = 0; i < cnt; i++) {
+			*pos++ = (data[base + i] >= 0x20 &&
+				  data[base + i] < 0x7f) ?
+				 (char) data[base + i] : '.';
+		}
+		*pos = '\0';
+
+		fi_log(prov, level, subsys, func, line, "%s\n", temp);
+	}
+}
+
+#define FI_LOG_BUF(prov, level, subsys, buf, len) ofi_mem_dump(prov, level, subsys, __func__, __LINE__, buf, len)
+
 /* Dynamic array -- see ofi_indexer.h */ /* diff --git a/man/fi_peer.3.md b/man/fi_peer.3.md index 46618e965b3..e43cd8e8008 100644 --- a/man/fi_peer.3.md +++ b/man/fi_peer.3.md @@ -10,9 +10,18 @@ tagline: Libfabric Programmer's Manual fi_export_fid / fi_import_fid : Share a fabric object between different providers or resources +struct fid_peer_av +: An address vector sharable between independent providers + +struct fid_peer_av_set +: An AV set sharable between independent providers + struct fid_peer_cq : A completion queue that may be shared between 
independent providers +struct fid_peer_srx +: A shared receive context that may be shared between independent providers + # SYNOPSIS ```c @@ -41,6 +50,11 @@ int fi_import_fid(struct fid *fid, struct fid *expfid, uint64_t flags); # DESCRIPTION +NOTICE: The peer APIs described by this man page are developmental and may +change between libfabric versions. The data structures and API definitions +should not be considered stable between versions. Providers being used +as peers must target the same libfabric version. + Functions defined in this man page are typically used by providers to communicate with other providers, known as peer providers, or by other libraries to communicate with the libfabric core, known as peer libraries. diff --git a/prov/net/src/xnet.h b/prov/net/src/xnet.h index df7d31c8492..0dbf03cc918 100644 --- a/prov/net/src/xnet.h +++ b/prov/net/src/xnet.h @@ -88,6 +88,7 @@ extern size_t xnet_zerocopy_size; extern int xnet_trace_msg; extern int xnet_disable_autoprog; extern int xnet_io_uring; +extern int xnet_max_saved; struct xnet_xfer_entry; struct xnet_ep; @@ -138,12 +139,13 @@ void xnet_connect_done(struct xnet_ep *ep); void xnet_req_done(struct xnet_ep *ep); int xnet_send_cm_msg(struct xnet_ep *ep); +/* Inject buffer space is included */ union xnet_hdrs { struct xnet_base_hdr base_hdr; struct xnet_cq_data_hdr cq_data_hdr; struct xnet_tag_data_hdr tag_data_hdr; struct xnet_tag_hdr tag_hdr; - uint8_t max_hdr[XNET_MAX_HDR]; + uint8_t max_hdr[XNET_MAX_HDR + XNET_MAX_INJECT]; }; struct xnet_active_rx { @@ -191,7 +193,6 @@ struct xnet_ep { struct xnet_active_tx cur_tx; OFI_DBG_VAR(uint8_t, tx_id) OFI_DBG_VAR(uint8_t, rx_id) - struct xnet_active_rx saved_rx; struct dlist_entry unexp_entry; struct dlist_entry saved_entry; @@ -201,6 +202,8 @@ struct xnet_ep { struct slist need_ack_queue; struct slist async_queue; struct slist rma_read_queue; + struct slist saved_queue; + int saved_cnt; int rx_avail; struct xnet_srx *srx; @@ -362,6 +365,8 @@ static inline void 
xnet_signal_progress(struct xnet_progress *progress) #define XNET_ASYNC BIT(5) #define XNET_INJECT_OP BIT(6) #define XNET_FREE_BUF BIT(7) +#define XNET_SAVED_XFER BIT(8) +#define XNET_COPY_RECV BIT(9) #define XNET_MULTI_RECV FI_MULTI_RECV /* BIT(16) */ struct xnet_xfer_entry { @@ -612,14 +617,9 @@ static inline bool xnet_has_unexp(struct xnet_ep *ep) return ep->cur_rx.handler && !ep->cur_rx.entry; } -static inline bool xnet_has_saved_rx(struct xnet_ep *ep) -{ - assert(xnet_progress_locked(xnet_ep2_progress(ep))); - return ep->saved_rx.hdr_done != 0; -} - -void xnet_complete_saved(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry); -void xnet_clear_saved_rx(struct xnet_ep *ep); +void xnet_recv_saved(struct xnet_xfer_entry *saved_entry, + struct xnet_xfer_entry *rx_entry); +void xnet_complete_saved(struct xnet_xfer_entry *saved_entry); #define XNET_WARN_ERR(subsystem, log_str, err) \ FI_WARN(&xnet_prov, subsystem, log_str "%s (%d)\n", \ diff --git a/prov/net/src/xnet_cq.c b/prov/net/src/xnet_cq.c index 86b63576c31..25bbe51269f 100644 --- a/prov/net/src/xnet_cq.c +++ b/prov/net/src/xnet_cq.c @@ -132,9 +132,14 @@ void xnet_report_success(struct xnet_ep *ep, struct util_cq *cq, size_t len; if (!(xfer_entry->cq_flags & FI_COMPLETION) || - (xfer_entry->ctrl_flags & XNET_INTERNAL_XFER)) + (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER))) return; + if (xfer_entry->ctrl_flags & XNET_COPY_RECV) { + xnet_complete_saved(xfer_entry); + return; + } + flags = xfer_entry->cq_flags & ~FI_COMPLETION; if (flags & FI_RECV) { len = xfer_entry->hdr.base_hdr.size - @@ -169,13 +174,16 @@ void xnet_cq_report_error(struct util_cq *cq, { struct fi_cq_err_entry err_entry; - if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_INJECT_OP)) { - if (xfer_entry->ctrl_flags & XNET_INTERNAL_XFER) - FI_WARN(&xnet_prov, FI_LOG_CQ, "internal transfer " + if (xfer_entry->ctrl_flags & + (XNET_INTERNAL_XFER | XNET_SAVED_XFER | XNET_INJECT_OP)) { + if (xfer_entry->ctrl_flags & + 
(XNET_INTERNAL_XFER | XNET_SAVED_XFER)) { + FI_WARN(&xnet_prov, FI_LOG_CQ, "internal/saved transfer " "failed (%s)\n", fi_strerror(err)); - else + } else { FI_WARN(&xnet_prov, FI_LOG_CQ, "inject transfer " "failed (%s)\n", fi_strerror(err)); + } return; } diff --git a/prov/net/src/xnet_ep.c b/prov/net/src/xnet_ep.c index 9f2e607b80e..4ae5c645d41 100644 --- a/prov/net/src/xnet_ep.c +++ b/prov/net/src/xnet_ep.c @@ -338,6 +338,8 @@ static void xnet_ep_flush_all_queues(struct xnet_ep *ep) xnet_ep_flush_queue(ep, &ep->rma_read_queue, cq); xnet_ep_flush_queue(ep, &ep->need_ack_queue, cq); xnet_ep_flush_queue(ep, &ep->async_queue, cq); + xnet_ep_flush_queue(ep, &ep->saved_queue, cq); + ep->saved_cnt = 0; cq = container_of(ep->util_ep.rx_cq, struct xnet_cq, util_cq); if (ep->cur_rx.entry) { @@ -367,9 +369,8 @@ void xnet_ep_disable(struct xnet_ep *ep, int cm_err, void* err_data, return; }; - if (xnet_has_saved_rx(ep)) - xnet_clear_saved_rx(ep); dlist_remove_init(&ep->unexp_entry); + dlist_remove_init(&ep->saved_entry); xnet_halt_sock(xnet_ep2_progress(ep), ep->bsock.sock); ret = ofi_shutdown(ep->bsock.sock, SHUT_RDWR); @@ -736,12 +737,14 @@ int xnet_endpoint(struct fid_domain *domain, struct fi_info *info, } dlist_init(&ep->unexp_entry); + dlist_init(&ep->saved_entry); slist_init(&ep->rx_queue); slist_init(&ep->tx_queue); slist_init(&ep->priority_queue); slist_init(&ep->rma_read_queue); slist_init(&ep->need_ack_queue); slist_init(&ep->async_queue); + slist_init(&ep->saved_queue); if (info->ep_attr->rx_ctx_cnt != FI_SHARED_CONTEXT) ep->rx_avail = (int) info->rx_attr->size; diff --git a/prov/net/src/xnet_init.c b/prov/net/src/xnet_init.c index 6dea6da2003..5b0f58fff88 100644 --- a/prov/net/src/xnet_init.c +++ b/prov/net/src/xnet_init.c @@ -67,6 +67,7 @@ size_t xnet_zerocopy_size = SIZE_MAX; int xnet_trace_msg; int xnet_disable_autoprog; int xnet_io_uring; +int xnet_max_saved = 4; static void xnet_init_env(void) @@ -115,8 +116,18 @@ static void xnet_init_env(void) if 
(!fi_param_get_size_t(&xnet_prov, "rx_size", &rx_size)) xnet_default_rx_size = rx_size; + fi_param_define(&xnet_prov, "max_saved", FI_PARAM_INT, + "maximum number of received messages that do not " + "have a posted application buffer that will be " + "queued by the provider. A larger value increases " + "memory and processing overhead, negatively " + "impacting performance, but may be required by some " + "applications to prevent hangs. (default: %d)", + xnet_max_saved); + fi_param_get_int(&xnet_prov, "max_saved", &xnet_max_saved); fi_param_define(&xnet_prov, "nodelay", FI_PARAM_BOOL, - "overrides default TCP_NODELAY socket setting"); + "overrides default TCP_NODELAY socket setting " + "(default %d)", xnet_nodelay); fi_param_get_bool(&xnet_prov, "nodelay", &xnet_nodelay); fi_param_define(&xnet_prov, "staging_sbuf_size", FI_PARAM_INT, diff --git a/prov/net/src/xnet_progress.c b/prov/net/src/xnet_progress.c index bfea8646105..cb6e93ef35f 100644 --- a/prov/net/src/xnet_progress.c +++ b/prov/net/src/xnet_progress.c @@ -77,40 +77,146 @@ static void xnet_submit_uring(struct xnet_uring *uring) assert(ready == submitted); } -void xnet_clear_saved_rx(struct xnet_ep *ep) -{ - assert(xnet_progress_locked(xnet_ep2_progress(ep))); - assert(!dlist_empty(&xnet_ep2_progress(ep)->saved_tag_list)); - dlist_remove(&ep->saved_entry); - ep->saved_rx.hdr_done = 0; -} - -/* MPI uses 0-byte transfers for barrier, which is the problem - * message that we need to defer. Deferring larger messages - * requires buffering the message data as well, but we currently - * only reserve space to save and restore the header. 
- */ static bool xnet_save_and_cont(struct xnet_ep *ep) { assert(xnet_progress_locked(xnet_ep2_progress(ep))); assert(ep->cur_rx.hdr.base_hdr.op == ofi_op_tagged); - return (ep->cur_rx.data_left == 0) && !xnet_has_saved_rx(ep); + + return (ep->saved_cnt < xnet_max_saved) && + (ep->cur_rx.data_left <= XNET_MAX_INJECT); } -static void xnet_save_rx(struct xnet_ep *ep) +static struct xnet_xfer_entry * +xnet_get_save_rx(struct xnet_ep *ep, uint64_t tag) { struct xnet_progress *progress; + struct xnet_xfer_entry *rx_entry; progress = xnet_ep2_progress(ep); assert(xnet_progress_locked(progress)); assert(xnet_save_and_cont(ep)); assert(ep->cur_rx.hdr_done == ep->cur_rx.hdr_len && !ep->cur_rx.claim_ctx); - assert(dlist_empty(&ep->unexp_entry)); - ep->saved_rx = ep->cur_rx; - xnet_reset_rx(ep); - dlist_insert_tail(&ep->saved_entry, &progress->saved_tag_list); + FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Saving msg tag 0x%zx src %zu\n", + tag, ep->peer->fi_addr); + rx_entry = xnet_alloc_xfer(xnet_srx2_progress(ep->srx)); + if (!rx_entry) + return NULL; + + rx_entry->ctrl_flags = XNET_SAVED_XFER; + rx_entry->tag = tag; + rx_entry->ignore = 0; + rx_entry->src_addr = ep->peer->fi_addr; + rx_entry->context = NULL; + rx_entry->user_buf = NULL; + rx_entry->iov_cnt = 1; + rx_entry->iov[0].iov_base = &rx_entry->hdr.max_hdr[XNET_MAX_HDR]; + rx_entry->iov[0].iov_len = XNET_MAX_INJECT; + + slist_insert_tail(&rx_entry->entry, &ep->saved_queue); + if (!ep->saved_cnt++) { + assert(dlist_empty(&ep->saved_entry)); + dlist_insert_tail(&ep->saved_entry, + &progress->saved_tag_list); + } + + return rx_entry; +} + +void xnet_complete_saved(struct xnet_xfer_entry *saved_entry) +{ + struct xnet_ep *ep; + struct xnet_progress *progress; + size_t msg_len, copied; + + ep = saved_entry->ep; + progress = xnet_ep2_progress(ep); + assert(xnet_progress_locked(progress)); + + msg_len = (saved_entry->hdr.base_hdr.size - + saved_entry->hdr.base_hdr.hdr_size); + FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Completing saved 
msg " + "tag 0x%zx src %zu size %zu\n", saved_entry->tag, + saved_entry->src_addr, msg_len); + + if (msg_len) { + copied = ofi_copy_iov_buf(saved_entry->iov, + saved_entry->iov_cnt, 0, + &saved_entry->hdr.max_hdr[XNET_MAX_HDR], + msg_len, OFI_COPY_BUF_TO_IOV); + FI_LOG_BUF(&xnet_prov, FI_LOG_DEBUG, FI_LOG_EP_DATA, + &saved_entry->hdr.max_hdr[XNET_MAX_HDR], msg_len); + } else { + copied = 0; + } + + if (copied == msg_len) { + ep->report_success(ep, ep->util_ep.rx_cq, saved_entry); + } else { + FI_WARN(&xnet_prov, FI_LOG_EP_DATA, "saved recv truncated\n"); + xnet_cntr_incerr(ep, saved_entry); + xnet_cq_report_error(ep->util_ep.rx_cq, saved_entry, FI_ETRUNC); + } + xnet_free_xfer(progress, saved_entry); +} + +void xnet_recv_saved(struct xnet_xfer_entry *saved_entry, + struct xnet_xfer_entry *rx_entry) +{ + struct xnet_ep *ep; + struct xnet_progress *progress; + size_t msg_len, done_len; + int ret; + + ep = saved_entry->ep; + progress = xnet_ep2_progress(ep); + assert(xnet_progress_locked(progress)); + FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "recv matched saved msg " + "tag 0x%zx src %zu\n", saved_entry->tag, saved_entry->src_addr); + + saved_entry->ctrl_flags &= ~XNET_SAVED_XFER; + saved_entry->context = rx_entry->context; + saved_entry->user_buf = rx_entry->user_buf; + + if (rx_entry->iov_cnt) { + memcpy(&saved_entry->iov[0], &rx_entry->iov[0], + rx_entry->iov_cnt * sizeof(rx_entry->iov[0])); + saved_entry->iov_cnt = rx_entry->iov_cnt; + } + + if (saved_entry != ep->cur_rx.entry) { + xnet_complete_saved(saved_entry); + /* TODO: need io_uring async recv posted check + } else if (async recv posted using io_uring) { + saved_entry->ctrl_flags |= XNET_COPY_RECV; + */ + } else { + FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "saved msg still active " + "needs %zu bytes\n", ep->cur_rx.data_left); + + msg_len = (saved_entry->hdr.base_hdr.size - + saved_entry->hdr.base_hdr.hdr_size); + done_len = msg_len - ep->cur_rx.data_left; + assert(msg_len && ep->cur_rx.data_left); + + ret = 
ofi_truncate_iov(&saved_entry->iov[0], + &saved_entry->iov_cnt, msg_len); + if (ret) { + /* truncation failure */ + saved_entry->iov_cnt = 0; + xnet_complete_saved(saved_entry); + } else { + (void) ofi_copy_iov_buf(saved_entry->iov, + saved_entry->iov_cnt, 0, + &saved_entry->hdr.max_hdr[XNET_MAX_HDR], + done_len, OFI_COPY_BUF_TO_IOV); + ofi_consume_iov(&saved_entry->iov[0], + &saved_entry->iov_cnt, done_len); + } + } + + xnet_free_xfer(progress, rx_entry); } void xnet_update_pollflag(struct xnet_ep *ep, short pollflag, bool set) @@ -141,10 +247,19 @@ static ssize_t xnet_send_msg(struct xnet_ep *ep) struct xnet_xfer_entry *tx_entry; ssize_t ret; size_t len; + size_t msg_len; assert(xnet_progress_locked(xnet_ep2_progress(ep))); assert(ep->cur_tx.entry); tx_entry = ep->cur_tx.entry; + msg_len = (tx_entry->hdr.base_hdr.size - + tx_entry->hdr.base_hdr.hdr_size); + + FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Sending msg " + "tag 0x%zx src %zu size %zu\n", tx_entry->hdr.tag_hdr.tag, + tx_entry->src_addr, msg_len); + FI_LOG_BUF(&xnet_prov, FI_LOG_DEBUG, FI_LOG_EP_DATA, + tx_entry->iov[0].iov_base, tx_entry->iov[0].iov_len); ret = ofi_bsock_sendv(&ep->bsock, tx_entry->iov, tx_entry->iov_cnt, &len); if (ret < 0 && ret != -OFI_EINPROGRESS_ASYNC) @@ -302,10 +417,12 @@ static int xnet_queue_ack(struct xnet_xfer_entry *rx_entry) static ssize_t xnet_process_recv(struct xnet_ep *ep) { + struct xnet_progress *progress; struct xnet_xfer_entry *rx_entry; ssize_t ret; - assert(xnet_progress_locked(xnet_ep2_progress(ep))); + progress = xnet_ep2_progress(ep); + assert(xnet_progress_locked(progress)); rx_entry = ep->cur_rx.entry; ret = xnet_recv_msg_data(ep); if (ret) { @@ -321,8 +438,10 @@ static ssize_t xnet_process_recv(struct xnet_ep *ep) goto err; } - ep->report_success(ep, ep->util_ep.rx_cq, rx_entry); - xnet_free_xfer(xnet_ep2_progress(ep), rx_entry); + if (!(rx_entry->ctrl_flags & XNET_SAVED_XFER)) { + ep->report_success(ep, ep->util_ep.rx_cq, rx_entry); + xnet_free_xfer(progress, 
rx_entry); + } xnet_reset_rx(ep); return 0; @@ -331,40 +450,11 @@ static ssize_t xnet_process_recv(struct xnet_ep *ep) "msg recv failed ret = %zd (%s)\n", ret, fi_strerror((int)-ret)); xnet_cntr_incerr(ep, rx_entry); xnet_cq_report_error(rx_entry->ep->util_ep.rx_cq, rx_entry, (int) -ret); - xnet_free_xfer(xnet_ep2_progress(ep), rx_entry); + xnet_free_xfer(progress, rx_entry); xnet_reset_rx(ep); return ret; } -void xnet_complete_saved(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry) -{ - ssize_t ret; - - assert(xnet_progress_locked(xnet_ep2_progress(ep))); - assert(xnet_has_saved_rx(ep)); - - rx_entry->cq_flags |= xnet_rx_completion_flag(ep); - rx_entry->ep = ep; - - if (rx_entry->hdr.base_hdr.flags & XNET_DELIVERY_COMPLETE) { - ret = xnet_queue_ack(rx_entry); - if (ret) { - FI_WARN(&xnet_prov, FI_LOG_EP_DATA, - "msg recv failed ret = %zd (%s)\n", - ret, fi_strerror((int)-ret)); - xnet_cntr_incerr(ep, rx_entry); - xnet_cq_report_error(rx_entry->ep->util_ep.rx_cq, - rx_entry, (int) -ret); - goto out; - } - } - - ep->report_success(ep, ep->util_ep.rx_cq, rx_entry); -out: - xnet_free_xfer(xnet_ep2_progress(ep), rx_entry); - xnet_clear_saved_rx(ep); -} - static void xnet_pmem_commit(struct xnet_xfer_entry *rx_entry) { struct ofi_rma_iov *rma_iov; @@ -627,8 +717,9 @@ static ssize_t xnet_op_tagged(struct xnet_ep *ep) rx_entry = ep->srx->match_tag_rx(ep->srx, ep, tag); if (!rx_entry) { if (xnet_save_and_cont(ep)) { - xnet_save_rx(ep); - return FI_SUCCESS; + rx_entry = xnet_get_save_rx(ep, tag); + if (rx_entry) + goto start; } if (dlist_empty(&ep->unexp_entry)) { dlist_insert_tail(&ep->unexp_entry, @@ -638,6 +729,7 @@ static ssize_t xnet_op_tagged(struct xnet_ep *ep) return -FI_EAGAIN; } +start: return xnet_start_recv(ep, rx_entry); } diff --git a/prov/net/src/xnet_srx.c b/prov/net/src/xnet_srx.c index 2e5a745a0e5..f04b6ea9f88 100644 --- a/prov/net/src/xnet_srx.c +++ b/prov/net/src/xnet_srx.c @@ -212,16 +212,58 @@ static int xnet_match_unexp(struct dlist_entry *item, 
const void *arg) return xnet_match_msg(ep->cur_rx.claim_ctx, &ep->cur_rx.hdr, arg); } -static int xnet_match_saved(struct dlist_entry *item, const void *arg) +static struct xnet_xfer_entry * +xnet_match_saved(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry, + bool remove) +{ + struct xnet_xfer_entry *saved_entry; + struct slist_entry *item, *prev; + + assert(xnet_progress_locked(xnet_ep2_progress(ep))); + assert(ep->saved_cnt); + + slist_foreach(&ep->saved_queue, item, prev) { + saved_entry = container_of(item, struct xnet_xfer_entry, entry); + if (xnet_match_msg(saved_entry->context, &saved_entry->hdr, + rx_entry)) { + if (remove) { + slist_remove(&ep->saved_queue, item, prev); + if (!--ep->saved_cnt) { + assert(!dlist_empty(&ep->saved_entry)); + dlist_remove_init(&ep->saved_entry); + } + } + return saved_entry; + } + } + return NULL; +} + +static struct xnet_xfer_entry * +xnet_search_saved(struct xnet_progress *progress, + struct xnet_xfer_entry *rx_entry, bool remove) { + struct xnet_xfer_entry *saved_entry; + struct dlist_entry *item; struct xnet_ep *ep; - ep = container_of(item, struct xnet_ep, unexp_entry); - return xnet_match_msg(ep->saved_rx.claim_ctx, &ep->saved_rx.hdr, arg); + + assert(ofi_genlock_held(progress->active_lock)); + dlist_foreach(&progress->saved_tag_list, item) { + ep = container_of(item, struct xnet_ep, saved_entry); + assert(ep->saved_cnt); + assert(ep->state == XNET_CONNECTED); + + saved_entry = xnet_match_saved(ep, rx_entry, remove); + if (saved_entry) + return saved_entry; + + } + return NULL; } static struct xnet_ep * xnet_find_msg(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, - struct xnet_active_rx **rx) + struct xnet_xfer_entry **saved_entry, bool remove) { struct xnet_progress *progress; struct dlist_entry *entry; @@ -233,13 +275,9 @@ xnet_find_msg(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, if ((srx->match_tag_rx == xnet_match_tag) || (recv_entry->src_addr == FI_ADDR_UNSPEC)) { - entry = 
dlist_find_first_match(&progress->saved_tag_list, - xnet_match_saved, recv_entry); - if (entry) { - ep = container_of(entry, struct xnet_ep, saved_entry); - *rx = &ep->saved_rx; - return ep; - } + *saved_entry = xnet_search_saved(progress, recv_entry, remove); + if (*saved_entry) + return (*saved_entry)->ep; entry = dlist_find_first_match(&progress->unexp_tag_list, xnet_match_unexp, recv_entry); @@ -248,6 +286,7 @@ xnet_find_msg(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, ep = container_of(entry, struct xnet_ep, unexp_entry); } else { + *saved_entry = NULL; queue = ofi_array_at(&srx->src_tag_queues, recv_entry->src_addr); if (!queue) return NULL; @@ -256,19 +295,19 @@ xnet_find_msg(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, if (!ep) return NULL; - if (xnet_has_saved_rx(ep) && - xnet_match_msg(ep->saved_rx.claim_ctx, &ep->saved_rx.hdr, recv_entry)) { - *rx = &ep->saved_rx; - return ep; + if (ep->saved_cnt) { + *saved_entry = xnet_match_saved(ep, recv_entry, remove); + if (*saved_entry) + return (*saved_entry)->ep; } if (!xnet_has_unexp(ep) || - !xnet_match_msg(ep->cur_rx.claim_ctx, &ep->cur_rx.hdr, recv_entry)) + !xnet_match_msg(ep->cur_rx.claim_ctx, + &ep->cur_rx.hdr, recv_entry)) return NULL; } assert(!dlist_empty(&ep->unexp_entry)); - *rx = &ep->cur_rx; return ep; } @@ -277,7 +316,8 @@ xnet_srx_claim(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, uint64_t flags) { struct xnet_ep *ep; - struct xnet_active_rx *rx; + struct xnet_xfer_entry *saved_entry; + union xnet_hdrs *hdr; ssize_t ret; size_t msg_len; @@ -285,12 +325,13 @@ xnet_srx_claim(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, assert(srx->rdm); recv_entry->tag |= XNET_CLAIM_TAG_BIT; - ep = xnet_find_msg(srx, recv_entry, &rx); + ep = xnet_find_msg(srx, recv_entry, &saved_entry, true); if (!ep) return -FI_ENOMSG; if (flags & FI_DISCARD) { - msg_len = rx->hdr.base_hdr.size - rx->hdr.base_hdr.hdr_size; + hdr = saved_entry ? 
&saved_entry->hdr : &ep->cur_rx.hdr; + msg_len = hdr->base_hdr.size - hdr->base_hdr.hdr_size; if (msg_len) { recv_entry->user_buf = calloc(1, msg_len); if (!recv_entry->user_buf) @@ -305,8 +346,8 @@ xnet_srx_claim(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, } } - if (rx == &ep->saved_rx) { - xnet_complete_saved(ep, recv_entry); + if (saved_entry) { + xnet_recv_saved(saved_entry, recv_entry); } else { ret = xnet_start_recv(ep, recv_entry); if (ret && !OFI_SOCK_TRY_SND_RCV_AGAIN(-ret)) @@ -323,27 +364,32 @@ xnet_srx_peek(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, uint64_t flags) { struct xnet_ep *ep; - struct xnet_active_rx *rx; + struct xnet_xfer_entry *saved_entry; + union xnet_hdrs *hdr; struct fi_cq_err_entry err_entry; ssize_t ret = FI_ENOMSG; assert(xnet_progress_locked(xnet_srx2_progress(srx))); assert(srx->rdm); - ep = xnet_find_msg(srx, recv_entry, &rx); + ep = xnet_find_msg(srx, recv_entry, &saved_entry, false); if (!ep) goto nomatch; - memcpy(&recv_entry->hdr, &rx->hdr, (size_t) rx->hdr.base_hdr.hdr_size); + hdr = saved_entry ? 
&saved_entry->hdr : &ep->cur_rx.hdr; + memcpy(&recv_entry->hdr, hdr, (size_t) hdr->base_hdr.hdr_size); recv_entry->cq_flags |= xnet_rx_completion_flag(ep); if (flags & (FI_CLAIM | FI_DISCARD)) { FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Marking message for Claim\n"); - if (rx->hdr.base_hdr.flags & XNET_REMOTE_CQ_DATA) - rx->hdr.tag_data_hdr.tag |= XNET_CLAIM_TAG_BIT; + if (hdr->base_hdr.flags & XNET_REMOTE_CQ_DATA) + hdr->tag_data_hdr.tag |= XNET_CLAIM_TAG_BIT; + else + hdr->tag_hdr.tag |= XNET_CLAIM_TAG_BIT; + if (saved_entry) + saved_entry->context = recv_entry->context; else - rx->hdr.tag_hdr.tag |= XNET_CLAIM_TAG_BIT; - rx->claim_ctx = recv_entry->context; + ep->cur_rx.claim_ctx = recv_entry->context; } if (flags & FI_DISCARD) { @@ -368,25 +414,6 @@ xnet_srx_peek(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry, return FI_SUCCESS; } -static struct xnet_ep * -xnet_search_saved(struct xnet_progress *progress, - struct xnet_xfer_entry *rx_entry) -{ - struct dlist_entry *item; - struct xnet_ep *ep; - - assert(ofi_genlock_held(progress->active_lock)); - dlist_foreach(&progress->saved_tag_list, item) { - ep = container_of(item, struct xnet_ep, saved_entry); - assert(xnet_has_saved_rx(ep)); - assert(ep->state == XNET_CONNECTED); - - if (xnet_check_match(&ep->saved_rx.hdr, rx_entry)) - return ep; - } - return NULL; -} - /* It's possible that an endpoint may be waiting for the message being * posted (i.e. it has an unexpected message). If so, kick off progress * to handle it immediately. 
@@ -400,6 +427,7 @@ static ssize_t xnet_srx_tag(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry) { struct xnet_progress *progress; + struct xnet_xfer_entry *saved_entry; struct xnet_ep *ep; struct slist *queue; @@ -412,9 +440,9 @@ xnet_srx_tag(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry) if ((srx->match_tag_rx == xnet_match_tag) || (recv_entry->src_addr == FI_ADDR_UNSPEC)) { - ep = xnet_search_saved(progress, recv_entry); - if (ep) { - xnet_complete_saved(ep, recv_entry); + saved_entry = xnet_search_saved(progress, recv_entry, true); + if (saved_entry) { + xnet_recv_saved(saved_entry, recv_entry); return 0; } @@ -434,10 +462,15 @@ xnet_srx_tag(struct xnet_srx *srx, struct xnet_xfer_entry *recv_entry) return 0; } - if (xnet_has_saved_rx(ep) && - xnet_check_match(&ep->saved_rx.hdr, recv_entry)) { - xnet_complete_saved(ep, recv_entry); - } else if (xnet_has_unexp(ep)) { + if (ep->saved_cnt) { + saved_entry = xnet_match_saved(ep, recv_entry, true); + if (saved_entry) { + xnet_recv_saved(saved_entry, recv_entry); + return 0; + } + } + + if (xnet_has_unexp(ep)) { assert(!dlist_empty(&ep->unexp_entry)); slist_insert_tail(&recv_entry->entry, queue); xnet_progress_rx(ep);