Skip to content

Commit

Permalink
trial openucx#13
Browse files Browse the repository at this point in the history
  • Loading branch information
andypauloramirez authored Aug 16, 2022
1 parent 6fcaca0 commit 98a24d2
Showing 1 changed file with 53 additions and 52 deletions.
105 changes: 53 additions & 52 deletions examples/ucp_client_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
#define IP_STRING_LEN 50
#define PORT_STRING_LEN 8
#define TAG 0xCAFE
#define COMM_TYPE_DEFAULT "STREAM"
#define COMM_TYPE_DEFAULT "TAG" //-------------MODIFIED IBA STREAM
#define PRINT_INTERVAL 2000
#define DEFAULT_NUM_ITERATIONS 1
#define TEST_AM_ID 0
Expand All @@ -55,10 +55,10 @@ static int num_iterations = DEFAULT_NUM_ITERATIONS;


typedef enum {
CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(1),
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(2),
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
// CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
CLIENT_SERVER_SEND_RECV_TAG = UCS_BIT(0), //----------MODIFIED IBA 1
CLIENT_SERVER_SEND_RECV_AM = UCS_BIT(1), //----------MODIFIED IBA 2
CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_TAG //----------MODIFIED
} send_recv_type_t;


Expand Down Expand Up @@ -162,11 +162,11 @@ static void tag_recv_cb(void *request, ucs_status_t status,
* stream message.
*/
//--------------------------MODIFIED-----------------------------------------------
static void stream_recv_cb(void *request, ucs_status_t status, size_t length,
void *user_data)
{
common_cb(user_data, "stream_recv_cb");
}
//static void stream_recv_cb(void *request, ucs_status_t status, size_t length,
// void *user_data)
//{
// common_cb(user_data, "stream_recv_cb");
//}
//-------------------------MODIFIED------------------------------------------------
/**
* The callback on the receiving side, which is invoked upon receiving the
Expand Down Expand Up @@ -398,41 +398,41 @@ fill_request_param(ucp_dt_iov_t *iov, int is_client,
* The client sends a message to the server and waits until the send it completed.
* The server receives a message from the client and waits for its completion.
*/
//-------------------------------------------------------------------------------------------------
static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
int current_iter)
{
ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
ucp_request_param_t param;
test_req_t *request;
size_t msg_length;
void *msg;
test_req_t ctx;

memset(iov, 0, iov_cnt * sizeof(*iov));
//static int send_recv_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, int is_server,
// int current_iter)
//{
// ucp_dt_iov_t *iov = alloca(iov_cnt * sizeof(ucp_dt_iov_t));
// ucp_request_param_t param;
// test_req_t *request;
// size_t msg_length;
// void *msg;
// test_req_t ctx;

if (fill_request_param(iov, !is_server, &msg, &msg_length,
&ctx, &param) != 0) {
return -1;
}
// memset(iov, 0, iov_cnt * sizeof(*iov));

if (!is_server) {
// if (fill_request_param(iov, !is_server, &msg, &msg_length,
// &ctx, &param) != 0) {
// return -1;
// }

// if (!is_server) {
/* Client sends a message to the server using the stream API */
param.cb.send = send_cb;
request = ucp_stream_send_nbx(ep, msg, msg_length, &param);
} else {
/* Server receives a message from the client using the stream API */
param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS;
param.flags = UCP_STREAM_RECV_FLAG_WAITALL;
param.cb.recv_stream = stream_recv_cb; //-------------MODIFIED-------------------
request = ucp_stream_recv_nbx(ep, msg, msg_length,
&msg_length, &param);
}
// param.cb.send = send_cb;
// request = ucp_stream_send_nbx(ep, msg, msg_length, &param);
// } else {
// /* Server receives a message from the client using the stream API */
// param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS;
// param.flags = UCP_STREAM_RECV_FLAG_WAITALL;
// //param.cb.recv_stream = stream_recv_cb; -------------MODIFIED-------------------
// request = ucp_stream_recv_nbx(ep, msg, msg_length,
// &msg_length, &param);
// }

// return request_finalize(ucp_worker, request, &ctx, is_server, iov,
// current_iter);
//}

return request_finalize(ucp_worker, request, &ctx, is_server, iov,
current_iter);
}
//---------------------------------------------------------------------------------------------------
/**
* Send and receive a message using the Tag-Matching API.
* The client sends a message to the server and waits until the send it completed.
Expand Down Expand Up @@ -594,7 +594,7 @@ static void usage()
DEFAULT_PORT);
fprintf(stderr, " -c Communication type for the client and server. "
" Valid values are:\n"
" 'stream' : Stream API\n"
// " 'stream' : Stream API\n"
" 'tag' : Tag API\n"
" 'am' : AM API\n"
" If not specified, %s API will be used.\n", COMM_TYPE_DEFAULT);
Expand Down Expand Up @@ -623,9 +623,10 @@ static int parse_cmd(int argc, char *const argv[], char **server_addr,
*server_addr = optarg;
break;
case 'c':
if (!strcasecmp(optarg, "stream")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
} else if (!strcasecmp(optarg, "tag")) {
//if (!strcasecmp(optarg, "stream")) {
// *send_recv_type = CLIENT_SERVER_SEND_RECV_STREAM;
//}
if (!strcasecmp(optarg, "tag")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_TAG;
} else if (!strcasecmp(optarg, "am")) {
*send_recv_type = CLIENT_SERVER_SEND_RECV_AM;
Expand Down Expand Up @@ -729,10 +730,10 @@ static int client_server_communication(ucp_worker_h worker, ucp_ep_h ep,
int ret;

switch (send_recv_type) {
case CLIENT_SERVER_SEND_RECV_STREAM:
//case CLIENT_SERVER_SEND_RECV_STREAM:
/* Client-Server communication via Stream API */
ret = send_recv_stream(worker, ep, is_server, current_iter);
break;
// ret = send_recv_stream(worker, ep, is_server, current_iter); //----------MODIFIED--------------
//break;
case CLIENT_SERVER_SEND_RECV_TAG:
/* Client-Server communication via Tag-Matching API */
ret = send_recv_tag(worker, ep, is_server, current_iter);
Expand Down Expand Up @@ -1038,10 +1039,10 @@ static int init_context(ucp_context_h *ucp_context, ucp_worker_h *ucp_worker,
/* UCP initialization */
ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;

if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) { //-------MODIFICADO------
ucp_params.features = UCP_FEATURE_STREAM;
}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) {
//if (send_recv_type == CLIENT_SERVER_SEND_RECV_STREAM) { //-------MODIFIED------
// ucp_params.features = UCP_FEATURE_STREAM;
//}
if (send_recv_type == CLIENT_SERVER_SEND_RECV_TAG) { ///---------elseif en tag-----
ucp_params.features = UCP_FEATURE_TAG;
} else {
ucp_params.features = UCP_FEATURE_AM;
Expand Down Expand Up @@ -1079,7 +1080,7 @@ int main(int argc, char **argv)
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;

printf("Hello world! prueba del fork prueba #11\n");
printf("Hello world! prueba del fork prueba #13\n");

ret = parse_cmd(argc, argv, &server_addr, &listen_addr, &send_recv_type);
if (ret != 0) {
Expand Down

0 comments on commit 98a24d2

Please sign in to comment.