Skip to content

Commit

Permalink
Change openucx#8
Browse files Browse the repository at this point in the history
  • Loading branch information
andypauloramirez authored Aug 16, 2022
1 parent 753457e commit e99e73c
Showing 1 changed file with 124 additions and 104 deletions.
228 changes: 124 additions & 104 deletions examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static int num_iterations = DEFAULT_NUM_ITERATIONS;
typedef enum {
/* CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0), */
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(0), /*----------MODIFIED IBA 1 */
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(1), /*----------MODIFIED IBA 2 */
/*CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(1), */ /*----------MODIFIED IBA 2 */
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_TAG /*----------MODIFIED */
} send_recv_type_t;

Expand Down Expand Up @@ -172,12 +172,14 @@ static void tag_recv_cb(void *request, ucs_status_t status,
* The callback on the receiving side, which is invoked upon receiving the
* active message.
*/
static void am_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
common_cb(user_data, "am_recv_cb");
}

/** --------------MODIFIED AM -----------------------------------
*static void am_recv_cb(void *request, ucs_status_t status, size_t length,
* void *user_data)
*{
* common_cb(user_data, "am_recv_cb");
*}
* -------------------MODIFIED AM-------------------------------------
**/
/**
* The callback on the sending side, which is invoked after finishing sending
* the message.
Expand Down Expand Up @@ -472,111 +474,124 @@ static int send_recv_tag(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
current_iter);
}

ucs_status_t ucp_am_data_cb(void *arg, const void *header, size_t header_length,
void *data, size_t length,
const ucp_am_recv_param_t *param)
{
ucp_dt_iov_t *iov;
size_t idx;
size_t offset;

if (length != iov_cnt * test_string_length) {
fprintf(stderr, "received wrong data length %ld (expected %ld)",
length, iov_cnt * test_string_length);
return UCS_OK;
}
/** --------------------------MODIFIED AM-----------------------------------------
*ucs_status_t ucp_am_data_cb(void *arg, const void *header, size_t header_length,
* void *data, size_t length,
* const ucp_am_recv_param_t *param)
*{
* ucp_dt_iov_t *iov;
* size_t idx;
* size_t offset;
* if (length != iov_cnt * test_string_length) {
* fprintf(stderr, "received wrong data length %ld (expected %ld)",
* length, iov_cnt * test_string_length);
* return UCS_OK;
* }
if (header_length != 0) {
fprintf(stderr, "received unexpected header, length %ld", header_length);
}
* if (header_length != 0) {
* fprintf(stderr, "received unexpected header, length %ld", header_length);
* }
am_data_desc.complete = 1;
* am_data_desc.complete = 1;
if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) {
* if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) {
/* Rendezvous request arrived, data contains an internal UCX descriptor,
* which has to be passed to ucp_am_recv_data_nbx function to confirm
* data transfer.
*/
am_data_desc.is_rndv = 1;
am_data_desc.desc = data;
return UCS_INPROGRESS;
}

**/
/**
* am_data_desc.is_rndv = 1;
* am_data_desc.desc = data;
* return UCS_INPROGRESS;
* }
**/
/* Message delivered with eager protocol, data should be available
* immediately
*/
am_data_desc.is_rndv = 0;
/* am_data_desc.is_rndv = 0; */

iov = am_data_desc.recv_buf;
offset = 0;
for (idx = 0; idx < iov_cnt; idx++) {
mem_type_memcpy(iov[idx].buffer, UCS_PTR_BYTE_OFFSET(data, offset),
iov[idx].length);
offset += iov[idx].length;
}
/**
* iov = am_data_desc.recv_buf;
* offset = 0;
* for (idx = 0; idx < iov_cnt; idx++) {
* mem_type_memcpy(iov[idx].buffer, UCS_PTR_BYTE_OFFSET(data, offset),
* iov[idx].length);
* offset += iov[idx].length;
* }
* return UCS_OK;
*}
**/

return UCS_OK;
}

/**
* Send and receive a message using Active Message API.
* The client sends a message to the server and waits until the send is completed.
* The server gets a message from the client and if it is rendezvous request,
* initiates receive operation.
*/
static int send_recv_am(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
test_req_t *request;
ucp_request_param_t params;
size_t msg_length;
void *msg;
test_req_t ctx;
**/

memset(iov, 0, iov_cnt * sizeof(*iov));
/**
*static int send_recv_am(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
* int current_iter)
*{
* ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
* test_req_t *request;
* ucp_request_param_t params;
* size_t msg_length;
* void *msg;
* test_req_t ctx;
if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, &params) != 0) {
return -1;
}
* memset(iov, 0, iov_cnt * sizeof(*iov));
if (is_server) {
am_data_desc.recv_buf = iov;
* if (fill_request_param(iov, !is_server, &msg, &msg_length,
* &ctx, &params) != 0) {
* return -1;
* }
* if (is_server) {
* am_data_desc.recv_buf = iov;
/* waiting for AM callback has called */
while (!am_data_desc.complete) {
ucp_worker_progress(ucp_worker);
}
/**
* while (!am_data_desc.complete) {
* ucp_worker_progress(ucp_worker);
* }
am_data_desc.complete = 0;
* am_data_desc.complete = 0;
if (am_data_desc.is_rndv) {
* if (am_data_desc.is_rndv) {
**/
/* Rendezvous request has arrived, need to invoke receive operation
* to confirm data transfer from the sender to the "recv_message"
* buffer. */
params.op_attr_mask |= UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
params.cb.recv_am = am_recv_cb,
request = ucp_am_recv_data_nbx(ucp_worker,
am_data_desc.desc,
msg, msg_length,
&params);
} else {
/**
* params.op_attr_mask |= UCP_OP_ATTR_FLAG_NO_IMM_CMPL;
* params.cb.recv_am = am_recv_cb,
* request = ucp_am_recv_data_nbx(ucp_worker,
* am_data_desc.desc,
* msg, msg_length,
* &params);
**/
/* } else { */
/* Data has arrived eagerly and is ready for use, no need to
* initiate receive operation. */
request = NULL;
}
} else {
/**
* request = NULL;
* }
* } else {
/* Client sends a message to the server using the AM API */
params.cb.send = (ucp_send_nbx_callback_t)send_cb,
request = ucp_am_send_nbx(ep, TEST_AM_ID, NULL, 0ul, msg,
msg_length, &params);
}

return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
/**
* params.cb.send = (ucp_send_nbx_callback_t)send_cb,
* request = ucp_am_send_nbx(ep, TEST_AM_ID, NULL, 0ul, msg,
* msg_length, &params);
* }
* return request_finalize(ucp_worker, request, &ctx, is_server, iov,
* current_iter);
*}
**/
/**
* Print this application's usage help message.
*/
Expand All @@ -598,7 +613,7 @@ static void usage()
" Valid values are:\n"
/* " 'stream' : Stream API\n" */
" 'tag' : Tag API\n"
" 'am' : AM API\n"
/* " 'am' : AM API\n" */
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
fprintf(stderr, " -i Number of iterations to run. Client and server must "
"have the same value. (default = %d).\n",
Expand Down Expand Up @@ -630,8 +645,9 @@ static int parse_cmd(int argc, char *const argv[], char **server_addr,
}*/
if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
} /* else if (!strcasecmp(optarg, "am")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_AM;
*/
} else {
fprintf(stderr, "Wrong communication type %s. "
"Using %s as default\n", optarg, COMM_TYPE_DEFAULT);
Expand Down Expand Up @@ -740,10 +756,10 @@ static int client_server_communication(ucp_worker_h worker, ucp_ep_h ep,
/* Client-Server communication via Tag-Matching API */
ret = send_recv_tag(worker, ep, is_server, current_iter);
break;
case CLIENT_SERVER_SEND_RECV_AM:
/* case CLIENT_SERVER_SEND_RECV_AM: */
/* Client-Server communication via AM API. */
ret = send_recv_am(worker, ep, is_server, current_iter);
break;
/* ret = send_recv_am(worker, ep, is_server, current_iter);
break; */
default:
fprintf(stderr, "unknown send-recv type %d\n", send_recv_type);
return -1;
Expand Down Expand Up @@ -924,23 +940,27 @@ static int run_server(ucp_context_h ucp_context, ucp_worker_h ucp_worker,
if (ret != 0) {
goto err;
}


if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) {
/* if (send_recv_type == CLIENT_SERVER_SEND_RECV_AM) { */
/* Initialize Active Message data handler */
param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
UCP_AM_HANDLER_PARAM_FIELD_CB |
UCP_AM_HANDLER_PARAM_FIELD_ARG;
param.id = TEST_AM_ID;
param.cb = ucp_am_data_cb;
param.arg = ucp_data_worker; /* not used in our callback */
status = ucp_worker_set_am_recv_handler(ucp_data_worker,
&param);
if (status != UCS_OK) {
ret = -1;
goto err_worker;
}
}

/**
* param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
* UCP_AM_HANDLER_PARAM_FIELD_CB |
* UCP_AM_HANDLER_PARAM_FIELD_ARG;
* param.id = TEST_AM_ID;
* param.cb = ucp_am_data_cb;
**/

/* param.arg = ucp_data_worker; */ /* not used in our callback */
/** status = ucp_worker_set_am_recv_handler(ucp_data_worker,
* &param);
* if (status != UCS_OK) {
* ret = -1;
* goto err_worker;
* }
* }
**/
/* Initialize the server's context. */
context.conn_request = NULL;

Expand Down Expand Up @@ -1046,9 +1066,9 @@ static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
}**/
if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) { /*---------elseif en tag-----*/
ucp_params.features = UCP_FEATURE_TAG;
} else {
ucp_params.features = UCP_FEATURE_AM;
}
} /**else {
*ucp_params.features = UCP_FEATURE_AM;
}**/

status = ucp_init(&ucp_params, NULL, ucp_context);
if (status != UCS_OK) {
Expand Down

0 comments on commit e99e73c

Please sign in to comment.