diff --git a/prov/verbs/src/fi_verbs.h b/prov/verbs/src/fi_verbs.h index 29ff483fb4c..5ccd04b4735 100644 --- a/prov/verbs/src/fi_verbs.h +++ b/prov/verbs/src/fi_verbs.h @@ -577,6 +577,7 @@ struct vrb_ep { /* Protected by send CQ lock */ uint64_t sq_credits; uint64_t peer_rq_credits; + struct slist sq_list; /* Protected by recv CQ lock */ int64_t rq_credits_avail; int64_t threshold; @@ -619,15 +620,22 @@ struct vrb_ep { }; -/* Must be cast-able to struct fi_context */ +enum vrb_op_ctx { + VRB_POST_SQ, + VRB_POST_RQ, + VRB_POST_SRQ, +}; + struct vrb_context { - struct vrb_ep *ep; - struct vrb_srq_ep *srx; + struct slist_entry entry; + union { + struct vrb_ep *ep; + struct vrb_srq_ep *srx; + }; void *user_ctx; - uint32_t flags; + enum vrb_op_ctx op_ctx; }; - #define VERBS_XRC_EP_MAGIC 0x1F3D5B79 struct vrb_xrc_ep { /* Must be first */ diff --git a/prov/verbs/src/verbs_cq.c b/prov/verbs/src/verbs_cq.c index 541982417c3..e8f306b8e53 100644 --- a/prov/verbs/src/verbs_cq.c +++ b/prov/verbs/src/verbs_cq.c @@ -239,18 +239,22 @@ int vrb_poll_cq(struct vrb_cq *cq, struct ibv_wc *wc) ctx = (struct vrb_context *) (uintptr_t) wc->wr_id; wc->wr_id = (uintptr_t) ctx->user_ctx; - if (ctx->flags & FI_TRANSMIT) { + if (ctx->op_ctx == VRB_POST_SQ) { + assert(ctx->ep); + assert(!slist_empty(&ctx->ep->sq_list)); + assert(ctx->ep->sq_list.head == &ctx->entry); + (void) slist_remove_head(&ctx->ep->sq_list); cq->credits++; ctx->ep->sq_credits++; } if (wc->status) { - if (ctx->flags & FI_RECV) - wc->opcode |= IBV_WC_RECV; - else + if (ctx->op_ctx == VRB_POST_SQ) wc->opcode &= ~IBV_WC_RECV; + else + wc->opcode |= IBV_WC_RECV; } - if (ctx->srx) { + if (ctx->op_ctx == VRB_POST_SRQ) { fastlock_acquire(&ctx->srx->ctx_lock); ofi_buf_free(ctx); fastlock_release(&ctx->srx->ctx_lock); diff --git a/prov/verbs/src/verbs_ep.c b/prov/verbs/src/verbs_ep.c index bae5b153b71..a60c89211b1 100644 --- a/prov/verbs/src/verbs_ep.c +++ b/prov/verbs/src/verbs_ep.c @@ -69,9 +69,9 @@ ssize_t vrb_post_recv(struct vrb_ep *ep, struct ibv_recv_wr *wr) if (!ctx) goto unlock; - ctx->ep = ep; + OFI_DBG_SET(ctx->ep, ep); ctx->user_ctx = (void *) (uintptr_t) wr->wr_id; - ctx->flags = FI_RECV; + ctx->op_ctx = VRB_POST_RQ; wr->wr_id = (uintptr_t) ctx; ret = ibv_post_recv(ep->ibv_qp, wr, &bad_wr); @@ -143,7 +143,7 @@ ssize_t vrb_post_send(struct vrb_ep *ep, struct ibv_send_wr *wr, uint64_t flags) ctx->ep = ep; ctx->user_ctx = (void *) (uintptr_t) wr->wr_id; - ctx->flags = FI_TRANSMIT | flags; + ctx->op_ctx = VRB_POST_SQ; wr->wr_id = (uintptr_t) ctx; ret = ibv_post_send(ep->ibv_qp, wr, &bad_wr); @@ -153,6 +153,7 @@ ssize_t vrb_post_send(struct vrb_ep *ep, struct ibv_send_wr *wr, uint64_t flags) vrb_convert_ret(ret)); goto credits; } + slist_insert_tail(&ctx->entry, &ep->sq_list); cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock); return 0; @@ -391,6 +392,7 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain, goto err2; } + slist_init(&ep->sq_list); ep->util_ep.ep_fid.msg = calloc(1, sizeof(*ep->util_ep.ep_fid.msg)); if (!ep->util_ep.ep_fid.msg) goto err3; @@ -405,6 +407,41 @@ vrb_alloc_init_ep(struct fi_info *info, struct vrb_domain *domain, return NULL; } +/* Generate flush completion entries for any queued send requests. + * We only need to record the wr_id and that the entry was not a + * receive (indicated by lack of IBV_WC_RECV flag). + */ +static void vrb_flush_sq(struct vrb_ep *ep) +{ + struct vrb_context *ctx; + struct vrb_cq *cq; + struct slist_entry *entry; + struct ibv_wc wc = {0}; + + if (!ep->util_ep.tx_cq) + return; + + cq = container_of(ep->util_ep.tx_cq, struct vrb_cq, util_cq); + wc.status = IBV_WC_WR_FLUSH_ERR; + wc.vendor_err = FI_ECANCELED; + + cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock); + while (!slist_empty(&ep->sq_list)) { + entry = slist_remove_head(&ep->sq_list); + ctx = container_of(entry, struct vrb_context, entry); + assert(ctx->op_ctx == VRB_POST_SQ); + + wc.wr_id = (uintptr_t) ctx->user_ctx; + cq->credits++; + ctx->ep->sq_credits++; + ofi_buf_free(ctx); + + if (wc.wr_id != VERBS_NO_COMP_FLAG) + vrb_save_wc(cq, &wc); + } + cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock); +} + static int vrb_close_free_ep(struct vrb_ep *ep) { struct vrb_cq *cq; @@ -478,6 +515,7 @@ static int vrb_ep_close(fid_t fid) if (ep->eq) fastlock_release(&ep->eq->lock); vrb_cleanup_cq(ep); + vrb_flush_sq(ep); break; case FI_EP_DGRAM: fab = container_of(&ep->util_ep.domain->fabric->fabric_fid, @@ -490,6 +528,7 @@ static int vrb_ep_close(fid_t fid) return -errno; } vrb_cleanup_cq(ep); + vrb_flush_sq(ep); break; default: VRB_WARN(FI_LOG_DOMAIN, "Unknown EP type\n"); @@ -1464,7 +1503,7 @@ ssize_t vrb_post_srq(struct vrb_srq_ep *ep, struct ibv_recv_wr *wr) ctx->srx = ep; ctx->user_ctx = (void *) (uintptr_t) wr->wr_id; - ctx->flags = FI_RECV; + ctx->op_ctx = VRB_POST_SRQ; wr->wr_id = (uintptr_t) ctx; ret = ibv_post_srq_recv(ep->srq, wr, &bad_wr);