diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 6760cf6ce2e..fffec71ec70 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -838,7 +838,8 @@ static void ucp_ep_cleanup_unexp(ucp_ep_h ep) rreq = matchq->exp_req; if (rreq->recv.tag.ep_ptr == (uintptr_t)ep) { ucs_debug("completing req %p", rreq); - ucp_request_complete_tag_recv(rreq, UCS_ERR_CANCELED); + ucp_request_complete_tag_recv(ep->worker, rreq, UCS_ERR_CANCELED, + "rndv_cancel"); kh_del(ucp_tag_frag_hash, &tm->frag_hash, iter); } } else { diff --git a/src/ucp/core/ucp_request.c b/src/ucp/core/ucp_request.c index 9b649554535..c56d133a180 100644 --- a/src/ucp/core/ucp_request.c +++ b/src/ucp/core/ucp_request.c @@ -124,7 +124,8 @@ UCS_PROFILE_FUNC_VOID(ucp_request_cancel, (worker, request), removed = ucp_tag_exp_remove(&worker->tm, req); /* If tag posted to the transport need to wait its completion */ if (removed && !(req->flags & UCP_REQUEST_FLAG_OFFLOADED)) { - ucp_request_complete_tag_recv(req, UCS_ERR_CANCELED); + ucp_request_complete_tag_recv(worker, req, UCS_ERR_CANCELED, + "user_cancel"); } UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); diff --git a/src/ucp/core/ucp_request.inl b/src/ucp/core/ucp_request.inl index dafa3f8203c..15b2928e04d 100644 --- a/src/ucp/core/ucp_request.inl +++ b/src/ucp/core/ucp_request.inl @@ -98,13 +98,23 @@ ucp_request_complete_send(ucp_request_t *req, ucs_status_t status) } static UCS_F_ALWAYS_INLINE void -ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status) +ucp_request_complete_tag_recv(ucp_worker_h worker, ucp_request_t *req, + ucs_status_t status, const char *state) { ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT " stag 0x%"PRIx64" len %zu, %s", req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags), req->recv.tag.info.sender_tag, req->recv.tag.info.length, ucs_status_string(status)); + + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_tag_rndv_debug_entry_t *entry = + ucp_worker_rndv_debug_entry(worker, req->recv.req_id); + entry->send_tag = req->recv.tag.info.sender_tag; + entry->status = state; + entry->recvd_size = req->recv.tag.info.length; + } + UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status); ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info); } diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index bbf563caed1..74e2e581e42 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -55,7 +55,7 @@ ucp_eager_offload_handler(void *arg, void *data, size_t length, ucp_eager_expected_handler(worker, req, data, length, recv_tag, flags); req->recv.tag.info.length = length; status = ucp_request_recv_data_unpack(req, data, length, 0, 1); - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(worker, req, status, "exp_offload"); status = UCS_OK; } else { status = ucp_recv_desc_init(worker, data, length, sizeof(ucp_tag_t), @@ -107,7 +107,7 @@ ucp_eager_tagged_handler(void *arg, void *data, size_t length, unsigned am_flags status = ucp_request_recv_data_unpack(req, UCS_PTR_BYTE_OFFSET(data, hdr_len), recv_len, 0, 1); - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(worker, req, status, "eager_only"); } else { eagerf_hdr = data; req->recv.tag.info.length = diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index 80fec50ada7..a56697df79f 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -127,7 +127,7 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_completed, UCP_WORKER_STAT_TAG_OFFLOAD(req->recv.worker, MATCHED); out: --req->recv.tag.wiface->post_count; - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(req->recv.worker, req, status, "offload"); } /* RNDV request matched by the transport. Need to proceed with SW based RNDV */ @@ -145,7 +145,8 @@ UCS_PROFILE_FUNC_VOID(ucp_tag_offload_rndv_cb, --req->recv.tag.wiface->post_count; if (ucs_unlikely(status != UCS_OK)) { ucp_tag_offload_release_buf(req, 1); - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(req->recv.worker, req, status, + "offload_rndv"); return; } diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index 99fee376b9c..994f975479e 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -518,16 +518,19 @@ static void ucp_rndv_send_frag_atp(ucp_request_t *fsreq, uintptr_t remote_reques ucp_request_send(fsreq, 0); } -static void ucp_rndv_zcopy_recv_req_complete(ucp_request_t *req, ucs_status_t status) +static void +ucp_rndv_zcopy_recv_req_complete(ucp_worker_h worker, ucp_request_t *req, + ucs_status_t status) { ucp_request_recv_buffer_dereg(req); - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(worker, req, status, "rndv_zcopy"); } static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req, ucs_status_t status) { ucp_request_t *rreq = rndv_req->send.rndv_get.rreq; + ucp_worker_h worker = rndv_req->send.ep->worker; ucs_assertv(rndv_req->send.state.dt.offset == rndv_req->send.length, "rndv_req=%p offset=%zu length=%zu", rndv_req, @@ -537,6 +540,12 @@ static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req, ucs_status_string(status)); UCS_PROFILE_REQUEST_EVENT(rreq, "complete_rndv_get", 0); + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_tag_rndv_debug_entry_t *entry = + ucp_worker_rndv_debug_entry(worker, rndv_req->send.rndv_req_id); + entry->status = "rndv_completed"; + } + ucp_rkey_destroy(rndv_req->send.rndv_get.rkey); ucp_request_send_buffer_dereg(rndv_req); @@ -548,7 +557,7 @@ static void ucp_rndv_complete_rma_get_zcopy(ucp_request_t *rndv_req, ucp_request_put(rndv_req); } - ucp_rndv_zcopy_recv_req_complete(rreq, status); + ucp_rndv_zcopy_recv_req_complete(worker, rreq, status); } static void ucp_rndv_recv_data_init(ucp_request_t *rreq, size_t size) @@ -994,7 +1003,7 @@ static unsigned ucp_rndv_progress_rkey_ptr(void *arg) ucp_rndv_rkey_ptr_rreq_advance(rreq, seg_size); if (ucs_unlikely(status != UCS_OK) || (rreq->recv.state.offset == rndv_req->send.length)) { - ucp_request_complete_tag_recv(rreq, status); + ucp_request_complete_tag_recv(worker, rreq, status, "rkey_ptr"); ucp_rkey_destroy(rndv_req->send.rndv_get.rkey); ucp_rndv_req_send_ats(rndv_req, rreq, rndv_req->send.rndv_get.remote_request, status); @@ -1053,7 +1062,7 @@ static void ucp_rndv_do_rkey_ptr(ucp_request_t *rndv_req, ucp_request_t *rreq, &rkey->tl_rkey[rkey_index].rkey, rndv_rts_hdr->address, &local_ptr); if (status != UCS_OK) { - ucp_request_complete_tag_recv(rreq, status); + ucp_request_complete_tag_recv(worker, rreq, status, "rkey_ptr"); ucp_rkey_destroy(rkey); ucp_rndv_req_send_ats(rndv_req, rreq, rndv_rts_hdr->sreq.reqptr, status); return; @@ -1109,7 +1118,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq), ep = ucp_worker_get_ep_by_ptr(worker, rndv_rts_hdr->sreq.ep_ptr); if (ep == NULL) { - ucp_request_complete_tag_recv(rreq, UCS_ERR_CANCELED); + ucp_request_complete_tag_recv(worker, rreq, UCS_ERR_CANCELED, "rndv_no_ep"); goto out; } @@ -1138,7 +1147,7 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr, rts_seq), rndv_rts_hdr->size, rreq->recv.length, rreq); ucp_rndv_req_send_ats(rndv_req, rreq, rndv_rts_hdr->sreq.reqptr, UCS_OK); ucp_request_recv_generic_dt_finish(rreq); - ucp_rndv_zcopy_recv_req_complete(rreq, UCS_ERR_MESSAGE_TRUNCATED); + ucp_rndv_zcopy_recv_req_complete(worker, rreq, UCS_ERR_MESSAGE_TRUNCATED); goto out; } @@ -1210,12 +1219,28 @@ static void ucp_rndv_send_cancel_ack(ucp_worker_h worker, } static void ucp_rndv_unexp_cancel(ucp_worker_h worker, - ucp_rndv_rts_hdr_t *rndv_rts_hdr) + ucp_rndv_rts_hdr_t *rndv_rts_hdr, + uint64_t rts_seq) { const ucp_rndv_rts_hdr_t *rdesc_rts_hdr; - + ucp_tag_rndv_debug_entry_t *entry; ucp_recv_desc_t *rdesc; ucs_list_link_t *list; + uint64_t req_id; + + req_id = worker->rndv_req_id++; + + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + entry = ucp_worker_rndv_debug_entry(worker, req_id); + entry->type = "rndv_cancel"; + entry->rts_seq = rts_seq; + entry->send_tag = rndv_rts_hdr->super.tag; + entry->ep = ucp_worker_get_ep_by_ptr(worker, + rndv_rts_hdr->sreq.ep_ptr); + entry->remote_reqptr = rndv_rts_hdr->sreq.reqptr; + entry->remote_address = rndv_rts_hdr->address; + entry->size = rndv_rts_hdr->size; + } list = ucp_tag_unexp_get_list_for_tag(&worker->tm, rndv_rts_hdr->super.tag); ucs_list_for_each(rdesc, list, tag_list[UCP_RDESC_HASH_LIST]) { @@ -1249,7 +1274,7 @@ ucs_status_t ucp_rndv_process_rts(void *arg, void *data, size_t length, seq = worker->rndv_rts_recv_seq++; if (rndv_rts_hdr->status == UCS_ERR_CANCELED) { - ucp_rndv_unexp_cancel(worker, rndv_rts_hdr); + ucp_rndv_unexp_cancel(worker, rndv_rts_hdr, seq); return UCS_OK; } @@ -1514,7 +1539,8 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_frag_recv_put_completion, (self, status), ucp_request_put(freq); if (req->recv.tag.remaining == 0) { - ucp_request_complete_tag_recv(req, UCS_OK); + ucp_request_complete_tag_recv(req->send.ep->worker, req, UCS_OK, + "freq"); } } @@ -1676,10 +1702,10 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler, (arg, data, length, flags), void *arg, void *data, size_t length, unsigned flags) { + ucp_worker_h worker = arg; ucp_reply_hdr_t *rep_hdr = data; ucp_request_t *req; ucp_request_t *rreq; - ucp_worker_h worker; ucp_lane_index_t mem_type_rma_lane; ucp_mem_desc_t *mdesc; ucp_md_index_t md_index; @@ -1731,7 +1757,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_atp_handler, ucp_request_send(req, 0); } else { UCS_PROFILE_REQUEST_EVENT(req, "rndv_atp_recv", 0); - ucp_rndv_zcopy_recv_req_complete(req, UCS_OK); + ucp_rndv_zcopy_recv_req_complete(worker, req, UCS_OK); } return UCS_OK; diff --git a/src/ucp/tag/tag_match.h b/src/ucp/tag/tag_match.h index abe24fc3d34..f15082ba41d 100644 --- a/src/ucp/tag/tag_match.h +++ b/src/ucp/tag/tag_match.h @@ -57,6 +57,7 @@ KHASH_INIT(ucp_tag_frag_hash, uint64_t, ucp_tag_frag_match_t, 1, typedef struct ucp_tag_rndv_debug_entry { const char *type; + const char *status; uint64_t id; uint64_t rts_seq; ucp_ep_h ep; @@ -66,6 +67,7 @@ typedef struct ucp_tag_rndv_debug_entry { uintptr_t remote_reqptr; void *local_address; size_t size; + size_t recvd_size; ucp_request_t *rndv_get_req; ucp_request_t *send_req; ucp_request_t *recv_req; diff --git a/src/ucp/tag/tag_match.inl b/src/ucp/tag/tag_match.inl index 6452265c089..1c0bbbb124d 100644 --- a/src/ucp/tag/tag_match.inl +++ b/src/ucp/tag/tag_match.inl @@ -335,7 +335,8 @@ ucp_tag_request_process_recv_data(ucp_request_t *req, const void *data, if (dereg) { ucp_request_recv_buffer_dereg(req); } - ucp_request_complete_tag_recv(req, status); + ucp_request_complete_tag_recv(req->recv.worker, req, status, + "tag_data_last"); ucs_assert(status != UCS_INPROGRESS); return status; } else { diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 7f26fff3747..b979e295e22 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -20,13 +20,22 @@ static UCS_F_ALWAYS_INLINE void -ucp_tag_recv_request_completed(ucp_request_t *req, ucs_status_t status, - ucp_tag_recv_info_t *info, const char *function) +ucp_tag_recv_request_completed(ucp_worker_h worker, ucp_request_t *req, + ucs_status_t status, ucp_tag_recv_info_t *info, + const char *function) { ucs_trace_req("%s returning completed request %p (%p) stag 0x%"PRIx64" len %zu, %s", function, req, req + 1, info->sender_tag, info->length, ucs_status_string(status)); + if (ucs_unlikely(worker->tm.rndv_debug.queue_length > 0)) { + ucp_tag_rndv_debug_entry_t *entry = + ucp_worker_rndv_debug_entry(worker, req->recv.req_id); + entry->send_tag = info->sender_tag; + entry->status = "recv_completed1"; + entry->recvd_size = info->length; + } + req->status = status; if ((req->flags |= UCP_REQUEST_FLAG_COMPLETED) & UCP_REQUEST_FLAG_RELEASED) { ucp_request_put(req); @@ -111,7 +120,7 @@ ucp_tag_recv_common(ucp_worker_h worker, void *buffer, size_t count, if (req_flags & UCP_REQUEST_FLAG_CALLBACK) { cb(req + 1, status, &req->recv.tag.info); } - ucp_tag_recv_request_completed(req, status, &req->recv.tag.info, + ucp_tag_recv_request_completed(worker, req, status, &req->recv.tag.info, debug_name); return; }