Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic/umr v3: temporary PR for shm transport and RNDV protocol #3

Merged
merged 3 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <stdio.h>
#include <sys/types.h>

#define DEBUG(EXP) {fprintf(stderr, "DEBUG(%s->%s[#%d]): " #EXP " = %d\n", __FILE__, __FUNCTION__, __LINE__, EXP);}

BEGIN_C_DECLS

/**
Expand Down
5 changes: 4 additions & 1 deletion src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,10 @@ ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,

/* GET Zcopy */
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_ZCOPY) {
config->tag.rndv.min_get_zcopy = ucs_max(config->tag.rndv.min_get_zcopy,
/* YQ: DEBUG */
//config->tag.rndv.min_get_zcopy = ucs_max(config->tag.rndv.min_get_zcopy,
// iface_attr->cap.get.min_zcopy);
config->tag.rndv.min_get_zcopy = ucs_min(config->tag.rndv.min_get_zcopy,
iface_attr->cap.get.min_zcopy);

config->tag.rndv.max_get_zcopy = ucs_min(config->tag.rndv.max_get_zcopy,
Expand Down
5 changes: 4 additions & 1 deletion src/ucp/dt/dt_struct.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ static size_t _dte_pack( const ucp_dt_struct_t *s,
ptrdiff_t elem_offs = 0;
ucp_dt_struct_t *sub_s;

//DEBUG(0);

/* Seek for the offset */
elem_idx = _elem_by_offset(s, out_offs, &elem_offs_int, &elem_rep_num);
elem_idx = _elem_by_offset(s, out_offset_orig, &elem_offs_int, &elem_rep_num);

while( (0 < len) && elem_rep_num < s->rep_count){
ucp_struct_dt_desc_t *dsc = &s->desc[elem_idx];
Expand Down Expand Up @@ -634,6 +636,7 @@ ucs_status_t ucp_dt_struct_register(ucp_context_t *context, ucp_md_index_t md_id

}
*memh = val.noncontig.memh[0];
if (*memh) (*md_map_p)++;

return status;
}
4 changes: 4 additions & 0 deletions src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ ucs_status_t ucp_do_am_zcopy_single(uct_pending_req_t *self, uint8_t am_id,
ucp_dt_state_t state = req->send.state.dt;
ucs_status_t status;

//DEBUG(3);

req->send.lane = ucp_ep_get_am_lane(ep);

ucp_dt_iov_copy_uct(ep->worker->context,iov, &iovcnt, max_iov,
Expand Down Expand Up @@ -309,6 +311,8 @@ ucs_status_t ucp_do_am_zcopy_multi(uct_pending_req_t *self, uint8_t am_id_first,
uct_ep_h uct_ep;
int pending_adde_res;

//DEBUG(4);

if (enable_am_bw && (req->send.state.dt.offset != 0)) {
req->send.lane = ucp_send_request_get_am_bw_lane(req);
ucp_send_request_add_reg_lane(req, req->send.lane);
Expand Down
68 changes: 47 additions & 21 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,25 @@ size_t ucp_tag_rndv_rts_pack(void *dest, void *arg)
rndv_rts_hdr->size = sreq->send.length;

/* Pack remote keys (which can be empty list) */
if (UCP_DT_IS_CONTIG(sreq->send.datatype) &&
if ((UCP_DT_IS_CONTIG(sreq->send.datatype) ||
UCP_DT_IS_STRUCT(sreq->send.datatype)) &&
ucp_rndv_is_get_zcopy(sreq->send.mem_type,
worker->context->config.ext.rndv_mode)) {
/* pack rkey, ask target to do get_zcopy */
rndv_rts_hdr->address = (uintptr_t)sreq->send.buffer;
packed_rkey_size = ucp_rkey_pack_uct(worker->context,
sreq->send.state.dt.dt.contig.md_map,
sreq->send.state.dt.dt.contig.memh,
sreq->send.mem_type,
rndv_rts_hdr + 1);
if (UCP_DT_IS_CONTIG(sreq->send.datatype)) {
packed_rkey_size = ucp_rkey_pack_uct(worker->context,
sreq->send.state.dt.dt.contig.md_map,
sreq->send.state.dt.dt.contig.memh,
sreq->send.mem_type,
rndv_rts_hdr + 1);
} else {
packed_rkey_size = ucp_rkey_pack_uct(worker->context,
sreq->send.state.dt.dt.struct_dt.non_contig.md_map,
sreq->send.state.dt.dt.struct_dt.non_contig.memh,
sreq->send.mem_type,
rndv_rts_hdr + 1);
}
if (packed_rkey_size < 0) {
ucs_fatal("failed to pack rendezvous remote key: %s",
ucs_status_string((ucs_status_t)packed_rkey_size));
Expand Down Expand Up @@ -112,16 +121,25 @@ static size_t ucp_tag_rndv_rtr_pack(void *dest, void *arg)
rndv_rtr_hdr->rreq_ptr = (uintptr_t)rreq; /* request of receiver side */

/* Pack remote keys (which can be empty list) */
if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
if (UCP_DT_IS_CONTIG(rreq->recv.datatype) ||
UCP_DT_IS_STRUCT(rreq->recv.datatype)) {
rndv_rtr_hdr->address = (uintptr_t)rreq->recv.buffer;
rndv_rtr_hdr->size = rndv_req->send.rndv_rtr.length;
rndv_rtr_hdr->offset = rreq->recv.frag.offset;

packed_rkey_size = ucp_rkey_pack_uct(rndv_req->send.ep->worker->context,
rreq->recv.state.dt.contig.md_map,
rreq->recv.state.dt.contig.memh,
rreq->recv.mem_type,
rndv_rtr_hdr + 1);
if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
packed_rkey_size = ucp_rkey_pack_uct(rndv_req->send.ep->worker->context,
rreq->recv.state.dt.contig.md_map,
rreq->recv.state.dt.contig.memh,
rreq->recv.mem_type,
rndv_rtr_hdr + 1);
} else {
packed_rkey_size = ucp_rkey_pack_uct(rndv_req->send.ep->worker->context,
rreq->recv.state.dt.struct_dt.non_contig.md_map,
rreq->recv.state.dt.struct_dt.non_contig.memh,
rreq->recv.mem_type,
rndv_rtr_hdr + 1);
}
if (packed_rkey_size < 0) {
return packed_rkey_size;
}
Expand Down Expand Up @@ -160,7 +178,8 @@ ucs_status_t ucp_tag_rndv_reg_send_buffer(ucp_request_t *sreq)
ucp_md_map_t md_map;
ucs_status_t status;

if (UCP_DT_IS_CONTIG(sreq->send.datatype) &&
if ((UCP_DT_IS_CONTIG(sreq->send.datatype) ||
UCP_DT_IS_STRUCT(sreq->send.datatype)) &&
ucp_rndv_is_get_zcopy(sreq->send.mem_type,
ep->worker->context->config.ext.rndv_mode)) {

Expand Down Expand Up @@ -427,6 +446,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
int pending_add_res;
ucp_lane_index_t lane;

//DEBUG(2);

ucp_rndv_get_lanes_count(rndv_req);

/* Figure out which lane to use for get operation */
Expand Down Expand Up @@ -503,7 +524,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_get_zcopy, (self),
* registration is not supported. for now SHM may avoid registration,
* but it will work on single lane */
ucp_dt_iov_copy_uct(ep->worker->context, iov, &iovcnt, max_iovcnt, &state,
rndv_req->send.buffer, ucp_dt_make_contig(1), length,
rndv_req->send.buffer, rndv_req->send.datatype, length,
ucp_ep_md_index(ep, lane),
rndv_req->send.mdesc);

Expand Down Expand Up @@ -574,14 +595,13 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
rndv_req->send.uct.func = ucp_rndv_progress_rma_get_zcopy;
rndv_req->send.buffer = rreq->recv.buffer;
rndv_req->send.mem_type = rreq->recv.mem_type;
rndv_req->send.datatype = ucp_dt_make_contig(1);
rndv_req->send.datatype = rreq->recv.datatype;
rndv_req->send.length = rndv_rts_hdr->size;
rndv_req->send.rndv_get.remote_request = rndv_rts_hdr->sreq.reqptr;
rndv_req->send.rndv_get.remote_address = rndv_rts_hdr->address;
rndv_req->send.rndv_get.rreq = rreq;
rndv_req->send.rndv_get.lanes_map = 0;
rndv_req->send.rndv_get.lane_count = 0;
rndv_req->send.datatype = rreq->recv.datatype;

status = ucp_ep_rkey_unpack(rndv_req->send.ep, rndv_rts_hdr + 1,
&rndv_req->send.rndv_get.rkey);
Expand All @@ -590,7 +610,7 @@ static void ucp_rndv_req_send_rma_get(ucp_request_t *rndv_req, ucp_request_t *rr
ucp_ep_peer_name(rndv_req->send.ep), ucs_status_string(status));
}

ucp_request_send_state_init(rndv_req, ucp_dt_make_contig(1), 0);
ucp_request_send_state_init(rndv_req, rndv_req->send.datatype, 0);
ucp_request_send_state_reset(rndv_req, ucp_rndv_get_completion,
UCP_REQUEST_SEND_PROTO_RNDV_GET);

Expand Down Expand Up @@ -784,13 +804,15 @@ UCS_PROFILE_FUNC_VOID(ucp_rndv_matched, (worker, rreq, rndv_rts_hdr),
/* if the receive side is not connected yet then the RTS was received on a stub ep */
ep = rndv_req->send.ep;
rndv_mode = worker->context->config.ext.rndv_mode;
if (!rreq->send.ep) rreq->send.ep = ep;

if (ucp_rndv_is_rkey_ptr(rndv_rts_hdr, ep, rreq->recv.mem_type, rndv_mode)) {
ucp_rndv_do_rkey_ptr(rndv_req, rreq, rndv_rts_hdr);
goto out;
}

if (UCP_DT_IS_CONTIG(rreq->recv.datatype)) {
if (UCP_DT_IS_CONTIG(rreq->recv.datatype) ||
UCP_DT_IS_STRUCT(rreq->recv.datatype)) {
if ((rndv_rts_hdr->address != 0) &&
(ucp_rndv_is_get_zcopy(rreq->recv.mem_type, rndv_mode)) &&
/* is it allowed to use GET Zcopy for the current message? */
Expand Down Expand Up @@ -939,6 +961,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_put_zcopy, (self),
size_t iovcnt;
ucp_dt_state_t state;

//DEBUG(1);

if (!sreq->send.mdesc) {
status = ucp_request_send_buffer_reg_lane(sreq, sreq->send.lane, 0);
ucs_assert_always(status == UCS_OK);
Expand All @@ -965,7 +989,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_progress_rma_put_zcopy, (self),

state = sreq->send.state.dt;
ucp_dt_iov_copy_uct(ep->worker->context, iov, &iovcnt, max_iovcnt, &state,
sreq->send.buffer, ucp_dt_make_contig(1), length,
sreq->send.buffer, sreq->send.datatype, length,
ucp_ep_md_index(ep, sreq->send.lane), sreq->send.mdesc);
status = uct_ep_put_zcopy(ep->uct_eps[sreq->send.lane],
iov, iovcnt,
Expand Down Expand Up @@ -1311,7 +1335,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
ucp_tag_offload_cancel_rndv(sreq);
}

if (UCP_DT_IS_CONTIG(sreq->send.datatype) && rndv_rtr_hdr->address) {
if ((UCP_DT_IS_CONTIG(sreq->send.datatype) ||
UCP_DT_IS_STRUCT(sreq->send.datatype)) && rndv_rtr_hdr->address) {
status = ucp_ep_rkey_unpack(ep, rndv_rtr_hdr + 1,
&sreq->send.rndv_put.rkey);
if (status != UCS_OK) {
Expand Down Expand Up @@ -1368,7 +1393,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_rtr_handler,
/* switch to AM */
sreq->send.tag.rreq_ptr = rndv_rtr_hdr->rreq_ptr;

if (UCP_DT_IS_CONTIG(sreq->send.datatype) &&
if ((UCP_DT_IS_CONTIG(sreq->send.datatype) ||
UCP_DT_IS_STRUCT(sreq->send.datatype)) &&
(sreq->send.length >=
ucp_ep_config(ep)->am.mem_type_zcopy_thresh[sreq->send.mem_type]))
{
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/tag_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ucp_tag_get_rndv_threshold(const ucp_request_t *req, size_t count,
case UCP_DATATYPE_GENERIC:
return rndv_am_thresh;
case UCP_DATATYPE_STRUCT:
return (size_t)(-1);
return ucs_min(rndv_rma_thresh, rndv_am_thresh);
default:
ucs_error("Invalid data type %lx", req->send.datatype);
}
Expand Down