Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement selection vector I/O with collective chunk filling #3826

Merged
merged 14 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 74 additions & 160 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -5536,9 +5536,7 @@ H5D__chunk_update_old_edge_chunks(H5D_t *dset, hsize_t old_dim[])
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
* Purpose: Use MPIO collective write to fill the chunks (if number of
* chunks to fill is greater than the number of MPI procs;
* otherwise use independent I/O).
* Purpose: Use MPIO selection vector I/O for writing fill chunks
*
* Return: Non-negative on success/Negative on failure
*
Expand All @@ -5554,19 +5552,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
int blocks, leftover; /* converted to int for MPI */
MPI_Aint *chunk_disp_array = NULL;
MPI_Aint *block_disps = NULL;
int *block_lens = NULL;
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
bool need_sort = false;
size_t i; /* Local index variable */
int blocks; /* converted to int for MPI */
int leftover; /* converted to int for MPI */
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
size_t i; /* Local index variable */
haddr_t *io_addrs = NULL;
size_t *io_sizes = NULL;
const void **io_wbufs = NULL;
H5FD_mem_t io_types[2];
bool all_same_block_len = true;
bool need_sort = false;
size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_PACKAGE

assert(chunk_fill_info->num_chunks != 0);

/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
Expand All @@ -5589,6 +5592,7 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");

num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */

Expand All @@ -5603,156 +5607,73 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_

/* Check if we have any chunks to write on this rank */
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nitpick with pre-existing code - the first leftover is redundant since mpi_rank will never be less than 0. This could be if (num_blocks > 0 || leftover > mpi_rank) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with this and subsequent comments.

MPI_Aint partial_fill_buf_disp = 0;
bool all_same_block_len = true;

/* Allocate buffers */
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");

if (partial_chunk_fill_buf) {
MPI_Aint fill_buf_addr;
MPI_Aint partial_fill_buf_addr;

/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)

#if H5_CHECK_MPI_VERSION(3, 1)
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
#else
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
#endif

/*
* Allocate all-zero block displacements array. If a block's displacement
* is left as zero, that block will be written to from the regular fill
* buffer. If a block represents an unfiltered partial edge chunk, its
* displacement will be set so that the block is written to from the
* unfiltered fill buffer.
*/
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
}

/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;
if (NULL == (io_addrs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_addrs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL,
"couldn't allocate space for I/O addresses vector");

if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}
if (NULL == (io_wbufs = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_wbufs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");
}

if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);
/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

/* Allocate buffer for block lengths if necessary */
if (!all_same_block_len)
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}

for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);
if (need_sort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some more comments to the blocks that don't have them, explaining what the code is doing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, for this block in particular, add something like this note (I just discussed this with Scot):

Note that we sort all of the chunks here, and not just a subset corresponding to this rank. We do this since we have found MPI I/O to work better when each rank writes blocks that are contiguous in the file, and by sorting the full list we maximize the chance of that happening.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could potentially be making extra work for ourselves now by sorting here since the MPI I/O VFD is already scanning the chunk list to check for this anyway and we don't necessarily need to impose the monotonically non-decreasing file displacement requirement on other parallel VFDs. That said, since the sort has to happen at some point anyway if the addresses are out of order, I'm not sure if the above note is really needed here.

qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks, sizeof(struct chunk_coll_fill_info),
H5D__chunk_cmp_coll_fill_info);

/* store the chunk address as an MPI_Aint */
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);
if (all_same_block_len) {
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
io_2sizes[1] = 0;
}
else {
if (NULL == (io_sizes = H5MM_malloc((size_t)(blocks + 1) * sizeof(*io_sizes))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
}

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);
io_types[0] = H5FD_MEM_DRAW;
io_types[1] = H5FD_MEM_NOLIST;

if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[i] = partial_fill_buf_disp;
}
} /* end for */
for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);

/* Calculate if there are any leftover blocks after evenly
* distributing. If there are, then round-robin the distribution
* to processes 0 -> leftover.
*/
if (leftover && leftover > mpi_rank) {
chunk_disp_array[blocks] =
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[blocks], int,
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
size_t);

if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[blocks] = partial_fill_buf_disp;
}
io_addrs[i] = chunk_fill_info->chunk_info[idx].addr;

blocks++;
}
if (!all_same_block_len)
io_sizes[i] = chunk_fill_info->chunk_info[idx].chunk_size;

/* Create file and memory types for the write operation */
if (all_same_block_len) {
int block_len;
if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk)
io_wbufs[i] = partial_chunk_fill_buf;
else
io_wbufs[i] = fill_buf;
}

H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);
if (leftover && leftover > mpi_rank) {
io_addrs[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
if (!all_same_block_len)
io_sizes[blocks] = chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size;

if (partial_chunk_fill_buf) {
/*
* If filters are disabled for partial edge chunks, those chunks could
* potentially have the same block length as the other chunks, but still
* need to be written to using the unfiltered fill buffer. Use an hindexed
* block type rather than an hvector.
*/
mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
}
else {
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
}
}
else {
/*
* Currently, different block lengths implies that there are partial
* edge chunks and the "don't filter partial edge chunks" flag is set.
*/
if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
assert(block_lens);
assert(block_disps);

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
io_wbufs[blocks] = partial_chunk_fill_buf;
}
else
io_wbufs[blocks] = fill_buf;

if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */

/* Set MPI-IO VFD properties */

/* Set MPI datatypes for operation */
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
blocks++;
}

/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
Expand All @@ -5763,31 +5684,23 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* Low-level write (collective) */
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");

/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)

if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), (uint32_t)blocks, io_types, io_addrs,
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");

done:
if (have_xfer_mode)
/* Set transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* free things */
if (MPI_BYTE != file_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
if (MPI_BYTE != mem_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
H5MM_xfree(chunk_disp_array);
H5MM_xfree(block_disps);
H5MM_xfree(block_lens);
H5MM_xfree(io_addrs);
H5MM_xfree(io_wbufs);
H5MM_xfree(io_sizes);

FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */
Expand All @@ -5805,6 +5718,7 @@ H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)

FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */

#endif /* H5_HAVE_PARALLEL */

/*-------------------------------------------------------------------------
Expand Down
28 changes: 21 additions & 7 deletions src/H5FDmpio.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static herr_t H5FD__mpio_ctl(H5FD_t *_file, uint64_t op_code, uint64_t flags, co
/* Other functions */
static herr_t H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[],
size_t sizes[], H5_flexible_const_ptr_t bufs[],
haddr_t *s_addrs[], size_t *s_sizes[],
haddr_t *s_addrs[], size_t *s_sizes[], uint32_t *s_sizes_len,
H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted,
MPI_Offset *mpi_off, H5_flexible_const_ptr_t *mpi_bufs_base,
int *size_i, MPI_Datatype *buf_type, bool *buf_type_created,
Expand Down Expand Up @@ -1675,7 +1675,8 @@ H5FD__mpio_write(H5FD_t *_file, H5FD_mem_t type, hid_t H5_ATTR_UNUSED dxpl_id, h
static herr_t
H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[], size_t sizes[],
H5_flexible_const_ptr_t bufs[], haddr_t *s_addrs[], size_t *s_sizes[],
H5_flexible_const_ptr_t *s_bufs[], bool *vector_was_sorted, MPI_Offset *mpi_off,
uint32_t *s_sizes_len, H5_flexible_const_ptr_t *s_bufs[],
bool *vector_was_sorted, MPI_Offset *mpi_off,
H5_flexible_const_ptr_t *mpi_bufs_base, int *size_i, MPI_Datatype *buf_type,
bool *buf_type_created, MPI_Datatype *file_type, bool *file_type_created,
char *unused)
Expand Down Expand Up @@ -1716,6 +1717,10 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
/* Get bio I/O transition point (may be lower than 2G for testing) */
bigio_count = H5_mpi_get_bigio_count();

/* Start with s_sizes_len at count */
if (s_sizes_len)
*s_sizes_len = count;

if (count == 1) {
/* Single block. Just use a series of MPI_BYTEs for the file view.
*/
Expand Down Expand Up @@ -1808,8 +1813,13 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[
if (!fixed_size) {
if ((*s_sizes)[i] == 0) {
assert(vector_was_sorted);
assert(i > 0);
fixed_size = true;
size = sizes[i - 1];

/* Return the used length of the s_sizes buffer */
if (s_sizes_len)
*s_sizes_len = (uint32_t)i;
}
else {
size = (*s_sizes)[i];
Expand Down Expand Up @@ -2098,7 +2108,7 @@ H5FD__mpio_read_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t cou
if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
/* Build MPI types, etc. */
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
&s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
&s_addrs, &s_sizes, NULL, (H5_flexible_const_ptr_t **)&s_bufs,
&vector_was_sorted, &mpi_off,
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
Expand Down Expand Up @@ -2464,17 +2474,21 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't get MPI-I/O transfer mode");

if (xfer_mode == H5FD_MPIO_COLLECTIVE) {
uint32_t s_sizes_len;

/* Build MPI types, etc. */
if (H5FD__mpio_vector_build_types(count, types, addrs, sizes, (H5_flexible_const_ptr_t *)bufs,
&s_addrs, &s_sizes, (H5_flexible_const_ptr_t **)&s_bufs,
&vector_was_sorted, &mpi_off,
&s_addrs, &s_sizes, &s_sizes_len,
(H5_flexible_const_ptr_t **)&s_bufs, &vector_was_sorted, &mpi_off,
(H5_flexible_const_ptr_t *)&mpi_bufs_base, &size_i, &buf_type,
&buf_type_created, &file_type, &file_type_created, &unused) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't build MPI datatypes for I/O");

/* Compute max address written to */
/* Compute max address written to. Note s_sizes is indexed according to the length of that array as
* reported by H5FD__mpio_vector_build_types(), which may be shorter if using the compressed arrays
* feature. */
if (count > 0)
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[count - 1]);
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[s_sizes_len - 1]);

/* free sorted vectors if they exist */
if (!vector_was_sorted) {
Expand Down
Loading