Skip to content

Commit

Permalink
Fix serial to parallel chunked dataset file space allocation bug (HDF…
Browse files Browse the repository at this point in the history
  • Loading branch information
jhendersonHDF committed Aug 31, 2023
1 parent ecae25d commit f83f1fc
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 12 deletions.
11 changes: 11 additions & 0 deletions release_docs/RELEASE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ Bug Fixes since HDF5-1.12.2 release
===================================
Library
-------
- Fixed a file space allocation bug in the parallel library for chunked
datasets

With the addition of support for incremental file space allocation for
chunked datasets with filters applied to them that are created/accessed
in parallel, a bug was introduced to the library's parallel file space
allocation code. This could cause file space to not be allocated correctly
for datasets without filters applied to them that are created with serial
file access and later opened with parallel file access. In turn, this could
cause parallel writes to those datasets to place incorrect data in the file.

- Fixed a bug in H5Ocopy that could generate invalid HDF5 files

H5Ocopy was missing a check to determine whether the new object's
Expand Down
36 changes: 24 additions & 12 deletions src/H5Dint.c
Original file line number Diff line number Diff line change
Expand Up @@ -1715,12 +1715,13 @@ H5D__append_flush_setup(H5D_t *dset, hid_t dapl_id)
static herr_t
H5D__open_oid(H5D_t *dataset, hid_t dapl_id)
{
H5P_genplist_t *plist; /* Property list */
H5O_fill_t *fill_prop; /* Pointer to dataset's fill value info */
unsigned alloc_time_state; /* Allocation time state */
htri_t msg_exists; /* Whether a particular type of message exists */
hbool_t layout_init = FALSE; /* Flag to indicate that chunk information was initialized */
herr_t ret_value = SUCCEED; /* Return value */
H5P_genplist_t *plist; /* Property list */
H5O_fill_t *fill_prop; /* Pointer to dataset's fill value info */
unsigned alloc_time_state; /* Allocation time state */
htri_t msg_exists; /* Whether a particular type of message exists */
hbool_t layout_init = FALSE; /* Flag to indicate that chunk information was initialized */
hbool_t must_init_storage = FALSE;
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_STATIC_TAG(dataset->oloc.addr)

Expand Down Expand Up @@ -1862,17 +1863,28 @@ H5D__open_oid(H5D_t *dataset, hid_t dapl_id)
* Make sure all storage is properly initialized.
* This is important only for parallel I/O where the space must
* be fully allocated before I/O can happen.
*
* Storage will be initialized here if either the VFD being used
* has set the H5FD_FEAT_ALLOCATE_EARLY flag to indicate that it
* wishes to force early space allocation OR a parallel VFD is
* being used and the dataset in question doesn't have any filters
* applied to it. If filters are applied to the dataset, collective
* I/O will be required when writing to the dataset, so we don't
* need to initialize storage here, as the collective I/O process
* will coordinate that.
*/
if ((H5F_INTENT(dataset->oloc.file) & H5F_ACC_RDWR) &&
!(*dataset->shared->layout.ops->is_space_alloc)(&dataset->shared->layout.storage) &&
H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_ALLOCATE_EARLY)) {
H5D_io_info_t io_info;
must_init_storage = (H5F_INTENT(dataset->oloc.file) & H5F_ACC_RDWR) &&
!(*dataset->shared->layout.ops->is_space_alloc)(&dataset->shared->layout.storage);
must_init_storage = must_init_storage && (H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_ALLOCATE_EARLY) ||
(H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_HAS_MPI) &&
dataset->shared->dcpl_cache.pline.nused == 0));

if (must_init_storage) {
io_info.dset = dataset;

if (H5D__alloc_storage(&io_info, H5D_ALLOC_OPEN, FALSE, NULL) < 0)
if (H5D__alloc_storage(dataset, H5D_ALLOC_OPEN, FALSE, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file storage")
} /* end if */
}

done:
if (ret_value < 0) {
Expand Down
277 changes: 277 additions & 0 deletions testpar/t_chunk_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,280 @@ test_chunk_alloc(void)
/* reopen dataset in parallel, read and verify the data */
verify_data(filename, CHUNK_FACTOR, all, CLOSE, &file_id, &dataset);
}

/*
* A test to verify the following:
*
* - That the library forces allocation of all space in the file
* for a chunked dataset opened with parallel file access when
* that dataset:
*
* - was created with serial file access
* - was created with the default incremental file space
* allocation time
* - has no filters applied to it
*
* In this case, the library has to ensure that all the file
* space for the dataset is allocated so that the MPI processes
* can write to chunks independently of each other and still have
* a consistent view of the file.
*
* - That the library DOES NOT force allocation of all space in
* the file for a chunked dataset opened with parallel file access
* when that dataset:
*
* - was created with serial file access
* - was created with the default incremental file space
* allocation time
* - has filters applied to it
*
* In this case, writes to the dataset are required to be collective,
* so file space can be allocated incrementally in a coordinated
* fashion.
*/
void
test_chunk_alloc_incr_ser_to_par(void)
{
H5D_space_status_t space_status;
const char *filename;
hsize_t dset_dims[1];
hsize_t start[1];
hsize_t stride[1];
hsize_t count[1];
hsize_t block[1];
hsize_t alloc_size;
size_t nchunks;
herr_t ret;
hid_t fid = H5I_INVALID_HID;
hid_t fapl_id = H5I_INVALID_HID;
hid_t dset_id = H5I_INVALID_HID;
hid_t fspace_id = H5I_INVALID_HID;
hid_t dxpl_id = H5I_INVALID_HID;
int *data = NULL;
int *correct_data = NULL;
int *read_data = NULL;

MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

filename = (const char *)GetTestParameters();
if (MAINPROCESS && VERBOSE_MED)
printf("Chunked dataset incremental file space allocation serial to parallel test on file %s\n",
filename);

nchunks = (size_t)(CHUNK_FACTOR * mpi_size);
dset_dims[0] = (hsize_t)(nchunks * CHUNK_SIZE);

if (mpi_rank == 0) {
hsize_t chunk_dims[1] = {CHUNK_SIZE};
hid_t space_id = H5I_INVALID_HID;
hid_t dcpl_id = H5I_INVALID_HID;

fid = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
VRFY((fid >= 0), "H5Fcreate");

dcpl_id = H5Pcreate(H5P_DATASET_CREATE);
VRFY((dcpl_id >= 0), "H5Pcreate");

ret = H5Pset_chunk(dcpl_id, 1, chunk_dims);
VRFY((ret == SUCCEED), "H5Pset_chunk");

ret = H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_INCR);
VRFY((ret == SUCCEED), "H5Pset_alloc_time");

space_id = H5Screate_simple(1, dset_dims, NULL);
VRFY((space_id >= 0), "H5Screate_simple");

/* Create a chunked dataset without a filter applied to it */
dset_id =
H5Dcreate2(fid, "dset_no_filter", H5T_NATIVE_INT, space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dcreate2");

ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

/* Create a chunked dataset with a filter applied to it */
ret = H5Pset_shuffle(dcpl_id);
VRFY((ret == SUCCEED), "H5Pset_shuffle");

dset_id = H5Dcreate2(fid, "dset_filter", H5T_NATIVE_INT, space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dcreate2");

ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");
ret = H5Pclose(dcpl_id);
VRFY((ret == SUCCEED), "H5Pclose");
ret = H5Sclose(space_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Fclose(fid);
VRFY((ret == SUCCEED), "H5Fclose");
}

MPI_Barrier(MPI_COMM_WORLD);

fapl_id = H5Pcreate(H5P_FILE_ACCESS);
VRFY((fapl_id >= 0), "H5Pcreate");

ret = H5Pset_fapl_mpio(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
VRFY((ret == SUCCEED), "H5Pset_fapl_mpio");

fid = H5Fopen(filename, H5F_ACC_RDWR, fapl_id);
VRFY((fid >= 0), "H5Fopen");

data = malloc((dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));
VRFY(data, "malloc");
read_data = malloc(dset_dims[0] * sizeof(int));
VRFY(read_data, "malloc");
correct_data = malloc(dset_dims[0] * sizeof(int));
VRFY(correct_data, "malloc");

/*
* Check the file space allocation status/size and dataset
* data before and after writing to the dataset without a
* filter
*/
dset_id = H5Dopen2(fid, "dset_no_filter", H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dopen2");

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 255, dset_dims[0] * sizeof(int));
memset(correct_data, 0, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_BLOCK, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

fspace_id = H5Dget_space(dset_id);
VRFY((ret == SUCCEED), "H5Dget_space");

start[0] = ((hsize_t)mpi_rank * (dset_dims[0] / (hsize_t)mpi_size));
stride[0] = 1;
count[0] = (dset_dims[0] / (hsize_t)mpi_size);
block[0] = 1;

ret = H5Sselect_hyperslab(fspace_id, H5S_SELECT_SET, start, stride, count, block);
VRFY((ret == SUCCEED), "H5Sselect_hyperslab");

memset(data, 255, (dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));

ret = H5Dwrite(dset_id, H5T_NATIVE_INT, H5S_BLOCK, fspace_id, H5P_DEFAULT, data);
VRFY((ret == SUCCEED), "H5Dwrite");

MPI_Barrier(MPI_COMM_WORLD);

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 0, dset_dims[0] * sizeof(int));
memset(correct_data, 255, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_BLOCK, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

ret = H5Sclose(fspace_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

/*
* Check the file space allocation status/size and dataset
* data before and after writing to the dataset with a
* filter
*/
dset_id = H5Dopen2(fid, "dset_filter", H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dopen2");

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_NOT_ALLOCATED),
"file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY((0 == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 255, dset_dims[0] * sizeof(int));
memset(correct_data, 0, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_BLOCK, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

fspace_id = H5Dget_space(dset_id);
VRFY((ret == SUCCEED), "H5Dget_space");

start[0] = ((hsize_t)mpi_rank * (dset_dims[0] / (hsize_t)mpi_size));
stride[0] = 1;
count[0] = (dset_dims[0] / (hsize_t)mpi_size);
block[0] = 1;

ret = H5Sselect_hyperslab(fspace_id, H5S_SELECT_SET, start, stride, count, block);
VRFY((ret == SUCCEED), "H5Sselect_hyperslab");

memset(data, 255, (dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));

dxpl_id = H5Pcreate(H5P_DATASET_XFER);
VRFY((dxpl_id >= 0), "H5Pcreate");

ret = H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);
VRFY((ret == SUCCEED), "H5Pset_dxpl_mpio");

ret = H5Dwrite(dset_id, H5T_NATIVE_INT, H5S_BLOCK, fspace_id, dxpl_id, data);
VRFY((ret == SUCCEED), "H5Dwrite");

MPI_Barrier(MPI_COMM_WORLD);

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 0, dset_dims[0] * sizeof(int));
memset(correct_data, 255, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_BLOCK, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

ret = H5Pclose(dxpl_id);
VRFY((ret == SUCCEED), "H5Pclose");
ret = H5Sclose(fspace_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

free(correct_data);
free(read_data);
free(data);

H5Pclose(fapl_id);
H5Fclose(fid);
}
2 changes: 2 additions & 0 deletions testpar/testphdf5.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ main(int argc, char **argv)
AddTest("selnone", none_selection_chunk, NULL, "chunked dataset with none-selection", PARATESTFILE);
AddTest("calloc", test_chunk_alloc, NULL, "parallel extend Chunked allocation on serial file",
PARATESTFILE);
AddTest("chkallocser2par", test_chunk_alloc_incr_ser_to_par, NULL,
"chunk allocation from serial to parallel file access", PARATESTFILE);
AddTest("fltread", test_filter_read, NULL, "parallel read of dataset written serially with filters",
PARATESTFILE);

Expand Down
1 change: 1 addition & 0 deletions testpar/testphdf5.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void none_selection_chunk(void);
void actual_io_mode_tests(void);
void no_collective_cause_tests(void);
void test_chunk_alloc(void);
void test_chunk_alloc_incr_ser_to_par(void);
void test_filter_read(void);
void compact_dataset(void);
void null_dataset(void);
Expand Down

0 comments on commit f83f1fc

Please sign in to comment.