
Commit

Add support for emulating C11 atomics, if the compiler doesn't provide them.

Signed-off-by: Quincey Koziol <[email protected]>
qkoziol committed Mar 27, 2024
1 parent 2a0aa45 commit f68db6d
Showing 19 changed files with 490 additions and 72 deletions.
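
The emulation itself lives in the new src/H5TSatomic.c (added to the build in the src/CMakeLists.txt diff below) behind the H5TS_atomic_* wrappers used throughout the IOC code; that file's contents are not shown on this page. As a rough, illustrative sketch only — not the committed implementation — an atomic integer can be emulated with a mutex-protected value when <stdatomic.h> is unavailable:

/*
 * Hypothetical sketch of a mutex-based atomic-int emulation; the real
 * H5TSatomic.c is not visible in this excerpt.
 */
#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;  /* serializes every access to 'value' */
    int             value;
} emulated_atomic_int_t;

static void
emulated_atomic_init_int(emulated_atomic_int_t *atm, int value)
{
    pthread_mutex_init(&atm->mutex, NULL);
    atm->value = value;
}

static int
emulated_atomic_load_int(emulated_atomic_int_t *atm)
{
    int value;

    pthread_mutex_lock(&atm->mutex);
    value = atm->value;
    pthread_mutex_unlock(&atm->mutex);

    return value;
}

static void
emulated_atomic_store_int(emulated_atomic_int_t *atm, int value)
{
    pthread_mutex_lock(&atm->mutex);
    atm->value = value;
    pthread_mutex_unlock(&atm->mutex);
}

static int
emulated_atomic_fetch_add_int(emulated_atomic_int_t *atm, int arg)
{
    int prev;

    pthread_mutex_lock(&atm->mutex);
    prev = atm->value;
    atm->value += arg;
    pthread_mutex_unlock(&atm->mutex);

    return prev;
}

Every operation is serialized by the lock, which is slower than a native atomic but preserves the visibility and ordering the IOC code relies on; fetch_add returns the previous value, matching the C11 semantics the call sites expect (fetch_sub would mirror it).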
11 changes: 4 additions & 7 deletions CMakeLists.txt
@@ -772,13 +772,6 @@ if (HDF5_ENABLE_SUBFILING_VFD)
message (FATAL_ERROR "Subfiling requires thread operations support")
endif ()

CHECK_INCLUDE_FILE("stdatomic.h" HAVE_STDATOMIC_H)
if (NOT HAVE_STDATOMIC_H)
message (FATAL_ERROR "Subfiling VFD requires atomic operations support. C11 stdatomic.h NOT available.")
else()
set (H5_HAVE_STDATOMIC_H 1)
endif()

set (H5_HAVE_SUBFILING_VFD 1)
# IOC VFD is currently only built when subfiling is enabled
set (H5_HAVE_IOC_VFD 1)
@@ -915,6 +908,10 @@ if (HDF5_ENABLE_THREADSAFE)
if (Threads_FOUND)
set (H5_HAVE_THREADSAFE 1)
endif ()
CHECK_INCLUDE_FILE("stdatomic.h" HAVE_STDATOMIC_H)
if (HAVE_STDATOMIC_H)
set (H5_HAVE_STDATOMIC_H 1)
endif()
endif ()

#-----------------------------------------------------------------------------
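With the stdatomic.h probe demoted from a hard error (in the subfiling block) to a plain feature macro (in the thread-safety block), callers can pick native C11 atomics or the emulation at compile time. A plausible shape for that dispatch is sketched below; only H5_HAVE_STDATOMIC_H comes from this diff, while the type and wrapper definitions are assumptions inferred from the call sites in H5FDioc_threads.c, not the committed header:

/*
 * Illustrative compile-time dispatch -- not the committed header.  Only
 * H5_HAVE_STDATOMIC_H appears in this diff; the wrapper names and the
 * H5TS_mutex_t member are assumptions based on the call sites below.
 */
#ifdef H5_HAVE_STDATOMIC_H
#include <stdatomic.h>

/* Native C11 atomics: the wrapper type is simply atomic_int */
typedef atomic_int H5TS_atomic_int_t;
#else
/* Emulated atomics: a mutex-protected integer, as sketched earlier */
typedef struct {
    H5TS_mutex_t mutex;
    int          value;
} H5TS_atomic_int_t;
#endif

/* Signature inferred from the call sites in H5FDioc_threads.c */
int
H5TS_atomic_load_int(H5TS_atomic_int_t *obj)
{
#ifdef H5_HAVE_STDATOMIC_H
    return atomic_load(obj);
#else
    int value;

    H5TS_mutex_lock(&obj->mutex);
    value = obj->value;
    H5TS_mutex_unlock(&obj->mutex);

    return value;
#endif
}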
10 changes: 4 additions & 6 deletions configure.ac
@@ -2186,6 +2186,10 @@ if test "X$THREADSAFE" = "Xyes"; then
fi

fi # end of Pthreads checks

## Check for C11 atomics
AC_CHECK_HEADERS([stdatomic.h],[HAVE_STDATOMIC_H="yes"],[HAVE_STDATOMIC_H="no"])

fi # end of threadsafe processing

## ----------------------------------------------------------------------
@@ -3385,12 +3389,6 @@ if test "X$SUBFILING_VFD" = "Xyes"; then
]
)

HAVE_STDATOMIC_H="yes"
AC_CHECK_HEADERS([stdatomic.h],,[HAVE_STDATOMIC_H="no"])
if test "x$HAVE_STDATOMIC_H" = "xno"; then
AC_MSG_ERROR([Subfiling VFD requires atomic operations support. C11 stdatomic.h NOT available.])
fi

# Checks for threadsafe operation
if test "x$HAVE_THREADSAFE" = "xno"; then
AC_MSG_ERROR([Subfiling VFD requires thread operations support.])
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -639,6 +639,7 @@ IDE_GENERATED_PROPERTIES ("H5T" "${H5T_HDRS}" "${H5T_SOURCES}" )

set (H5TS_SOURCES
${HDF5_SRC_DIR}/H5TS.c
${HDF5_SRC_DIR}/H5TSatomic.c
${HDF5_SRC_DIR}/H5TSbarrier.c
${HDF5_SRC_DIR}/H5TSc11.c
${HDF5_SRC_DIR}/H5TScond.c
6 changes: 0 additions & 6 deletions src/H5FDsubfiling/H5FDioc_priv.h
@@ -17,12 +17,6 @@
#ifndef H5FDioc_priv_H
#define H5FDioc_priv_H

/********************/
/* Standard Headers */
/********************/

#include <stdatomic.h>

/**************/
/* H5 Headers */
/**************/
70 changes: 35 additions & 35 deletions src/H5FDsubfiling/H5FDioc_threads.c
@@ -33,17 +33,17 @@ typedef struct ioc_data_t {
H5TS_pool_t *io_thread_pool;
int64_t sf_context_id;

atomic_int sf_ioc_ready;
atomic_int sf_shutdown_flag;
H5TS_atomic_int_t sf_ioc_ready;
H5TS_atomic_int_t sf_shutdown_flag;
/* sf_io_ops_pending tracks the number of I/O operations pending so that we can wait
* until all I/O operations have been serviced before shutting down the worker thread pool.
* The value of this variable must always be non-negative.
*
* Note that this is a convenience variable -- we could use io_queue.q_len instead.
* However, accessing this field requires locking io_queue.q_mutex.
*/
atomic_int sf_io_ops_pending;
atomic_int sf_work_pending;
H5TS_atomic_int_t sf_io_ops_pending;
H5TS_atomic_int_t sf_work_pending;
} ioc_data_t;

#ifdef H5FD_IOC_COLLECT_STATS
@@ -142,10 +142,10 @@ initialize_ioc_threads(void *_sf_context)
sf_context->ioc_data = ioc_data;

/* Initialize atomic vars */
atomic_init(&ioc_data->sf_ioc_ready, 0);
atomic_init(&ioc_data->sf_shutdown_flag, 0);
atomic_init(&ioc_data->sf_io_ops_pending, 0);
atomic_init(&ioc_data->sf_work_pending, 0);
H5TS_atomic_init_int(&ioc_data->sf_ioc_ready, 0);
H5TS_atomic_init_int(&ioc_data->sf_shutdown_flag, 0);
H5TS_atomic_init_int(&ioc_data->sf_io_ops_pending, 0);
H5TS_atomic_init_int(&ioc_data->sf_work_pending, 0);

#ifdef H5FD_IOC_COLLECT_STATS
t_start = MPI_Wtime();
@@ -171,7 +171,7 @@ initialize_ioc_threads(void *_sf_context)
H5_SUBFILING_GOTO_ERROR(H5E_RESOURCE, H5E_CANTINIT, (-1), "can't create IOC main thread");

/* Wait until ioc_main() reports that it is ready */
while (atomic_load(&ioc_data->sf_ioc_ready) != 1) {
while (H5TS_atomic_load_int(&ioc_data->sf_ioc_ready) != 1) {
usleep(20);
}

@@ -203,18 +203,18 @@ finalize_ioc_threads(void *_sf_context)

ioc_data = sf_context->ioc_data;
if (ioc_data) {
assert(0 == atomic_load(&ioc_data->sf_shutdown_flag));
assert(0 == H5TS_atomic_load_int(&ioc_data->sf_shutdown_flag));

/* Shutdown the main IOC thread */
atomic_store(&ioc_data->sf_shutdown_flag, 1);
H5TS_atomic_store_int(&ioc_data->sf_shutdown_flag, 1);

/* Allow ioc_main to exit.*/
do {
usleep(20);
} while (0 != atomic_load(&ioc_data->sf_shutdown_flag));
} while (0 != H5TS_atomic_load_int(&ioc_data->sf_shutdown_flag));

/* Tear down IOC worker thread pool */
assert(0 == atomic_load(&ioc_data->sf_io_ops_pending));
assert(0 == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));
H5TS_pool_destroy(ioc_data->io_thread_pool);

H5TS_mutex_destroy(&ioc_data->io_queue.q_mutex);
@@ -337,12 +337,12 @@ ioc_main(ioc_data_t *ioc_data)
*/

/* tell initialize_ioc_threads() that ioc_main() is ready to enter its main loop */
atomic_store(&ioc_data->sf_ioc_ready, 1);
H5TS_atomic_store_int(&ioc_data->sf_ioc_ready, 1);

shutdown_requested = 0;

while ((!shutdown_requested) || (0 < atomic_load(&ioc_data->sf_io_ops_pending)) ||
(0 < atomic_load(&ioc_data->sf_work_pending))) {
while ((!shutdown_requested) || (0 < H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending)) ||
(0 < H5TS_atomic_load_int(&ioc_data->sf_work_pending))) {
MPI_Status status;
int flag = 0;
int mpi_code;
@@ -395,7 +395,7 @@

ioc_io_queue_add_entry(ioc_data, &wk_req);

assert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0);
assert(H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending) >= 0);
}
else {
struct timespec sleep_spec = {0, IOC_MAIN_SLEEP_DELAY};
@@ -405,11 +405,11 @@

ioc_io_queue_dispatch_eligible_entries(ioc_data, flag ? 0 : 1);

shutdown_requested = atomic_load(&ioc_data->sf_shutdown_flag);
shutdown_requested = H5TS_atomic_load_int(&ioc_data->sf_shutdown_flag);
}

/* Reset the shutdown flag */
atomic_store(&ioc_data->sf_shutdown_flag, 0);
H5TS_atomic_store_int(&ioc_data->sf_shutdown_flag, 0);

done:
H5_SUBFILING_FUNC_LEAVE;
@@ -488,7 +488,7 @@ handle_work_request(void *arg)
ioc_data = sf_context->ioc_data;
assert(ioc_data);

atomic_fetch_add(&ioc_data->sf_work_pending, 1);
H5TS_atomic_fetch_add_int(&ioc_data->sf_work_pending, 1);

switch (msg->tag) {
case WRITE_INDEP:
@@ -519,7 +519,7 @@
break;
}

atomic_fetch_sub(&ioc_data->sf_work_pending, 1);
H5TS_atomic_fetch_sub_int(&ioc_data->sf_work_pending, 1);

if (op_ret < 0) {
#ifdef H5_SUBFILING_DEBUG
@@ -535,15 +535,15 @@

#ifdef H5FD_IOC_DEBUG
{
int curr_io_ops_pending = atomic_load(&ioc_data->sf_io_ops_pending);
int curr_io_ops_pending = H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending);
assert(curr_io_ops_pending > 0);
}
#endif

/* complete the I/O request */
ioc_io_queue_complete_entry(ioc_data, q_entry_ptr);

assert(atomic_load(&ioc_data->sf_io_ops_pending) >= 0);
assert(H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending) >= 0);

/* Check the I/O Queue to see if there are any dispatchable entries */
ioc_io_queue_dispatch_eligible_entries(ioc_data, 1);
@@ -1272,15 +1272,15 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
/* must obtain io_queue mutex before appending */
H5TS_mutex_lock(&ioc_data->io_queue.q_mutex);

assert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));
assert(ioc_data->io_queue.q_len == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));

entry_ptr->counter = ioc_data->io_queue.req_counter++;

ioc_data->io_queue.num_pending++;

H5FD_IOC__Q_APPEND(&ioc_data->io_queue, entry_ptr);

atomic_fetch_add(&ioc_data->sf_io_ops_pending, 1);
H5TS_atomic_fetch_add_int(&ioc_data->sf_io_ops_pending, 1);

#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(
@@ -1289,7 +1289,7 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
entry_ptr->counter, (entry_ptr->wk_req.tag), (long long)(entry_ptr->wk_req.header[0]),
(long long)(entry_ptr->wk_req.header[1]), (long long)(entry_ptr->wk_req.header[2]),
ioc_data->io_queue.num_pending, ioc_data->io_queue.num_in_progress,
atomic_load(&ioc_data->sf_io_ops_pending));
H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));
#endif

assert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);
@@ -1322,15 +1322,15 @@ ioc_io_queue_add_entry(ioc_data_t *ioc_data, sf_work_request_t *wk_req_ptr)
#endif

#ifdef H5_SUBFILING_DEBUG
if (ioc_data->io_queue.q_len != atomic_load(&ioc_data->sf_io_ops_pending)) {
if (ioc_data->io_queue.q_len != H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending)) {
H5_subfiling_log(
wk_req_ptr->context_id,
"%s: ioc_data->io_queue->q_len = %d != %d = atomic_load(&ioc_data->sf_io_ops_pending).", __func__,
ioc_data->io_queue.q_len, atomic_load(&ioc_data->sf_io_ops_pending));
"%s: ioc_data->io_queue->q_len = %d != %d = H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending).", __func__,
ioc_data->io_queue.q_len, H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));
}
#endif

assert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));
assert(ioc_data->io_queue.q_len == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));

H5TS_mutex_unlock(&ioc_data->io_queue.q_mutex);

@@ -1495,7 +1495,7 @@ ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, bool try_lock)
__func__, entry_ptr->counter, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
(long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
ioc_data->io_queue.num_in_progress, H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));
#endif

#ifdef H5FD_IOC_COLLECT_STATS
@@ -1515,7 +1515,7 @@
entry_ptr = entry_ptr->next;
}

assert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));
assert(ioc_data->io_queue.q_len == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));

H5TS_mutex_unlock(&ioc_data->io_queue.q_mutex);
} /* ioc_io_queue_dispatch_eligible_entries() */
@@ -1566,7 +1566,7 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt

assert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress == ioc_data->io_queue.q_len);

atomic_fetch_sub(&ioc_data->sf_io_ops_pending, 1);
H5TS_atomic_fetch_sub_int(&ioc_data->sf_io_ops_pending, 1);

#ifdef H5_SUBFILING_DEBUG
H5_subfiling_log(entry_ptr->wk_req.context_id,
Expand All @@ -1575,7 +1575,7 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt
__func__, entry_ptr->counter, entry_ptr->wk_ret, (entry_ptr->wk_req.tag),
(long long)(entry_ptr->wk_req.header[0]), (long long)(entry_ptr->wk_req.header[1]),
(long long)(entry_ptr->wk_req.header[2]), ioc_data->io_queue.num_pending,
ioc_data->io_queue.num_in_progress, atomic_load(&ioc_data->sf_io_ops_pending));
ioc_data->io_queue.num_in_progress, H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));

/*
* If this I/O request is a truncate or "get eof" op, make sure
@@ -1585,7 +1585,7 @@ ioc_io_queue_complete_entry(ioc_data_t *ioc_data, ioc_io_queue_entry_t *entry_pt
assert(ioc_data->io_queue.num_in_progress == 0);
#endif

assert(ioc_data->io_queue.q_len == atomic_load(&ioc_data->sf_io_ops_pending));
assert(ioc_data->io_queue.q_len == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));

#ifdef H5FD_IOC_COLLECT_STATS
/* Compute the queued and execution time */
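The call-site changes above are mechanical: each C11 atomic call becomes the corresponding H5TS wrapper on an H5TS_atomic_int_t field. A compact, simplified sketch of the ready-flag handshake between initialize_ioc_threads() and ioc_main() — illustrative only, with error handling and the real thread-pool startup omitted, and H5TSprivate.h assumed to be the header that declares the wrappers — looks like:

/*
 * Simplified sketch of the ready-flag handshake; error checking and the
 * real thread-pool startup are omitted, and H5TSprivate.h is assumed to
 * declare the H5TS_atomic_* wrappers used here.
 */
#include <pthread.h>
#include <unistd.h>

#include "H5TSprivate.h"

static H5TS_atomic_int_t ready_flag;

static void *
worker(void *arg)
{
    (void)arg;

    /* ... worker setup would happen here ... */

    /* Signal the creating thread that this worker is ready */
    H5TS_atomic_store_int(&ready_flag, 1);

    return NULL;
}

static void
start_worker(void)
{
    pthread_t tid;

    H5TS_atomic_init_int(&ready_flag, 0);
    pthread_create(&tid, NULL, worker, NULL);

    /* Wait until the worker reports that it is ready, as initialize_ioc_threads() does */
    while (H5TS_atomic_load_int(&ready_flag) != 1)
        usleep(20);

    pthread_join(tid, NULL);
}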
6 changes: 0 additions & 6 deletions src/H5FDsubfiling/H5FDsubfiling_priv.h
@@ -17,12 +17,6 @@
#ifndef H5FDsubfiling_priv_H
#define H5FDsubfiling_priv_H

/********************/
/* Standard Headers */
/********************/

#include <stdatomic.h>

/**************/
/* H5 Headers */
/**************/
2 changes: 0 additions & 2 deletions src/H5FDsubfiling/H5subfiling_common.h
@@ -17,8 +17,6 @@
#ifndef H5_SUBFILING_COMMON_H
#define H5_SUBFILING_COMMON_H

#include <stdatomic.h>

#include "H5private.h"
#include "H5FDprivate.h"
#include "H5Iprivate.h"
