Skip to content

Commit

Permalink
Merge pull request open-mpi#10 from anandhis/topic/rmlofi
Browse files Browse the repository at this point in the history
Addressed pull-request comments from jfsquyres->
  • Loading branch information
rhc54 authored Jul 14, 2016
2 parents d3f028c + 7f9c701 commit 591482a
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 58 deletions.
28 changes: 2 additions & 26 deletions orte/mca/ess/base/ess_base_std_orted.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ int orte_ess_base_orted_setup(char **hosts)
jdata->num_reported = 1;


//[A]

/* setup the PMIx framework - ensure it skips all non-PMIx components,
* but do not override anything we were given */
opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
Expand All @@ -383,7 +383,7 @@ int orte_ess_base_orted_setup(char **hosts)
}
/* set the event base */
opal_pmix_base_set_evbase(orte_event_base);
//[A]


/* Setup the communication infrastructure */
if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_oob_base_framework, 0))) {
Expand Down Expand Up @@ -524,30 +524,6 @@ int orte_ess_base_orted_setup(char **hosts)
}
}

/* setup the PMIx framework - ensure it skips all non-PMIx components,
* but do not override anything we were given */
//[A]
//[A] opal_setenv("OMPI_MCA_pmix", "^s1,s2,cray,isolated", false, &environ);
//[A] if (OPAL_SUCCESS != (ret = mca_base_framework_open(&opal_pmix_base_framework, 0))) {
//[A] ORTE_ERROR_LOG(ret);
//[A] error = "orte_pmix_base_open";
//[A] goto error;
//[A] }

//[A] if (ORTE_SUCCESS != (ret = opal_pmix_base_select())) {
//[A]ORTE_ERROR_LOG(ret);
//[A] error = "opal_pmix_base_select";
//[A] goto error;
//[A] }
/* set the event base */
//[A] opal_pmix_base_set_evbase(orte_event_base);
/* setup the PMIx server */
//[A] if (ORTE_SUCCESS != (ret = pmix_server_init())) {
//[A] ORTE_ERROR_LOG(ret);
//[A] error = "pmix server init";
//[A] goto error;
//[A] }

/* setup the routed info - the selected routed component
* will know what to do.
*/
Expand Down
5 changes: 1 addition & 4 deletions orte/mca/rml/base/rml_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ void orte_rml_API_finalize(void)
active->module->finalize();
}
}
//[A]
//3/25 -> moving here to avoid multiple calls from each plugin to this base fn
orte_rml_base_comm_stop();

orte_rml_base_comm_stop();
}

/** Get contact information for local process */
Expand Down
2 changes: 1 addition & 1 deletion orte/mca/rml/ofi/rml_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

#include "rml_ofi_request.h"

//[A]

/** the maximum open conduit - assuming system will have no more than 20 transports*/
#define MAX_CONDUIT 20

Expand Down
57 changes: 36 additions & 21 deletions orte/mca/rml/ofi/rml_ofi_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ orte_rml_component_t mca_rml_ofi_component = {

orte_rml_ofi_module_t orte_rml_ofi = {
{
.enable_comm = orte_rml_ofi_enable_comm, // [A] should we be calling this ?
.enable_comm = orte_rml_ofi_enable_comm,
.finalize = orte_rml_ofi_fini,
.query_transports = orte_rml_ofi_query_transports,
.send_transport_nb = orte_rml_ofi_send_transport_nb,
Expand Down Expand Up @@ -348,24 +348,21 @@ void print_transports_query()
"\n calling the orte_rml_ofi_query_transports() ");
if( ORTE_SUCCESS == orte_rml_ofi_query_transports(&providers))
{
/*opal_output_verbose(10,orte_rml_base_framework.framework_output,
"\n query_transports() completed, printing details\n"); */
opal_output_verbose(20,orte_rml_base_framework.framework_output,
"\n query_transports() completed, printing details\n");
while (providers)
{
//get the first opal_list_t;
opal_list_t *prov;
ret = opal_value_unload(providers,(void **)&prov,OPAL_PTR);
if (ret == OPAL_SUCCESS) {
/* opal_output_verbose(1,orte_rml_base_framework.framework_output,
"\n %s:%d opal_value_unload() succeeded, opal_list* prov = %x",
__FILE__,__LINE__,prov); */
if( orte_get_attribute( prov, ORTE_CONDUIT_ID, (void **)&prov_num,OPAL_UINT8)) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"\n Provider conduit_id : %d",*prov_num);
}
if( orte_get_attribute( prov, ORTE_PROTOCOL, (void **)&protocol_ptr,OPAL_UINT32)) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
"\n Protocol : %d", *protocol_ptr); //fi_tostr(protocol_ptr,FI_TYPE_PROTOCOL));
"\n Protocol : %d", *protocol_ptr);
}
if( orte_get_attribute( prov, ORTE_PROV_NAME, (void **)&prov_name ,OPAL_STRING)) {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
Expand All @@ -380,9 +377,6 @@ void print_transports_query()
__FILE__,__LINE__,prov);
}
providers = (opal_value_t *)providers->super.opal_list_next;
/* opal_output_verbose(1,orte_rml_base_framework.framework_output,"\n %s:%d
* - Moving on to next provider provders=%x",__FILE__,__LINE__,providers);
*/
}
} else {
opal_output_verbose(10,orte_rml_base_framework.framework_output,
Expand Down Expand Up @@ -426,7 +420,7 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
conduit->conduit_id, wc.flags);
count++;
// check the flags to see if this is a send-completion or receive
if ( (wc.flags & FI_SEND) || ((wc.flags & FI_SEND) && (wc.flags & FI_TRANSMIT_COMPLETE)) )
if ( wc.flags & FI_SEND )
{
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Send completion received on conduitid %d",
Expand All @@ -439,11 +433,9 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
ret = orte_rml_ofi_send_callback(&wc, ofi_req);
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI request event callback: %zd",
"Error returned by OFI send callback handler when a send completion was received on conduit: %zd",
ret);
abort();
}

}
}
} else if ( (wc.flags & FI_RECV) && (wc.flags & FI_MULTI_RECV) ) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
Expand All @@ -458,14 +450,23 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
0,&(conduit->rx_ctx1));
// call the receive message handler that will call the rml_base
ret = orte_rml_ofi_recv_handler(&wc, conduit->conduit_id);

if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI Recv handler when handling the received message on the conduit: %zd",
ret);
}
} else if ( wc.flags & FI_RECV ) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Received message on conduitid %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
conduit->conduit_id);
// call the receive message handler that will call the rml_base
ret = orte_rml_ofi_recv_handler(&wc, conduit->conduit_id);
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI Recv handler when handling the received message on the conduit: %zd",
ret);
}
} else if ( wc.flags & FI_MULTI_RECV ) {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s Received buffer overrun message on conduitid %d - need to repost",
Expand All @@ -477,6 +478,11 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
orte_rml_ofi.ofi_conduits[conduit->conduit_id].rxbuf_size,
fi_mr_desc(orte_rml_ofi.ofi_conduits[conduit->conduit_id].mr_multi_recv),
0,&(conduit->rx_ctx1));
if (ORTE_SUCCESS != ret) {
opal_output(orte_rml_base_framework.framework_output,
"Error returned by OFI when reposting buffer on the conduit: %zd",
ret);
}
}else {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"CQ has unhandled completion event with FLAG wc.flags = 0x%x",
Expand All @@ -498,7 +504,6 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
if (0 > ret) {
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"Error returned from fi_cq_readerr: %zd", ret);
abort();
}
assert(error.op_context);
/* get the context from wc and call the error handler */
Expand All @@ -509,9 +514,9 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
opal_output_verbose(1,orte_rml_base_framework.framework_output,
"Error returned by request error callback: %zd",
ret);
abort();
}
} else {
break;
} else if (ret == -FI_EAGAIN){
/**
* The CQ is empty. Return.
*/
Expand All @@ -520,7 +525,14 @@ orte_rml_ofi_progress(ofi_transport_conduit_t* conduit)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
conduit->conduit_id );
break;
}
} else {
opal_output_verbose(1, orte_rml_base_framework.framework_output,
"%s cq_read for conduitid %d returned error 0x%x <%s>",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
conduit->conduit_id, ret,
fi_strerror((int) -ret) );
break;
}
}
return count;
}
Expand Down Expand Up @@ -616,7 +628,7 @@ rml_ofi_component_init(int* priority)
* Specify the version of OFI is coded to, the provider will select struct
* layouts that are compatible with this version.
*/
fi_version = FI_VERSION(1, 0);
fi_version = FI_VERSION(1, 1);

/**
* fi_getinfo: returns information about fabric services for reaching a
Expand Down Expand Up @@ -866,6 +878,9 @@ rml_ofi_component_init(int* priority)
"%s:%d ERROR: Cannot register address, Unhandled addr_format - %d, ep_name - %s ",
__FILE__,__LINE__,orte_rml_ofi.ofi_conduits[cur_conduit].fabric_info->addr_format,
orte_rml_ofi.ofi_conduits[cur_conduit].ep_name);
free_conduit_resources(cur_conduit);
/*abort this current transport, but check if next transport can be opened*/
continue;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion orte/mca/rml/ofi/rml_ofi_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ int orte_rml_ofi_recv_handler(struct fi_cq_data_entry *wc, uint8_t conduit_id)

}
}

return ORTE_SUCCESS;
}


Expand Down
6 changes: 1 addition & 5 deletions orte/mca/rml/oob/rml_oob_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ orte_rml_oob_fini(void)
}
OBJ_DESTRUCT(&orte_rml_oob_module.exceptions);

/* clear the base receive */
//[A]
/*3/25 -> this needs to be moved to stub?
orte_rml_base_comm_stop();
*/
/* the rml_base_stub takes care of clearing the base receive */
}

#if OPAL_ENABLE_FT_CR == 1
Expand Down

0 comments on commit 591482a

Please sign in to comment.