Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "[1.10 Merge] Fix serial to parallel chunked dataset f… #3486

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions release_docs/RELEASE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ Bug Fixes since HDF5-1.10.10 release
===================================
Library
-------
- Fixed a file space allocation bug in the parallel library for chunked
datasets

With the addition of support for incremental file space allocation for
chunked datasets with filters applied to them that are created/accessed
in parallel, a bug was introduced to the library's parallel file space
allocation code. This could cause file space to not be allocated correctly
for datasets without filters applied to them that are created with serial
file access and later opened with parallel file access. In turn, this could
cause parallel writes to those datasets to place incorrect data in the file.

- Fixed an assertion failure in Parallel HDF5 when a file can't be created
due to an invalid library version bounds setting

Expand Down
34 changes: 24 additions & 10 deletions src/H5Dint.c
Original file line number Diff line number Diff line change
Expand Up @@ -1647,12 +1647,13 @@ H5D__append_flush_setup(H5D_t *dset, hid_t dapl_id)
static herr_t
H5D__open_oid(H5D_t *dataset, hid_t dapl_id)
{
H5P_genplist_t *plist; /* Property list */
H5O_fill_t *fill_prop; /* Pointer to dataset's fill value info */
unsigned alloc_time_state; /* Allocation time state */
htri_t msg_exists; /* Whether a particular type of message exists */
hbool_t layout_init = FALSE; /* Flag to indicate that chunk information was initialized */
herr_t ret_value = SUCCEED; /* Return value */
H5P_genplist_t *plist; /* Property list */
H5O_fill_t *fill_prop; /* Pointer to dataset's fill value info */
unsigned alloc_time_state; /* Allocation time state */
htri_t msg_exists; /* Whether a particular type of message exists */
hbool_t layout_init = FALSE; /* Flag to indicate that chunk information was initialized */
hbool_t must_init_storage = FALSE;
herr_t ret_value = SUCCEED; /* Return value */

FUNC_ENTER_STATIC_TAG(dataset->oloc.addr)

Expand Down Expand Up @@ -1794,17 +1795,30 @@ H5D__open_oid(H5D_t *dataset, hid_t dapl_id)
* Make sure all storage is properly initialized.
* This is important only for parallel I/O where the space must
* be fully allocated before I/O can happen.
*
* Storage will be initialized here if either the VFD being used
* has set the H5FD_FEAT_ALLOCATE_EARLY flag to indicate that it
* wishes to force early space allocation OR a parallel VFD is
* being used and the dataset in question doesn't have any filters
* applied to it. If filters are applied to the dataset, collective
* I/O will be required when writing to the dataset, so we don't
* need to initialize storage here, as the collective I/O process
* will coordinate that.
*/
if ((H5F_INTENT(dataset->oloc.file) & H5F_ACC_RDWR) &&
!(*dataset->shared->layout.ops->is_space_alloc)(&dataset->shared->layout.storage) &&
H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_ALLOCATE_EARLY)) {
must_init_storage = (H5F_INTENT(dataset->oloc.file) & H5F_ACC_RDWR) &&
!(*dataset->shared->layout.ops->is_space_alloc)(&dataset->shared->layout.storage);
must_init_storage = must_init_storage && (H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_ALLOCATE_EARLY) ||
(H5F_HAS_FEATURE(dataset->oloc.file, H5FD_FEAT_HAS_MPI) &&
dataset->shared->dcpl_cache.pline.nused == 0));

if (must_init_storage) {
H5D_io_info_t io_info;

io_info.dset = dataset;

if (H5D__alloc_storage(&io_info, H5D_ALLOC_OPEN, FALSE, NULL) < 0)
HGOTO_ERROR(H5E_DATASET, H5E_CANTINIT, FAIL, "unable to initialize file storage")
} /* end if */
}

done:
if (ret_value < 0) {
Expand Down
295 changes: 295 additions & 0 deletions testpar/t_chunk_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,298 @@ test_chunk_alloc(void)
/* reopen dataset in parallel, read and verify the data */
verify_data(filename, CHUNK_FACTOR, all, CLOSE, &file_id, &dataset);
}

/*
* A test to verify the following:
*
* - That the library forces allocation of all space in the file
* for a chunked dataset opened with parallel file access when
* that dataset:
*
* - was created with serial file access
* - was created with the default incremental file space
* allocation time
* - has no filters applied to it
*
* In this case, the library has to ensure that all the file
* space for the dataset is allocated so that the MPI processes
* can write to chunks independently of each other and still have
* a consistent view of the file.
*
* - That the library DOES NOT force allocation of all space in
* the file for a chunked dataset opened with parallel file access
* when that dataset:
*
* - was created with serial file access
* - was created with the default incremental file space
* allocation time
* - has filters applied to it
*
* In this case, writes to the dataset are required to be collective,
* so file space can be allocated incrementally in a coordinated
* fashion.
*/
void
test_chunk_alloc_incr_ser_to_par(void)
{
H5D_space_status_t space_status;
const char *filename;
hsize_t dset_dims[1];
hsize_t mem_dims[1];
hsize_t start[1];
hsize_t stride[1];
hsize_t count[1];
hsize_t block[1];
hsize_t alloc_size;
size_t nchunks;
herr_t ret;
hid_t fid = H5I_INVALID_HID;
hid_t fapl_id = H5I_INVALID_HID;
hid_t dset_id = H5I_INVALID_HID;
hid_t fspace_id = H5I_INVALID_HID;
hid_t mspace_id = H5I_INVALID_HID;
hid_t dxpl_id = H5I_INVALID_HID;
int *data = NULL;
int *correct_data = NULL;
int *read_data = NULL;

MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

filename = (const char *)GetTestParameters();
if (MAINPROCESS && VERBOSE_MED)
printf("Chunked dataset incremental file space allocation serial to parallel test on file %s\n",
filename);

nchunks = (size_t)(CHUNK_FACTOR * mpi_size);
dset_dims[0] = (hsize_t)(nchunks * CHUNK_SIZE);

if (mpi_rank == 0) {
hsize_t chunk_dims[1] = {CHUNK_SIZE};
hid_t space_id = H5I_INVALID_HID;
hid_t dcpl_id = H5I_INVALID_HID;

fid = H5Fcreate(filename, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
VRFY((fid >= 0), "H5Fcreate");

dcpl_id = H5Pcreate(H5P_DATASET_CREATE);
VRFY((dcpl_id >= 0), "H5Pcreate");

ret = H5Pset_chunk(dcpl_id, 1, chunk_dims);
VRFY((ret == SUCCEED), "H5Pset_chunk");

ret = H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_INCR);
VRFY((ret == SUCCEED), "H5Pset_alloc_time");

space_id = H5Screate_simple(1, dset_dims, NULL);
VRFY((space_id >= 0), "H5Screate_simple");

/* Create a chunked dataset without a filter applied to it */
dset_id =
H5Dcreate2(fid, "dset_no_filter", H5T_NATIVE_INT, space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dcreate2");

ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

/* Create a chunked dataset with a filter applied to it */
ret = H5Pset_shuffle(dcpl_id);
VRFY((ret == SUCCEED), "H5Pset_shuffle");

dset_id = H5Dcreate2(fid, "dset_filter", H5T_NATIVE_INT, space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dcreate2");

ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");
ret = H5Pclose(dcpl_id);
VRFY((ret == SUCCEED), "H5Pclose");
ret = H5Sclose(space_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Fclose(fid);
VRFY((ret == SUCCEED), "H5Fclose");
}

MPI_Barrier(MPI_COMM_WORLD);

fapl_id = H5Pcreate(H5P_FILE_ACCESS);
VRFY((fapl_id >= 0), "H5Pcreate");

ret = H5Pset_fapl_mpio(fapl_id, MPI_COMM_WORLD, MPI_INFO_NULL);
VRFY((ret == SUCCEED), "H5Pset_fapl_mpio");

fid = H5Fopen(filename, H5F_ACC_RDWR, fapl_id);
VRFY((fid >= 0), "H5Fopen");

data = malloc((dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));
VRFY(data, "malloc");
read_data = malloc(dset_dims[0] * sizeof(int));
VRFY(read_data, "malloc");
correct_data = malloc(dset_dims[0] * sizeof(int));
VRFY(correct_data, "malloc");

/*
* Check the file space allocation status/size and dataset
* data before and after writing to the dataset without a
* filter
*/
dset_id = H5Dopen2(fid, "dset_no_filter", H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dopen2");

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 255, dset_dims[0] * sizeof(int));
memset(correct_data, 0, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

fspace_id = H5Dget_space(dset_id);
VRFY((ret == SUCCEED), "H5Dget_space");

start[0] = ((hsize_t)mpi_rank * (dset_dims[0] / (hsize_t)mpi_size));
stride[0] = 1;
count[0] = (dset_dims[0] / (hsize_t)mpi_size);
block[0] = 1;

ret = H5Sselect_hyperslab(fspace_id, H5S_SELECT_SET, start, stride, count, block);
VRFY((ret == SUCCEED), "H5Sselect_hyperslab");

mem_dims[0] = count[0] * block[0];

mspace_id = H5Screate_simple(1, mem_dims, NULL);
VRFY((mspace_id >= 0), "H5Screate_simple");

memset(data, 255, (dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));

ret = H5Dwrite(dset_id, H5T_NATIVE_INT, mspace_id, fspace_id, H5P_DEFAULT, data);
VRFY((ret == SUCCEED), "H5Dwrite");

ret = H5Sclose(mspace_id);
VRFY((ret == SUCCEED), "H5Sclose");

MPI_Barrier(MPI_COMM_WORLD);

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 0, dset_dims[0] * sizeof(int));
memset(correct_data, 255, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

ret = H5Sclose(fspace_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

/*
* Check the file space allocation status/size and dataset
* data before and after writing to the dataset with a
* filter
*/
dset_id = H5Dopen2(fid, "dset_filter", H5P_DEFAULT);
VRFY((dset_id >= 0), "H5Dopen2");

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_NOT_ALLOCATED),
"file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY((0 == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 255, dset_dims[0] * sizeof(int));
memset(correct_data, 0, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

fspace_id = H5Dget_space(dset_id);
VRFY((ret == SUCCEED), "H5Dget_space");

start[0] = ((hsize_t)mpi_rank * (dset_dims[0] / (hsize_t)mpi_size));
stride[0] = 1;
count[0] = (dset_dims[0] / (hsize_t)mpi_size);
block[0] = 1;

ret = H5Sselect_hyperslab(fspace_id, H5S_SELECT_SET, start, stride, count, block);
VRFY((ret == SUCCEED), "H5Sselect_hyperslab");

mem_dims[0] = count[0] * block[0];

mspace_id = H5Screate_simple(1, mem_dims, NULL);
VRFY((mspace_id >= 0), "H5Screate_simple");

memset(data, 255, (dset_dims[0] / (hsize_t)mpi_size) * sizeof(int));

dxpl_id = H5Pcreate(H5P_DATASET_XFER);
VRFY((dxpl_id >= 0), "H5Pcreate");

ret = H5Pset_dxpl_mpio(dxpl_id, H5FD_MPIO_COLLECTIVE);
VRFY((ret == SUCCEED), "H5Pset_dxpl_mpio");

ret = H5Dwrite(dset_id, H5T_NATIVE_INT, mspace_id, fspace_id, dxpl_id, data);
VRFY((ret == SUCCEED), "H5Dwrite");

ret = H5Sclose(mspace_id);
VRFY((ret == SUCCEED), "H5Sclose");

MPI_Barrier(MPI_COMM_WORLD);

ret = H5Dget_space_status(dset_id, &space_status);
VRFY((ret == SUCCEED), "H5Dread");

VRFY((space_status == H5D_SPACE_STATUS_ALLOCATED), "file space allocation status verification succeeded");

alloc_size = H5Dget_storage_size(dset_id);
VRFY(((dset_dims[0] * sizeof(int)) == alloc_size), "file space allocation size verification succeeded");

memset(read_data, 0, dset_dims[0] * sizeof(int));
memset(correct_data, 255, dset_dims[0] * sizeof(int));

ret = H5Dread(dset_id, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, read_data);
VRFY((ret == SUCCEED), "H5Dread");

MPI_Barrier(MPI_COMM_WORLD);

VRFY((0 == memcmp(read_data, correct_data, dset_dims[0] * sizeof(int))), "data verification succeeded");

ret = H5Pclose(dxpl_id);
VRFY((ret == SUCCEED), "H5Pclose");
ret = H5Sclose(fspace_id);
VRFY((ret == SUCCEED), "H5Sclose");
ret = H5Dclose(dset_id);
VRFY((ret == SUCCEED), "H5Dclose");

free(correct_data);
free(read_data);
free(data);

H5Pclose(fapl_id);
H5Fclose(fid);
}
2 changes: 2 additions & 0 deletions testpar/testphdf5.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ main(int argc, char **argv)
AddTest("selnone", none_selection_chunk, NULL, "chunked dataset with none-selection", PARATESTFILE);
AddTest("calloc", test_chunk_alloc, NULL, "parallel extend Chunked allocation on serial file",
PARATESTFILE);
AddTest("chkallocser2par", test_chunk_alloc_incr_ser_to_par, NULL,
"chunk allocation from serial to parallel file access", PARATESTFILE);
AddTest("fltread", test_filter_read, NULL, "parallel read of dataset written serially with filters",
PARATESTFILE);

Expand Down
1 change: 1 addition & 0 deletions testpar/testphdf5.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void none_selection_chunk(void);
void actual_io_mode_tests(void);
void no_collective_cause_tests(void);
void test_chunk_alloc(void);
void test_chunk_alloc_incr_ser_to_par(void);
void test_filter_read(void);
void compact_dataset(void);
void null_dataset(void);
Expand Down