Skip to content

Commit

Permalink
Merge pull request #6439 from rhc54/topic/xcnt
Browse files Browse the repository at this point in the history
Fix cross-mpirun connect/accept operations
  • Loading branch information
rhc54 authored Feb 27, 2019
2 parents 29fa66c + 60961ce commit ccb59eb
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 10 deletions.
5 changes: 3 additions & 2 deletions orte/mca/state/base/state_base_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,9 @@ void orte_state_base_check_all_complete(int fd, short args, void *cbdata)
one_still_alive = false;
j = opal_hash_table_get_first_key_uint32(orte_job_data, &u32, (void **)&job, &nptr);
while (OPAL_SUCCESS == j) {
/* skip the daemon job */
if (job->jobid == ORTE_PROC_MY_NAME->jobid) {
/* skip the daemon job and all jobs from other families */
if (job->jobid == ORTE_PROC_MY_NAME->jobid ||
ORTE_JOB_FAMILY(job->jobid) != ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid)) {
goto next;
}
/* if this is the job we are checking AND it normally terminated,
Expand Down
108 changes: 106 additions & 2 deletions orte/orted/pmix/pmix_server_dyn.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/state/state.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
Expand Down Expand Up @@ -539,7 +540,14 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
int rc, cnt;
opal_pmix_pdata_t *pdat;
orte_job_t *jdata;
opal_buffer_t buf;
orte_node_t *node;
orte_proc_t *proc;
opal_buffer_t buf, bucket;
opal_byte_object_t *bo;
orte_process_name_t dmn, pname;
char *uri;
opal_value_t val;
opal_list_t nodes;

ORTE_ACQUIRE_OBJECT(cd);

Expand All @@ -556,6 +564,7 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
if (OPAL_BYTE_OBJECT != pdat->value.type) {
rc = ORTE_ERR_BAD_PARAM;
ORTE_ERROR_LOG(rc);
goto release;
}
/* the data will consist of a packed buffer with the job data in it */
Expand All @@ -565,15 +574,107 @@ static void _cnlk(int status, opal_list_t *data, void *cbdata)
pdat->value.data.bo.size = 0;
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
goto release;
}

/* unpack the byte object containing the daemon URIs */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
goto release;
}
/* load it into a buffer */
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
opal_dss.load(&bucket, bo->bytes, bo->size);
bo->bytes = NULL;
free(bo);
/* prep a list to save the nodes */
OBJ_CONSTRUCT(&nodes, opal_list_t);
/* unpack and store the URIs */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &uri, &cnt, OPAL_STRING))) {
rc = orte_rml_base_parse_uris(uri, &dmn, NULL);
if (ORTE_SUCCESS != rc) {
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&bucket);
goto release;
}
/* save a node object for this daemon */
node = OBJ_NEW(orte_node_t);
node->daemon = OBJ_NEW(orte_proc_t);
memcpy(&node->daemon->name, &dmn, sizeof(orte_process_name_t));
opal_list_append(&nodes, &node->super);
/* register the URI */
OBJ_CONSTRUCT(&val, opal_value_t);
val.key = OPAL_PMIX_PROC_URI;
val.type = OPAL_STRING;
val.data.string = uri;
if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&dmn, &val))) {
ORTE_ERROR_LOG(rc);
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&bucket);
goto release;
}
val.key = NULL;
val.data.string = NULL;
OBJ_DESTRUCT(&val);
cnt = 1;
}
OBJ_DESTRUCT(&bucket);

/* unpack the proc-to-daemon map */
cnt=1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(&buf, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
goto release;
}
/* load it into a buffer */
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
opal_dss.load(&bucket, bo->bytes, bo->size);
bo->bytes = NULL;
free(bo);
/* unpack and store the map */
cnt = 1;
while (OPAL_SUCCESS == (rc = opal_dss.unpack(&bucket, &pname, &cnt, ORTE_NAME))) {
/* get the name of the daemon hosting it */
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&bucket, &dmn, &cnt, ORTE_NAME))) {
OBJ_DESTRUCT(&buf);
OBJ_DESTRUCT(&bucket);
goto release;
}
/* create the proc object */
proc = OBJ_NEW(orte_proc_t);
memcpy(&proc->name, &pname, sizeof(orte_process_name_t));
opal_pointer_array_set_item(jdata->procs, pname.vpid, proc);
/* find the daemon */
OPAL_LIST_FOREACH(node, &nodes, orte_node_t) {
if (node->daemon->name.vpid == dmn.vpid) {
OBJ_RETAIN(node);
proc->node = node;
break;
}
}
}
OBJ_DESTRUCT(&bucket);
OPAL_LIST_DESTRUCT(&nodes);
OBJ_DESTRUCT(&buf);

/* register the nspace */
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(jdata);
goto release;
}
OBJ_RELEASE(jdata); // no reason to keep this around

/* save the job object so we don't endlessly cycle */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, jdata);

/* restart the cnct processor */
ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
Expand Down Expand Up @@ -619,6 +720,7 @@ static void _cnct(int sd, short args, void *cbdata)
* out about it, and all we can do is return an error */
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
rc = ORTE_ERR_NOT_SUPPORTED;
goto release;
}
Expand All @@ -634,6 +736,7 @@ static void _cnct(int sd, short args, void *cbdata)
kv->data.uint32 = geteuid();
opal_list_append(cd->info, &kv->super);
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
ORTE_ERROR_LOG(rc);
opal_argv_free(keys);
goto release;
}
Expand All @@ -647,6 +750,7 @@ static void _cnct(int sd, short args, void *cbdata)
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
/* it hasn't been registered yet, so register it now */
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata, true))) {
ORTE_ERROR_LOG(rc);
goto release;
}
}
Expand Down
4 changes: 3 additions & 1 deletion orte/orted/pmix/pmix_server_fence.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ static void dmodex_req(int sd, short args, void *cbdata)
rc = ORTE_ERR_NOT_FOUND;
goto callback;
}

/* point the request to the daemon that is hosting the
* target process */
req->proxy.vpid = dmn->name.vpid;
Expand All @@ -240,7 +241,8 @@ static void dmodex_req(int sd, short args, void *cbdata)

/* if we are the host daemon, then this is a local request, so
* just wait for the data to come in */
if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
if (ORTE_PROC_MY_NAME->jobid == dmn->name.jobid &&
ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
return;
}

Expand Down
44 changes: 39 additions & 5 deletions orte/orted/pmix/pmix_server_register_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* All rights reserved.
* Copyright (c) 2009-2018 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2018 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Mellanox Technologies, Inc.
* All rights reserved.
* Copyright (c) 2014-2016 Research Organization for Information Science
Expand Down Expand Up @@ -71,6 +71,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
gid_t gid;
opal_list_t *cache;
hwloc_obj_t machine;
opal_buffer_t buf, bucket;
opal_byte_object_t bo, *boptr;
orte_proc_t *proc;

opal_output_verbose(2, orte_pmix_server_globals.output,
"%s register nspace for %s",
Expand Down Expand Up @@ -494,21 +497,52 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata, bool force)
jdata->num_local_procs,
info, NULL, NULL);
OPAL_LIST_RELEASE(info);
if (OPAL_SUCCESS != rc) {
return rc;
}

/* if the user has connected us to an external server, then we must
* assume there is going to be some cross-mpirun exchange, and so
/* if I am the HNP and this job is a member of my family, then we must
* assume there could be some cross-mpirun exchange, and so
* we protect against that situation by publishing the job info
* for this job - this allows any subsequent "connect" to retrieve
* the job info */
if (NULL != orte_data_server_uri) {
opal_buffer_t buf;

if (ORTE_PROC_IS_HNP && ORTE_JOB_FAMILY(ORTE_PROC_MY_NAME->jobid) == ORTE_JOB_FAMILY(jdata->jobid)) {
/* pack the job - note that this doesn't include the procs
* or their locations */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &jdata, 1, ORTE_JOB))) {
ORTE_ERROR_LOG(rc);
OBJ_DESTRUCT(&buf);
return rc;
}

/* pack the contact URI of the daemon on each involved node */
map = jdata->map;
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (i=0; i < map->nodes->size; i++) {
if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(map->nodes, i))) {
continue;
}
opal_dss.pack(&bucket, &node->daemon->rml_uri, 1, OPAL_STRING);
}
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
boptr = &bo;
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);

/* pack the proc name and the name of its host daemon for each proc */
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (i=0; i < jdata->procs->size; i++) {
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, i))) {
continue;
}
opal_dss.pack(&bucket, &proc->name, 1, ORTE_NAME);
opal_dss.pack(&bucket, &proc->node->daemon->name, 1, ORTE_NAME);
}
opal_dss.unload(&bucket, (void**)&bo.bytes, &bo.size);
boptr = &bo;
opal_dss.pack(&buf, &boptr, 1, OPAL_BYTE_OBJECT);

info = OBJ_NEW(opal_list_t);
/* create a key-value with the key being the string jobid
* and the value being the byte object */
Expand Down

0 comments on commit ccb59eb

Please sign in to comment.