Merge pull request open-mpi#6 from raafatfeki/pr/vulcan_sendbuf_contg
Pr/vulcan sendbuf contg
edgargabriel authored May 21, 2018
2 parents 3a6d5d1 + b72534a commit 238e0ee
Showing 1 changed file with 83 additions and 75 deletions.
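For orientation: the patch drops the temporary send_buf that non-contiguous user data used to be packed into, and instead describes the scattered fragments with a hindexed derived datatype, so the shuffle isend reads straight from the user buffer. Internally the code uses ompi_datatype_create_hindexed() and MCA_PML_CALL(isend(...)); the sketch below shows the same pattern with the public MPI API and made-up fragment sizes and offsets.

/* Minimal sketch of the send pattern this commit adopts, expressed with the
 * public MPI API in place of the internal ompi_datatype_* / PML calls; the
 * fragment layout is hypothetical. */
#include <mpi.h>

static void send_fragments(const char *user_buf, int aggregator, int tag,
                           MPI_Comm comm, MPI_Request *req)
{
    /* Byte lengths of the fragments and their displacements, measured from
     * the address passed to MPI_Isend (here: user_buf). */
    int      blocklen[3] = { 64, 128, 32 };
    MPI_Aint displs[3]   = { 0, 256, 1024 };

    MPI_Datatype newtype;
    MPI_Type_create_hindexed(3, blocklen, displs, MPI_BYTE, &newtype);
    MPI_Type_commit(&newtype);

    /* One element of the derived type is sent directly from user memory:
     * no temporary buffer is allocated and no memcpy of the data is done. */
    MPI_Isend(user_buf, 1, newtype, aggregator, tag, comm, req);

    /* Freeing right away is legal: the pending send keeps the type alive
     * until the communication completes. */
    MPI_Type_free(&newtype);
}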
158 changes: 83 additions & 75 deletions ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c
@@ -36,7 +36,7 @@
#define DEBUG_ON 0
#define FCOLL_VULCAN_SHUFFLE_TAG 123
#define INIT_LEN 10

#define NOT_AGGR_INDEX -1

/*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{
@@ -57,13 +57,11 @@ typedef struct mca_io_ompio_aggregator_data {
int current_index, current_position;
int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
int *procs_in_group, iov_index;
bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
int bytes_sent, prev_bytes_sent;
struct iovec *decoded_iov;
int bytes_to_write, prev_bytes_to_write;
mca_io_ompio_io_array_t *io_array, *prev_io_array;
int num_io_entries, prev_num_io_entries;
char *send_buf, *prev_send_buf;
} mca_io_ompio_aggregator_data;


@@ -78,9 +76,7 @@ typedef struct mca_io_ompio_aggregator_data {
for (_i=0; _i<_num; _i++ ) { \
_aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
_aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
_aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
_aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
_aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
_aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
_t=_aggr[_i]->prev_global_buf; \
_aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
@@ -213,8 +209,6 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
aggr_data[i]->comm = fh->f_comm;
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
}

/*********************************************************************
@@ -544,6 +538,7 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
#endif
}

int aggr_index = NOT_AGGR_INDEX;
reqs1 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
reqs2 = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*vulcan_num_io_procs *sizeof(ompi_request_t *));
if ( NULL == reqs1 || NULL == reqs2 ) {
@@ -567,6 +562,11 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
for ( i=0; i<vulcan_num_io_procs; i++ ) {
ret = shuffle_init ( 0, cycles, aggregators[i], fh->f_rank, aggr_data[i],
&curr_reqs[i*(fh->f_procs_per_group + 1)] );

if(aggregators[i] == fh->f_rank) {
aggr_index = i;
}

if ( OMPI_SUCCESS != ret ) {
goto exit;
}
@@ -595,23 +595,19 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
}


/* Write data for iteration i-1 */
for ( i=0; i<vulcan_num_io_procs; i++ ) {
/* Write data for iteration i-1 only by an aggregator*/
if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
if (OMPI_SUCCESS != ret){
goto exit;
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif

if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}

} /* end for (index = 0; index < cycles; index++) */
@@ -629,22 +625,18 @@ int mca_fcoll_vulcan_file_write_all (mca_io_ompio_file_t *fh,
}

/* Write data for iteration i=cycles-1 */
for ( i=0; i<vulcan_num_io_procs; i++ ) {
if(NOT_AGGR_INDEX != aggr_index) {
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, aggregators[i], aggr_data[i], write_chunksize );
ret = write_init (fh, aggregators[aggr_index], aggr_data[aggr_index], write_chunksize );
if (OMPI_SUCCESS != ret){
goto exit;
}
}
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif

if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}
}
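As a side note on the loop above: write_init is no longer attempted by every rank for every aggregator. Each rank records whether it is itself one of the aggregators (aggr_index, or NOT_AGGR_INDEX otherwise) and only then flushes the previous iteration's data. A simplified sketch of that lookup, with hypothetical surrounding names:

#define NOT_AGGR_INDEX -1

/* Returns this rank's position in the aggregator list, or NOT_AGGR_INDEX if
 * the rank only contributes data and never writes to the file itself. */
static int find_aggr_index(const int *aggregators, int num_aggregators, int my_rank)
{
    for (int i = 0; i < num_aggregators; i++) {
        if (aggregators[i] == my_rank) {
            return i;
        }
    }
    return NOT_AGGR_INDEX;
}

/* Usage in the cycle loop (mirrors the guard added by this commit):
 *
 *   if (NOT_AGGR_INDEX != aggr_index) {
 *       ret = write_init(fh, aggregators[aggr_index],
 *                        aggr_data[aggr_index], write_chunksize);
 *   }
 */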

@@ -742,7 +734,7 @@ static int write_init (mca_io_ompio_file_t *fh, int aggregator, mca_io_ompio_agg
int last_pos=0;


if ( aggregator == fh->f_rank && aggr_data->prev_num_io_entries) {
if (aggr_data->prev_num_io_entries) {
while ( aggr_data->prev_bytes_to_write > 0 ) {
aggr_data->prev_bytes_to_write -= mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array,
aggr_data->prev_num_io_entries,
@@ -779,11 +771,13 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
MPI_Aint *memory_displacements=NULL;
int *temp_disp_index=NULL;
MPI_Aint global_count = 0;
int* blocklength_proc=NULL;
ptrdiff_t* displs_proc=NULL;

data->num_io_entries = 0;
data->bytes_sent = 0;
data->io_array=NULL;
data->send_buf=NULL;

/**********************************************************************
*** 7a. Getting ready for next cycle: initializing and freeing buffers
**********************************************************************/
@@ -1158,74 +1152,86 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
} /* end if (entries_per_aggr > 0 ) */
}/* end if (aggregator == rank ) */

if ( data->sendbuf_is_contiguous ) {
data->send_buf = &((char*)data->buf)[data->total_bytes_written];
}
else if (bytes_sent) {
/* allocate a send buffer and copy the data that needs
to be sent into it in case the data is non-contigous
in memory */
ptrdiff_t mem_address;
size_t remaining = 0;
size_t temp_position = 0;

data->send_buf = malloc (bytes_sent);
if (NULL == data->send_buf) {

if (bytes_sent) {
size_t remaining = bytes_sent;
int block_index = -1;
int blocklength_size = INIT_LEN;

ptrdiff_t send_mem_address = NULL;
ompi_datatype_t *newType = MPI_DATATYPE_NULL;
blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));

if (NULL == blocklength_proc || NULL == displs_proc ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}

remaining = bytes_sent;


while (remaining) {
mem_address = (ptrdiff_t)
(data->decoded_iov[data->iov_index].iov_base) + data->current_position;

block_index++;

if(0 == block_index) {
send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position;
}
else {
// Reallocate more memory if blocklength_size is not enough
if(0 == block_index % INIT_LEN) {
blocklength_size += INIT_LEN;
blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
}
displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position - send_mem_address;
}

if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *)mem_address,
data->decoded_iov[data->iov_index].iov_len - data->current_position);

blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
data->current_position;
remaining = remaining -
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
temp_position = temp_position +
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
data->iov_index = data->iov_index + 1;
data->current_position = 0;
}
else {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *) mem_address,
remaining);
blocklength_proc[block_index] = remaining;
data->current_position += remaining;
remaining = 0;
}
}
}
data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;
/* Gather the sendbuf from each process in appropriate locations in
   aggregators */

if (bytes_sent){
ret = MCA_PML_CALL(isend(data->send_buf,
bytes_sent,
MPI_BYTE,
aggregator,
FCOLL_VULCAN_SHUFFLE_TAG+index,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&reqs[data->procs_per_group]));


if ( OMPI_SUCCESS != ret ){
goto exit;

data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;

if ( 0 <= block_index ) {
ompi_datatype_create_hindexed(block_index+1,
blocklength_proc,
displs_proc,
MPI_BYTE,
&newType);
ompi_datatype_commit(&newType);

ret = MCA_PML_CALL(isend((char *)send_mem_address,
1,
newType,
aggregator,
FCOLL_VULCAN_SHUFFLE_TAG+index,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&reqs[data->procs_per_group]));
if ( MPI_DATATYPE_NULL != newType ) {
ompi_datatype_destroy(&newType);
}
if (OMPI_SUCCESS != ret){
goto exit;
}
}

}

#if DEBUG_ON
if (aggregator == rank){
printf("************Cycle: %d, Aggregator: %d ***************\n",
@@ -1301,7 +1307,9 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
free(sorted_file_offsets);
free(file_offsets_for_agg);
free(memory_displacements);

free(blocklength_proc);
free(displs_proc);

return OMPI_SUCCESS;
}
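The while (remaining) loop in shuffle_init above is the part that converts the decoded iovec list into the block lengths and byte displacements fed to ompi_datatype_create_hindexed(): the first fragment's address becomes the send base, and every later fragment is recorded as an offset from it. A standalone sketch of that conversion, with made-up iovecs and without the INIT_LEN realloc growth or the partial-fragment split that the real code also handles:

#include <stdio.h>
#include <stddef.h>
#include <sys/uio.h>          /* struct iovec */

int main(void)
{
    /* Hypothetical decoded iovecs describing non-contiguous user data. */
    static char buf[4096];
    struct iovec iov[3] = {
        { buf,        64 },
        { buf +  256, 128 },
        { buf + 1024,  32 },
    };

    /* The first fragment's address is the base pointer handed to the send;
     * the remaining fragments become byte displacements relative to it. */
    char      *send_base = (char *) iov[0].iov_base;
    int        blocklen[3];
    ptrdiff_t  displs[3];

    for (int i = 0; i < 3; i++) {
        blocklen[i] = (int) iov[i].iov_len;
        displs[i]   = (char *) iov[i].iov_base - send_base;
    }

    for (int i = 0; i < 3; i++) {
        printf("block %d: len=%d displ=%td\n", i, blocklen[i], displs[i]);
    }
    return 0;
}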
