Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement selection vector I/O with collective chunk filling #3826

Merged
merged 14 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 46 additions & 224 deletions src/H5Dchunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ static herr_t H5D__chunk_prune_fill(H5D_chunk_it_ud1_t *udata, bool new_unfilt
#ifdef H5_HAVE_PARALLEL
static herr_t H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_fill_info,
const void *fill_buf, const void *partial_chunk_fill_buf);
static int H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2);
#endif /* H5_HAVE_PARALLEL */

/* Debugging helper routine callback */
Expand Down Expand Up @@ -5536,9 +5535,7 @@ H5D__chunk_update_old_edge_chunks(H5D_t *dset, hsize_t old_dim[])
/*-------------------------------------------------------------------------
* Function: H5D__chunk_collective_fill
*
* Purpose: Use MPIO collective write to fill the chunks (if number of
* chunks to fill is greater than the number of MPI procs;
* otherwise use independent I/O).
* Purpose: Use MPIO selection vector I/O for writing fill chunks
*
* Return: Non-negative on success/Negative on failure
*
Expand All @@ -5548,211 +5545,60 @@ static herr_t
H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_fill_info,
const void *fill_buf, const void *partial_chunk_fill_buf)
{
MPI_Comm mpi_comm = MPI_COMM_NULL; /* MPI communicator for file */
int mpi_rank = (-1); /* This process's rank */
int mpi_size = (-1); /* MPI Comm size */
int mpi_code; /* MPI return code */
size_t num_blocks; /* Number of blocks between processes. */
size_t leftover_blocks; /* Number of leftover blocks to handle */
int blocks, leftover; /* converted to int for MPI */
MPI_Aint *chunk_disp_array = NULL;
MPI_Aint *block_disps = NULL;
int *block_lens = NULL;
MPI_Datatype mem_type = MPI_BYTE, file_type = MPI_BYTE;
H5FD_mpio_xfer_t prev_xfer_mode; /* Previous data xfer mode */
bool have_xfer_mode = false; /* Whether the previous xffer mode has been retrieved */
bool need_sort = false;
size_t i; /* Local index variable */
size_t i; /* Local index variable */
uint32_t io_count = 0;
haddr_t *io_addrs = NULL;
size_t *io_sizes = NULL;
const void **io_wbufs = NULL;
H5FD_mem_t io_types[2];
bool all_same_block_len = true;
size_t io_2sizes[2];
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_PACKAGE

/*
* If a separate fill buffer is provided for partial chunks, ensure
* that the "don't filter partial edge chunks" flag is set.
*/
if (partial_chunk_fill_buf)
assert(dset->shared->layout.u.chunk.flags & H5O_LAYOUT_CHUNK_DONT_FILTER_PARTIAL_BOUND_CHUNKS);

/* Get the MPI communicator */
if (MPI_COMM_NULL == (mpi_comm = H5F_mpi_get_comm(dset->oloc.file)))
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI communicator");

/* Get the MPI rank */
if ((mpi_rank = H5F_mpi_get_rank(dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI rank");
assert(chunk_fill_info->num_chunks != 0);

/* Get the MPI size */
if ((mpi_size = H5F_mpi_get_size(dset->oloc.file)) < 0)
HGOTO_ERROR(H5E_INTERNAL, H5E_MPI, FAIL, "Can't retrieve MPI size");

/* Distribute evenly the number of blocks between processes. */
if (mpi_size == 0)
HGOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "Resulted in division by zero");
num_blocks =
(size_t)(chunk_fill_info->num_chunks / (size_t)mpi_size); /* value should be the same on all procs */

/* After evenly distributing the blocks between processes, are there any
* leftover blocks for each individual process (round-robin)?
*/
leftover_blocks = (size_t)(chunk_fill_info->num_chunks % (size_t)mpi_size);

/* Cast values to types needed by MPI */
H5_CHECKED_ASSIGN(blocks, int, num_blocks, size_t);
H5_CHECKED_ASSIGN(leftover, int, leftover_blocks, size_t);

/* Check if we have any chunks to write on this rank */
if (num_blocks > 0 || (leftover && leftover > mpi_rank)) {
MPI_Aint partial_fill_buf_disp = 0;
bool all_same_block_len = true;

/* Allocate buffers */
if (NULL == (chunk_disp_array = (MPI_Aint *)H5MM_malloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk file displacement buffer");

if (partial_chunk_fill_buf) {
MPI_Aint fill_buf_addr;
MPI_Aint partial_fill_buf_addr;

/* Calculate the displacement between the fill buffer and partial chunk fill buffer */
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(fill_buf, &fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Get_address(partial_chunk_fill_buf, &partial_fill_buf_addr)))
HMPI_GOTO_ERROR(FAIL, "MPI_Get_address failed", mpi_code)

#if H5_CHECK_MPI_VERSION(3, 1)
partial_fill_buf_disp = MPI_Aint_diff(partial_fill_buf_addr, fill_buf_addr);
#else
partial_fill_buf_disp = partial_fill_buf_addr - fill_buf_addr;
#endif

/*
* Allocate all-zero block displacements array. If a block's displacement
* is left as zero, that block will be written to from the regular fill
* buffer. If a block represents an unfiltered partial edge chunk, its
* displacement will be set so that the block is written to from the
* unfiltered fill buffer.
*/
if (NULL == (block_disps = (MPI_Aint *)H5MM_calloc((size_t)(blocks + 1) * sizeof(MPI_Aint))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate block displacements buffer");
}

/*
* Perform initial scan of chunk info list to:
* - make sure that chunk addresses are monotonically non-decreasing
* - check if all blocks have the same length
*/
for (i = 1; i < chunk_fill_info->num_chunks; i++) {
if (chunk_fill_info->chunk_info[i].addr < chunk_fill_info->chunk_info[i - 1].addr)
need_sort = true;

if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size)
all_same_block_len = false;
}

if (need_sort)
qsort(chunk_fill_info->chunk_info, chunk_fill_info->num_chunks,
sizeof(struct chunk_coll_fill_info), H5D__chunk_cmp_coll_fill_info);

/* Allocate buffer for block lengths if necessary */
if (!all_same_block_len)
if (NULL == (block_lens = (int *)H5MM_malloc((size_t)(blocks + 1) * sizeof(int))))
HGOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, "couldn't allocate chunk lengths buffer");

for (i = 0; i < (size_t)blocks; i++) {
size_t idx = i + (size_t)(mpi_rank * blocks);

/* store the chunk address as an MPI_Aint */
chunk_disp_array[i] = (MPI_Aint)(chunk_fill_info->chunk_info[idx].addr);

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[i], int, chunk_fill_info->chunk_info[idx].chunk_size, size_t);

if (chunk_fill_info->chunk_info[idx].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[i] = partial_fill_buf_disp;
}
} /* end for */

/* Calculate if there are any leftover blocks after evenly
* distributing. If there are, then round-robin the distribution
* to processes 0 -> leftover.
*/
if (leftover && leftover > mpi_rank) {
chunk_disp_array[blocks] =
(MPI_Aint)chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].addr;

if (!all_same_block_len)
H5_CHECKED_ASSIGN(block_lens[blocks], int,
chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].chunk_size,
size_t);

if (chunk_fill_info->chunk_info[(blocks * mpi_size) + mpi_rank].unfiltered_partial_chunk) {
assert(partial_chunk_fill_buf);
block_disps[blocks] = partial_fill_buf_disp;
}
io_count = (uint32_t)chunk_fill_info->num_chunks;

blocks++;
for (i = 1; i < io_count; i++) {
if (chunk_fill_info->chunk_info[i].chunk_size != chunk_fill_info->chunk_info[i - 1].chunk_size) {
all_same_block_len = false;
break;
}
}

/* Create file and memory types for the write operation */
if (all_same_block_len) {
int block_len;

H5_CHECKED_ASSIGN(block_len, int, chunk_fill_info->chunk_info[0].chunk_size, size_t);

mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
if (all_same_block_len) {
io_2sizes[0] = chunk_fill_info->chunk_info[0].chunk_size;
io_2sizes[1] = 0;
}
else {
if (NULL == (io_sizes = H5MM_malloc(io_count * sizeof(*io_sizes))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O sizes vector");
}

if (partial_chunk_fill_buf) {
/*
* If filters are disabled for partial edge chunks, those chunks could
* potentially have the same block length as the other chunks, but still
* need to be written to using the unfiltered fill buffer. Use an hindexed
* block type rather than an hvector.
*/
mpi_code =
MPI_Type_create_hindexed_block(blocks, block_len, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed_block failed", mpi_code)
}
else {
mpi_code = MPI_Type_create_hvector(blocks, block_len, 0, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hvector failed", mpi_code)
}
}
else {
/*
* Currently, different block lengths implies that there are partial
* edge chunks and the "don't filter partial edge chunks" flag is set.
*/
assert(partial_chunk_fill_buf);
assert(block_lens);
assert(block_disps);
io_types[0] = H5FD_MEM_DRAW;
io_types[1] = H5FD_MEM_NOLIST;

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, chunk_disp_array, MPI_BYTE, &file_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
if (NULL == (io_addrs = H5MM_malloc(io_count * sizeof(*io_addrs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O addresses vector");

mpi_code = MPI_Type_create_hindexed(blocks, block_lens, block_disps, MPI_BYTE, &mem_type);
if (mpi_code != MPI_SUCCESS)
HMPI_GOTO_ERROR(FAIL, "MPI_Type_create_hindexed failed", mpi_code)
}
if (NULL == (io_wbufs = H5MM_malloc(io_count * sizeof(*io_wbufs))))
HGOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, FAIL, "couldn't allocate space for I/O buffers vector");

if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&file_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
if (MPI_SUCCESS != (mpi_code = MPI_Type_commit(&mem_type)))
HMPI_GOTO_ERROR(FAIL, "MPI_Type_commit failed", mpi_code)
} /* end if */
for (i = 0; i < io_count; i++) {
io_addrs[i] = chunk_fill_info->chunk_info[i].addr;

/* Set MPI-IO VFD properties */
if (!all_same_block_len)
io_sizes[i] = chunk_fill_info->chunk_info[i].chunk_size;

/* Set MPI datatypes for operation */
if (H5CX_set_mpi_coll_datatypes(mem_type, file_type) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set MPI-I/O properties");
if (chunk_fill_info->chunk_info[i].unfiltered_partial_chunk)
io_wbufs[i] = partial_chunk_fill_buf;
else
io_wbufs[i] = fill_buf;
}

/* Get current transfer mode */
if (H5CX_get_io_xfer_mode(&prev_xfer_mode) < 0)
Expand All @@ -5763,48 +5609,24 @@ H5D__chunk_collective_fill(const H5D_t *dset, H5D_chunk_coll_fill_info_t *chunk_
if (H5CX_set_io_xfer_mode(H5FD_MPIO_COLLECTIVE) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* Low-level write (collective) */
if (H5F_shared_block_write(H5F_SHARED(dset->oloc.file), H5FD_MEM_DRAW, (haddr_t)0,
(blocks) ? (size_t)1 : (size_t)0, fill_buf) < 0)
HGOTO_ERROR(H5E_IO, H5E_WRITEERROR, FAIL, "unable to write raw data to file");

/* Barrier so processes don't race ahead */
if (MPI_SUCCESS != (mpi_code = MPI_Barrier(mpi_comm)))
HMPI_GOTO_ERROR(FAIL, "MPI_Barrier failed", mpi_code)
if (H5F_shared_vector_write(H5F_SHARED(dset->oloc.file), io_count, io_types, io_addrs,
all_same_block_len ? io_2sizes : io_sizes, io_wbufs) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "vector write call failed");

done:
if (have_xfer_mode)
/* Set transfer mode */
if (H5CX_set_io_xfer_mode(prev_xfer_mode) < 0)
HDONE_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set transfer mode");

/* free things */
if (MPI_BYTE != file_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&file_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
if (MPI_BYTE != mem_type)
if (MPI_SUCCESS != (mpi_code = MPI_Type_free(&mem_type)))
HMPI_DONE_ERROR(FAIL, "MPI_Type_free failed", mpi_code)
H5MM_xfree(chunk_disp_array);
H5MM_xfree(block_disps);
H5MM_xfree(block_lens);
H5MM_xfree(io_addrs);
H5MM_xfree(io_wbufs);
if (!all_same_block_len)
H5MM_xfree(io_sizes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: I'd prefer to just always call H5MM_xfree here to make the code more robust against future changes. It's a no-op if io_sizes in NULL so this won't cause problems

Copy link
Member

@derobins derobins Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The C standard explicitly says that calling free() on NULL pointers is acceptable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


FUNC_LEAVE_NOAPI(ret_value)
} /* end H5D__chunk_collective_fill() */

static int
H5D__chunk_cmp_coll_fill_info(const void *_entry1, const void *_entry2)
{
const struct chunk_coll_fill_info *entry1;
const struct chunk_coll_fill_info *entry2;

FUNC_ENTER_PACKAGE_NOERR

entry1 = (const struct chunk_coll_fill_info *)_entry1;
entry2 = (const struct chunk_coll_fill_info *)_entry2;

FUNC_LEAVE_NOAPI(H5_addr_cmp(entry1->addr, entry2->addr))
} /* end H5D__chunk_cmp_coll_fill_info() */
#endif /* H5_HAVE_PARALLEL */

/*-------------------------------------------------------------------------
Expand Down
9 changes: 7 additions & 2 deletions src/H5FDmpio.c
Original file line number Diff line number Diff line change
Expand Up @@ -2473,8 +2473,13 @@ H5FD__mpio_write_vector(H5FD_t *_file, hid_t H5_ATTR_UNUSED dxpl_id, uint32_t co
HGOTO_ERROR(H5E_VFL, H5E_CANTGET, FAIL, "can't build MPI datatypes for I/O");

/* Compute max address written to */
if (count > 0)
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[count - 1]);
/* Bug: need to account for fixed size optimization */
if (count > 0) {
Copy link
Member

@fortnern fortnern Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things here:

  • If count == 1 this reads off the end of the s_sizes array (maybe just change the condition to count > 1)
  • The sizes can actually end anywhere in the array, not just at index 1. I'd recommend checking the last element to see if it's 0, then scanning from index 1 and up until you see a 0. So maybe something like:
    if (count > 1) {
    uint32_t idx = count - 1;
    if (s_sizes[idx] == 0)
    for (idx = 0; idx < count - 1; idx++)
    if (s_sizes[idx + 1] == 0)
    break;
    max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[idx])
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the comment shouldn't proclaim it as a bug, just explain what it's doing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually my code is wrong too, I'll try to fix it later tonight. We might need to change the H5FD__mpio_vector_build_types function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think <s_sizes> returned from H5FD__mpio_vector_build_types() can be (1) or (2) as follows:

  1. entries of sizes
  2. Two entries for same size optimization: entry 0 is the size and entry 1 is 0
    Maybe the better way is to change H5FD__mpio_vector_build_types().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be any number of entries between 1 (with a 0 terminator if count > 1) and count

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have time it would be great to add a test for this case. We would have to use the public H5FD interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied the patch from Neil.
I will work on adding a test for this case.

uint32_t idx = count - 1;
if (s_sizes[0] != 0 && s_sizes[1] == 0)
idx = 0;
max_addr = s_addrs[count - 1] + (haddr_t)(s_sizes[idx]);
}

/* free sorted vectors if they exist */
if (!vector_was_sorted) {
Expand Down
21 changes: 15 additions & 6 deletions testpar/t_filters_parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,20 @@ verify_chunk_opt_status(size_t num_dsets, test_mode_t test_mode, bool any_io, bo
* elements (because no elements were raw data), which is what happens when performing I/O on a
* filtered dataset with no selection. Vector I/O does report an I/O call was made if passed a raw
* data element of size 0, so this is consistent. */
/* Changes: for allocation, the reported I/O is vector I/O */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to call out "changes" in the code comment here. Just explain why we expect the different modes under each condition as the code stands with this PR

if (!any_io) {
if (did_alloc || (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED))
if (did_alloc && (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED)) {
VRFY((H5D_VECTOR_IO | H5D_SCALAR_IO) == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was vector and scalar I/O");
}
else if (did_alloc) {
VRFY(H5D_VECTOR_IO == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was vector I/O");
}
else if (num_dsets > 0 && test_mode == USE_MULTIPLE_DATASETS_MIXED_FILTERED) {
VRFY(H5D_SCALAR_IO == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was scalar I/O");
}
else
VRFY(0 == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was 0 (no I/O)");
Expand Down Expand Up @@ -592,15 +602,14 @@ verify_chunk_opt_status(size_t num_dsets, test_mode_t test_mode, bool any_io, bo
* should be scalar I/O for allocation in addition to vector I/O for the actual data.
* If we're reading from an unallocated dataset then there should be no actual I/O.
* Otherwise there should only be vector I/O. */
if (did_alloc)
VRFY((H5D_SCALAR_IO | H5D_VECTOR_IO) == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was scalar and vector I/O");
else if (unalloc_read)
/* Changes: for allocation, the reported I/O is vector I/O */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Explain that both collective filling/allocation and filtered I/O use vector I/O so only that is reported whether or not allocation happened.

if (unalloc_read)
VRFY(0 == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was 0 (no I/O)");
else
else { /* did_alloc || !unalloc_read */
VRFY(H5D_VECTOR_IO == actual_sel_io_mode_reduced,
"verified actual selection I/O mode was vector I/O");
}
break;

case USE_MULTIPLE_DATASETS_MIXED_FILTERED:
Expand Down