Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic/a2a inplace5.0 #9493

Merged
merged 5 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 83 additions & 59 deletions ompi/mca/coll/base/coll_base_alltoall.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2017 The University of Tennessee and The University
* Copyright (c) 2004-2021 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
Expand Down Expand Up @@ -35,88 +35,112 @@
#include "coll_base_topo.h"
#include "coll_base_util.h"

/* MPI_IN_PLACE all to all algorithm. TODO: implement a better one. */
/*
* We want to minimize the amount of temporary memory needed while allowing as many ranks
* to exchange data simultaneously. We use a variation of the ring algorithm, where in a
* single step a process echange the data with both neighbors at distance k (on the left
* and the right on a logical ring topology). With this approach we need to pack the data
* for a single of the two neighbors, as we can then use the original buffer (and datatype
* and count) to send the data to the other.
*/
int
mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int i, j, size, rank, err = MPI_SUCCESS, line;
ptrdiff_t ext, gap = 0;
int i, size, rank, left, right, err = MPI_SUCCESS, line;
ptrdiff_t extent;
ompi_request_t *req;
char *allocated_buffer = NULL, *tmp_buffer;
size_t max_size;
char *tmp_buffer;
size_t packed_size = 0, max_size;
opal_convertor_t convertor;

/* Initialize. */

size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);

/* If only one process, we're done. */
if (1 == size) {
ompi_datatype_type_size(rdtype, &max_size);

/* Easy way out */
if ((1 == size) || (0 == rcount) || (0 == max_size) ) {
return MPI_SUCCESS;
}

/* Find the largest receive amount */
ompi_datatype_type_extent (rdtype, &ext);
max_size = opal_datatype_span(&rdtype->super, rcount, &gap);
/* Find the largest amount of packed send/recv data among all peers where
* we need to pack before the send.
*/
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
for (i = 1 ; i <= (size >> 1) ; ++i) {
right = (rank + i) % size;
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);

if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
packed_size = opal_datatype_compute_remote_size(&rdtype->super,
ompi_proc->super.proc_convertor->master->remote_sizes);
max_size = packed_size > max_size ? packed_size : max_size;
}
}
#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
max_size *= rcount;

/* Initiate all send/recv to/from others. */
ompi_datatype_type_extent(rdtype, &extent);

/* Allocate a temporary buffer */
allocated_buffer = calloc (max_size, 1);
if( NULL == allocated_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
tmp_buffer = allocated_buffer - gap;
max_size = ext * rcount;

/* in-place alltoall slow algorithm (but works) */
for (i = 0 ; i < size ; ++i) {
for (j = i+1 ; j < size ; ++j) {
if (i == rank) {
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
(char *) rbuf + j * max_size);
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
} else if (j == rank) {
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
(char *) rbuf + i * max_size);
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

/* Exchange data with the peer */
err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }

err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
} else {
continue;
}

/* Wait for the requests to complete */
err = ompi_request_wait ( &req, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
tmp_buffer = calloc (max_size, 1);
if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }

for (i = 1 ; i <= (size >> 1) ; ++i) {
struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
uint32_t iov_count = 1;

right = (rank + i) % size;
left = (rank + size - i) % size;

ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcount,
(char *) rbuf + right * extent);
packed_size = max_size;
err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
if (1 != err) { goto error_hndl; }

/* Receive data from the right */
err = MCA_PML_CALL(irecv ((char *) rbuf + right * extent, rcount, rdtype,
right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }

if( left != right ) {
/* Send data to the left */
err = MCA_PML_CALL(send ((char *) rbuf + left * extent, rcount, rdtype,
left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }

/* Receive data from the left */
err = MCA_PML_CALL(irecv ((char *) rbuf + left * extent, rcount, rdtype,
left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }
}

/* Send data to the right */
err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }
}

error_hndl:
/* Free the temporary buffer */
if( NULL != allocated_buffer )
free (allocated_buffer);
if( NULL != tmp_buffer )
free (tmp_buffer);

if( MPI_SUCCESS != err ) {
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
Expand Down
150 changes: 93 additions & 57 deletions ompi/mca/coll/base/coll_base_alltoallv.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2017 The University of Tennessee and The University
* Copyright (c) 2004-2021 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
Expand Down Expand Up @@ -37,85 +37,121 @@
#include "coll_base_topo.h"
#include "coll_base_util.h"

/*
* We want to minimize the amount of temporary memory needed while allowing as many ranks
* to exchange data simultaneously. We use a variation of the ring algorithm, where in a
* single step a process echange the data with both neighbors at distance k (on the left
* and the right on a logical ring topology). With this approach we need to pack the data
* for a single of the two neighbors, as we can then use the original buffer (and datatype
* and count) to send the data to the other.
*/
int
mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
struct ompi_datatype_t *rdtype,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
int i, j, size, rank, err=MPI_SUCCESS;
char *allocated_buffer, *tmp_buffer;
size_t max_size;
ptrdiff_t ext, gap = 0;
int i, size, rank, left, right, err = MPI_SUCCESS, line;
ompi_request_t *req;
char *tmp_buffer;
size_t packed_size = 0, max_size;
opal_convertor_t convertor;

/* Initialize. */

size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);

/* If only one process, we're done. */
if (1 == size) {
ompi_datatype_type_size(rdtype, &max_size);
max_size *= rcounts[rank];

/* Easy way out */
if ((1 == size) || (0 == max_size) ) {
return MPI_SUCCESS;
}
/* Find the largest receive amount */
ompi_datatype_type_extent (rdtype, &ext);
for (i = 0, max_size = 0 ; i < size ; ++i) {
if (i == rank) {
continue;
}
size_t cur_size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
max_size = cur_size > max_size ? cur_size : max_size;
}
/* The gap will always be the same as we are working on the same datatype */

if (OPAL_UNLIKELY(0 == max_size)) {
return MPI_SUCCESS;
/* Find the largest amount of packed send/recv data among all peers where
* we need to pack before the send.
*/
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
for (i = 1 ; i <= (size >> 1) ; ++i) {
right = (rank + i) % size;
ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);

if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
packed_size = opal_datatype_compute_remote_size(&rdtype->super,
ompi_proc->super.proc_convertor->master->remote_sizes);
packed_size *= rcounts[right];
max_size = packed_size > max_size ? packed_size : max_size;
}
}
#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */

/* Allocate a temporary buffer */
allocated_buffer = calloc (max_size, 1);
if (NULL == allocated_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
tmp_buffer = allocated_buffer - gap;

/* Initiate all send/recv to/from others. */
/* in-place alltoallv slow algorithm (but works) */
for (i = 0 ; i < size ; ++i) {
for (j = i+1 ; j < size ; ++j) {
if (i == rank && 0 != rcounts[j]) {
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
tmp_buffer, (char *) rbuf + rdisps[j] * ext);
if (MPI_SUCCESS != err) { goto error_hndl; }

/* Exchange data with the peer */
err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALLV,
(char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
j, MCA_COLL_BASE_TAG_ALLTOALLV,
comm, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }
} else if (j == rank && 0 != rcounts[i]) {
/* Copy the data into the temporary buffer */
err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
tmp_buffer, (char *) rbuf + rdisps[i] * ext);
if (MPI_SUCCESS != err) { goto error_hndl; }

/* Exchange data with the peer */
err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALLV,
(char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
i, MCA_COLL_BASE_TAG_ALLTOALLV,
comm, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }
}
tmp_buffer = calloc (max_size, 1);
if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }

for (i = 1 ; i <= (size >> 1) ; ++i) {
struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
uint32_t iov_count = 1;

right = (rank + i) % size;
left = (rank + size - i) % size;

if( 0 != rcounts[right] ) { /* nothing to exchange with the peer on the right */
ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcounts[right],
(char *) rbuf + rdisps[right]);
packed_size = max_size;
err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
if (1 != err) { goto error_hndl; }

/* Receive data from the right */
err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtype,
right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }
}

if( (left != right) && (0 != rcounts[left]) ) {
/* Send data to the left */
err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }

/* Receive data from the left */
err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
if (MPI_SUCCESS != err) { goto error_hndl; }
}

if( 0 != rcounts[right] ) { /* nothing to exchange with the peer on the right */
/* Send data to the right */
err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
comm));
if (MPI_SUCCESS != err) { goto error_hndl; }

err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
if (MPI_SUCCESS != err) { goto error_hndl; }
}
}

error_hndl:
/* Free the temporary buffer */
free (allocated_buffer);
if( NULL != tmp_buffer )
free (tmp_buffer);

if( MPI_SUCCESS != err ) {
OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
"%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
rank));
(void)line; // silence compiler warning
}

/* All done */
return err;
Expand Down
Loading