Merge pull request #6 from tkordenbrock/topic/dynamic.add_procs.with.PML.flags

portals4: add support for dynamic add_procs() to all Portals4 components
hjelmn committed Sep 25, 2015
2 parents 54a4061 + 3e63a34 commit 0ca4bb4
Showing 15 changed files with 483 additions and 166 deletions.
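As context for the diff below, a minimal sketch (not part of the commit) of the calling pattern this change enables: the PML may now hand the Portals4 MTL additional procs after startup, for example when processes join via MPI_Comm_connect/MPI_Comm_accept, and the MTL creates endpoints only for procs it has not seen before. The wrapper name here is hypothetical; mtl_add_procs is the MTL module hook implemented by ompi_mtl_portals4_add_procs() below.

/* Illustrative sketch only; not part of the commit. */
#include "ompi/mca/mtl/mtl.h"
#include "ompi/proc/proc.h"

static int add_dynamic_procs(struct mca_mtl_base_module_t *mtl,
                             size_t nprocs, struct ompi_proc_t **procs)
{
    /* With this commit the call is safe to repeat: procs whose
     * OMPI_PROC_ENDPOINT_TAG_PORTALS4 slot is already set are skipped. */
    return mtl->mtl_add_procs(mtl, nprocs, procs);
}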
8 changes: 3 additions & 5 deletions ompi/mca/coll/portals4/coll_portals4.h
@@ -29,6 +29,8 @@
#include "ompi/communicator/communicator.h"
#include "ompi/mca/coll/base/base.h"

#include "ompi/mca/mtl/portals4/mtl_portals4.h"

BEGIN_C_DECLS

#define COLL_PORTALS4_NO_OP ((ptl_op_t)-1)
@@ -178,11 +180,7 @@ ompi_coll_portals4_iallreduce_intra_fini(struct ompi_coll_portals4_request_t *re
static inline ptl_process_t
ompi_coll_portals4_get_peer(struct ompi_communicator_t *comm, int rank)
{
ompi_proc_t *proc = ompi_comm_peer_lookup(comm, rank);
if (proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4] == NULL) {
printf("ompi_coll_portals4_get_peer failure\n");
}
return *((ptl_process_t*) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]);
return ompi_mtl_portals4_get_peer(comm, rank);
}
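The collective component now defers peer resolution to the MTL instead of dereferencing the endpoint itself (previously it only printed a message when the endpoint was missing). A rough sketch of the shape such a helper can take, assuming lazy endpoint creation; this is not the verbatim contents of mtl_portals4.h:

/* Sketch only: shows the lazy-endpoint idea behind
 * ompi_mtl_portals4_get_peer(), not its exact implementation. */
static inline ptl_process_t
sketch_get_peer(struct ompi_communicator_t *comm, int rank)
{
    ompi_proc_t *proc = ompi_comm_peer_lookup(comm, rank);
    if (NULL == proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]) {
        /* create_endpoint() is the static helper added in mtl_portals4.c;
         * a header-level helper would reach it through an exported wrapper. */
        (void) create_endpoint(proc);
    }
    return *((ptl_process_t *)
             proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]);
}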


282 changes: 213 additions & 69 deletions ompi/mca/mtl/portals4/mtl_portals4.c
@@ -22,6 +22,7 @@

#include <portals4.h>

#include "ompi/communicator/communicator.h"
#include "ompi/proc/proc.h"
#include "ompi/mca/mtl/mtl.h"
#include "opal/class/opal_list.h"
@@ -241,45 +242,26 @@ portals4_init_interface(void)
return OMPI_ERROR;
}

int
ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
size_t nprocs,
struct ompi_proc_t** procs)
static int
create_maptable(size_t nprocs,
ompi_proc_t **procs)
{
int ret, me;
int ret;
size_t i;
bool new_found = false;
ptl_process_t *maptable;

if (ompi_mtl_portals4.use_logical) {
maptable = malloc(sizeof(ptl_process_t) * nprocs);
if (NULL == maptable) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: malloc failed\n",
__FILE__, __LINE__);
return OMPI_ERR_OUT_OF_RESOURCE;
}
maptable = malloc(sizeof(ptl_process_t) * nprocs);
if (NULL == maptable) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: malloc failed\n",
__FILE__, __LINE__);
return OMPI_ERR_OUT_OF_RESOURCE;
}

/* Get the list of ptl_process_id_t from the runtime and copy into structure */
for (i = 0 ; i < nprocs ; ++i) {
for (i=0;i<nprocs;i++) {
ptl_process_t *modex_id;
size_t size;

if( procs[i] == ompi_proc_local_proc ) {
me = i;
}

if (procs[i]->super.proc_arch != ompi_proc_local()->super.proc_arch) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Portals 4 MTL does not support heterogeneous operations.");
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Proc %s architecture %x, mine %x.",
OMPI_NAME_PRINT(&procs[i]->super.proc_name),
procs[i]->super.proc_arch, ompi_proc_local()->super.proc_arch);
return OMPI_ERR_NOT_SUPPORTED;
}

OPAL_MODEX_RECV(ret, &mca_mtl_portals4_component.mtl_version,
&procs[i]->super.proc_name, (uint8_t**)&modex_id, &size);
if (OMPI_SUCCESS != ret) {
@@ -294,40 +276,159 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
return OMPI_ERR_BAD_PARAM;
}

if (NULL == procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]) {
ptl_process_t *peer_id;
peer_id = malloc(sizeof(ptl_process_t));
if (NULL == peer_id) {
maptable[i].phys.pid = modex_id->phys.pid;
maptable[i].phys.nid = modex_id->phys.nid;
opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"logical: global rank=%d pid=%d nid=%d\n",
(int)i, maptable[i].phys.pid, maptable[i].phys.nid);
}

ret = PtlSetMap(ompi_mtl_portals4.ni_h, nprocs, maptable);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: logical mapping failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"logical mapping OK\n");

free(maptable);

return OMPI_SUCCESS;
}
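With logical (rank-based) addressing, PtlSetMap() installs the rank-to-(nid, pid) table on the network interface once; afterwards peers are named by rank alone. A minimal illustrative use of such a target (the MD handle and portal table index stand in for ones the MTL owns):

#include <portals4.h>

/* Illustrative only: after PtlSetMap() succeeds on the NI, the target of a
 * put is identified by its logical rank and the NI translates internally. */
static int put_to_logical_rank(ptl_handle_md_t md_h, ptl_rank_t rank,
                               ptl_pt_index_t pt_index)
{
    ptl_process_t peer;
    peer.rank = rank;                 /* no nid/pid needed in logical mode */

    /* zero-length put, just to show how the target is addressed */
    return PtlPut(md_h, 0, 0, PTL_NO_ACK_REQ, peer,
                  pt_index, 0, 0, NULL, 0);
}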

static int
create_endpoint(ompi_proc_t *proc)
{
ptl_process_t *endpoint;

endpoint = malloc(sizeof(ptl_process_t));
if (NULL == endpoint) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: malloc failed: %s\n",
__FILE__, __LINE__, strerror(errno));
return OMPI_ERR_OUT_OF_RESOURCE;
} else {
if (ompi_mtl_portals4.use_logical) {
endpoint->rank = proc->super.proc_name.vpid;
} else {
int ret;
ptl_process_t *modex_id;
size_t size;

OPAL_MODEX_RECV(ret, &mca_mtl_portals4_component.mtl_version,
&proc->super.proc_name, (uint8_t**)&modex_id, &size);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: ompi_modex_recv failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
} else if (sizeof(ptl_process_t) != size) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: malloc failed: %d\n",
"%s:%d: ompi_modex_recv failed (size mismatch): %d\n",
__FILE__, __LINE__, ret);
return OMPI_ERR_OUT_OF_RESOURCE;
return OMPI_ERR_BAD_PARAM;
}
if (ompi_mtl_portals4.use_logical) {
peer_id->rank = i;
maptable[i].phys.pid = modex_id->phys.pid;
maptable[i].phys.nid = modex_id->phys.nid;
opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"logical: global rank=%d pid=%d nid=%d\n",
(int)i, maptable[i].phys.pid, maptable[i].phys.nid);
} else {
*peer_id = *modex_id;

*endpoint = *modex_id;
}
}

proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4] = endpoint;

return OMPI_SUCCESS;
}
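create_endpoint() caches one heap-allocated ptl_process_t per proc under OMPI_PROC_ENDPOINT_TAG_PORTALS4; which member of the union is meaningful depends on the addressing mode. A small illustrative helper (not in the commit) that reads the cache back:

/* Illustrative: with logical addressing the cached endpoint's .rank is
 * authoritative (set from the proc's vpid); with physical addressing it is
 * .phys.nid/.phys.pid as received from the modex. */
static void log_cached_endpoint(ompi_proc_t *proc)
{
    ptl_process_t *ep = (ptl_process_t *)
        proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4];

    if (NULL == ep) {
        return;                                /* endpoint not created yet */
    }
    if (ompi_mtl_portals4.use_logical) {
        opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
                            "endpoint: rank=%d\n", (int) ep->rank);
    } else {
        opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
                            "endpoint: nid=%d pid=%d\n",
                            (int) ep->phys.nid, (int) ep->phys.pid);
    }
}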

ompi_proc_t *
ompi_mtl_portals4_get_proc_group(struct ompi_group_t *group, int rank)
{
int ret;

ompi_proc_t *proc = ompi_group_peer_lookup (group, rank);
if (NULL == proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]) {
ret = create_endpoint(proc);
if (OMPI_SUCCESS != ret) {
return NULL;
}
#if 0
} else {
/*
* sanity check
*/
int ret;
ptl_process_t *modex_id;
size_t size;

OPAL_MODEX_RECV(ret, &mca_mtl_portals4_component.mtl_version,
&proc->super.proc_name, (uint8_t**)&modex_id, &size);

ptl_process_t *peer = (ptl_process_t*) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4];
if (ompi_mtl_portals4.use_logical) {
if ((size_t)peer->rank != proc->super.proc_name.vpid) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: existing peer and rank don't match\n",
__FILE__, __LINE__);
return OMPI_ERROR;
}
}
else if (peer->phys.nid != modex_id->phys.nid ||
peer->phys.pid != modex_id->phys.pid) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: existing peer and modex peer don't match\n",
__FILE__, __LINE__);
return OMPI_ERROR;
}
#endif
}

procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4] = peer_id;
return proc;
}
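ompi_mtl_portals4_get_proc_group() is the group-based entry point for dynamic cases. A hedged usage sketch (not in the commit) that resolves a peer's Portals4 id, creating the endpoint on first touch:

/* Illustrative caller of the function above; error handling kept minimal. */
static int peer_id_from_group(struct ompi_group_t *group, int rank,
                              ptl_process_t *id_out)
{
    ompi_proc_t *proc = ompi_mtl_portals4_get_proc_group(group, rank);
    if (NULL == proc) {
        return OMPI_ERROR;                  /* endpoint creation failed */
    }
    *id_out = *((ptl_process_t *)
                proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]);
    return OMPI_SUCCESS;
}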

static int
add_endpoints(size_t nprocs,
ompi_proc_t **procs)
{
int ret;
size_t i;

new_found = true;
/* Get the list of ptl_process_id_t from the runtime and copy into structure */
for (i = 0 ; i < nprocs ; ++i) {
if (procs[i]->super.proc_arch != ompi_proc_local()->super.proc_arch) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Portals 4 MTL does not support heterogeneous operations.");
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"Proc %s architecture %x, mine %x.",
OMPI_NAME_PRINT(&procs[i]->super.proc_name),
procs[i]->super.proc_arch, ompi_proc_local()->super.proc_arch);
return OMPI_ERR_NOT_SUPPORTED;
}

if (NULL == procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]) {
ret = create_endpoint(procs[i]);
if (OMPI_SUCCESS != ret) {
return ret;
}
#if 0
} else {
/*
* sanity check
*/
int ret;
ptl_process_t *modex_id;
size_t size;

OPAL_MODEX_RECV(ret, &mca_mtl_portals4_component.mtl_version,
&procs[i]->super.proc_name, (uint8_t**)&modex_id, &size);

ptl_process_t *proc = (ptl_process_t*) procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4];
if (ompi_mtl_portals4.use_logical) {
if ((size_t)proc->rank != i) {
if ((size_t)proc->rank != procs[i]->super.proc_name.vpid) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: existing peer and rank don't match\n",
__FILE__, __LINE__);
return OMPI_ERROR;
}
maptable[i].phys.pid = modex_id->phys.pid;
maptable[i].phys.nid = modex_id->phys.nid;
}
else if (proc->phys.nid != modex_id->phys.nid ||
proc->phys.pid != modex_id->phys.pid) {
@@ -336,45 +437,82 @@ ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
__FILE__, __LINE__);
return OMPI_ERROR;
}
#endif
}
}

if (ompi_mtl_portals4.use_logical) {
ret = PtlSetMap(ompi_mtl_portals4.ni_h, nprocs, maptable);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: logical mapping failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"logical mapping OK\n");
free(maptable);
}
return OMPI_SUCCESS;
}

portals4_init_interface();
#define NEED_ALL_PROCS (ompi_mtl_portals4.use_logical || ompi_mtl_portals4.use_flowctl)

/* activate progress callback */
ret = opal_progress_register(ompi_mtl_portals4_progress);
int
ompi_mtl_portals4_add_procs(struct mca_mtl_base_module_t *mtl,
size_t nprocs,
struct ompi_proc_t** procs)
{
int ret;

/*
* The PML handed us a list of procs that need Portals4
* peer info. Complete those procs here.
*/
ret = add_endpoints(nprocs,
procs);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: opal_progress_register failed: %d\n",
"%s:%d: add_endpoints failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}

if (1 == ompi_mtl_portals4.need_init) {
if (1 == ompi_mtl_portals4.use_logical) {
ret = create_maptable(nprocs, procs);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: ompi_mtl_portals4_add_procs::create_maptable() failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}
}

/*
* This is the first time through here. Initialize
* Portals4 and register the progress thread.
*/
portals4_init_interface();

/* activate progress callback */
ret = opal_progress_register(ompi_mtl_portals4_progress);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: opal_progress_register failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}

#if OMPI_MTL_PORTALS4_FLOW_CONTROL
if (new_found) {
ret = ompi_mtl_portals4_flowctl_add_procs(me, nprocs, procs);
opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"add_procs() - me=%d\n", ompi_proc_local_proc->super.proc_name.vpid);

opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"add_procs() - adding flowctl procs\n");

ret = ompi_mtl_portals4_flowctl_add_procs(ompi_proc_local_proc->super.proc_name.vpid,
nprocs,
procs);
if (OMPI_SUCCESS != ret) {
opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
"%s:%d: flowctl_add_procs failed: %d\n",
__FILE__, __LINE__, ret);
return ret;
}
}
#endif

ompi_mtl_portals4.need_init = 0;
}

return OMPI_SUCCESS;
}
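NEED_ALL_PROCS (defined just before add_procs above) is consumed elsewhere in the component; the PR title's "PML.flags" suggests it feeds a flag telling the PML whether this MTL must see the entire proc list at startup. A sketch of that kind of decision, with the flag name as a stated assumption rather than the component's real one:

#include <stdint.h>

/* Hypothetical flag bit, for illustration only; the real MTL/PML flag name
 * is not shown in this diff. */
#define SKETCH_MTL_WANTS_ALL_PROCS 0x1

static uint32_t portals4_required_flags_sketch(uint32_t base_flags)
{
    /* Logical addressing needs every rank up front to build the PtlSetMap()
     * table, and flow control spans the whole job, so either feature forces
     * the full proc list at init time. */
    if (NEED_ALL_PROCS) {
        base_flags |= SKETCH_MTL_WANTS_ALL_PROCS;
    }
    return base_flags;
}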

@@ -386,13 +524,19 @@ ompi_mtl_portals4_del_procs(struct mca_mtl_base_module_t *mtl,
{
size_t i;

opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"del_procs() - enter\n");

for (i = 0 ; i < nprocs ; ++i) {
if (NULL != procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]) {
free(procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4]);
procs[i]->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PORTALS4] = NULL;
}
}

opal_output_verbose(50, ompi_mtl_base_framework.framework_output,
"del_procs() - exit\n");

return OMPI_SUCCESS;
}

(The diff for the remaining 13 changed files is not shown.)
