Skip to content

Commit

Permalink
Merge pull request #1 from HDFGroup/feature/vfd_swmr
Browse files Browse the repository at this point in the history
Feature/vfd swmr
  • Loading branch information
vchoi-hdfgroup authored Mar 18, 2021
2 parents 84b31b5 + e74ffa9 commit 2a4ec12
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 47 deletions.
5 changes: 2 additions & 3 deletions test/testvfdswmr.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -616,19 +616,18 @@ fi
if [ ${do_groups:-no} = yes ]; then
echo launch vfd_swmr_group_writer
catch_out_err_and_rc vfd_swmr_group_writer \
../vfd_swmr_group_writer -q -u 10 -n 10000 &
../vfd_swmr_group_writer -q -c 1000 -n 10000 &
pid_writer=$!

catch_out_err_and_rc vfd_swmr_group_reader \
../vfd_swmr_group_reader -q -u 10 -n 10000 -W &
../vfd_swmr_group_reader -q -c 1000 -n 10000 &
pid_reader=$!

# Wait for the reader to finish before signalling the
# writer to quit: the writer holds the file open so that the
# reader will find the shadow file when it opens
# the .h5 file.
wait $pid_reader
kill -USR1 $(cat vfd_swmr_group_writer.pid)
wait $pid_writer

# Collect exit code of the reader
Expand Down
164 changes: 120 additions & 44 deletions test/vfd_swmr_group_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "hdf5.h"

#include "H5Fpkg.h"
// #include "H5Iprivate.h"
#include "H5HGprivate.h"
#include "H5VLprivate.h"

Expand All @@ -29,8 +28,8 @@ typedef struct {
hid_t file, filetype, one_by_one_sid;
char filename[PATH_MAX];
char progname[PATH_MAX];
struct timespec update_interval;
unsigned int asteps;
unsigned int csteps;
unsigned int nsteps;
bool wait_for_signal;
bool use_vfd_swmr;
Expand All @@ -42,12 +41,10 @@ typedef struct {
, .filename = "" \
, .filetype = H5T_NATIVE_UINT32 \
, .asteps = 10 \
, .csteps = 10 \
, .nsteps = 100 \
, .wait_for_signal = true \
, .use_vfd_swmr = true \
, .update_interval = (struct timespec){ \
.tv_sec = 0 \
, .tv_nsec = 1000000000UL / 30 /* 1/30 second */}}
, .use_vfd_swmr = true}

static void state_init(state_t *, int, char **);

Expand All @@ -56,19 +53,18 @@ static const hid_t badhid = H5I_INVALID_HID;
static void
usage(const char *progname)
{
fprintf(stderr, "usage: %s [-S] [-W] [-a steps] [-b]\n"
" [-n iterations] [-u milliseconds]\n"
fprintf(stderr, "usage: %s [-S] [-W] [-a steps] [-b] [-c]\n"
" [-n iterations]\n"
"\n"
"-S: do not use VFD SWMR\n"
"-W: do not wait for a signal before\n"
" exiting\n"
"-a steps: `steps` between adding attributes\n"
"-b: write data in big-endian byte order\n"
"-n iterations: how many times to expand each dataset\n"
"-u ms: milliseconds interval between updates\n"
" to %s.h5\n"
"-c steps: `steps` between communication between the writer and reader\n"
"-n ngroups: the number of groups\n"
"\n",
progname, progname);
progname);
exit(EXIT_FAILURE);
}

Expand All @@ -80,13 +76,12 @@ state_init(state_t *s, int argc, char **argv)
const hsize_t dims = 1;
char tfile[PATH_MAX];
char *end;
unsigned long millis;

*s = ALL_HID_INITIALIZER;
esnprintf(tfile, sizeof(tfile), "%s", argv[0]);
esnprintf(s->progname, sizeof(s->progname), "%s", HDbasename(tfile));

while ((ch = getopt(argc, argv, "SWa:bn:qu:")) != -1) {
while ((ch = getopt(argc, argv, "SWa:bc:n:q")) != -1) {
switch (ch) {
case 'S':
s->use_vfd_swmr = false;
Expand All @@ -95,6 +90,7 @@ state_init(state_t *s, int argc, char **argv)
s->wait_for_signal = false;
break;
case 'a':
case 'c':
case 'n':
errno = 0;
tmp = strtoul(optarg, &end, 0);
Expand All @@ -109,7 +105,9 @@ state_init(state_t *s, int argc, char **argv)

if (ch == 'a')
s->asteps = (unsigned)tmp;
else
else if (ch == 'c')
s->csteps = (unsigned)tmp;
else if (ch == 'n')
s->nsteps = (unsigned)tmp;
break;
case 'b':
Expand All @@ -118,21 +116,6 @@ state_init(state_t *s, int argc, char **argv)
case 'q':
verbosity = 0;
break;
case 'u':
errno = 0;
millis = strtoul(optarg, &end, 0);
if (millis == ULONG_MAX && errno == ERANGE) {
err(EXIT_FAILURE,
"option -p argument \"%s\"", optarg);
} else if (*end != '\0') {
errx(EXIT_FAILURE,
"garbage after -p argument \"%s\"", optarg);
}
s->update_interval.tv_sec = (time_t)(millis / 1000UL);
s->update_interval.tv_nsec =
(long)((millis * 1000000UL) % 1000000000UL);
dbgf(1, "%lu milliseconds between updates\n", millis);
break;
case '?':
default:
usage(s->progname);
Expand All @@ -146,6 +129,12 @@ state_init(state_t *s, int argc, char **argv)
if ((s->one_by_one_sid = H5Screate_simple(1, &dims, &dims)) < 0)
errx(EXIT_FAILURE, "H5Screate_simple failed");

if( s->csteps < 1 || s->csteps > s->nsteps)
errx(EXIT_FAILURE, "communication interval is out of bounds");

if( s->asteps < 1 || s->asteps > s->nsteps)
errx(EXIT_FAILURE, "attribute interval is out of bounds");

if (argc > 0)
errx(EXIT_FAILURE, "unexpected command-line arguments");

Expand Down Expand Up @@ -259,17 +248,30 @@ verify_group(state_t *s, unsigned int which)
return result;
}

/* Sleep for `tenths` tenths of a second */
static void
decisleep(uint32_t tenths)
{
uint64_t nsec = tenths * 100 * 1000 * 1000;

H5_nanosleep(nsec);
}

int
main(int argc, char **argv)
{
hid_t fapl, fcpl;
sigset_t oldsigs;
herr_t ret;
unsigned step;
bool writer;
state_t s;
const char *personality;
H5F_vfd_swmr_config_t config;
const char *fifo_writer_to_reader = "./fifo_group_writer_to_reader";
const char *fifo_reader_to_writer = "./fifo_group_reader_to_writer";
int fd_writer_to_reader, fd_reader_to_writer;
int notify = 0, verify = 0;
unsigned int i;

state_init(&s, argc, argv);

Expand Down Expand Up @@ -310,27 +312,85 @@ main(int argc, char **argv)
if (s.file == badhid)
errx(EXIT_FAILURE, writer ? "H5Fcreate" : "H5Fopen");

block_signals(&oldsigs);
/* Use two named pipes(FIFO) to coordinate the writer and reader for
* two-way communication so that the two sides can move forward together.
* One is for the writer to write to the reader.
* The other one is for the reader to signal the writer. */
if (writer) {
/* Writer creates two named pipes(FIFO) */
if (HDmkfifo(fifo_writer_to_reader, 0600) < 0)
errx(EXIT_FAILURE, "HDmkfifo");

if (HDmkfifo(fifo_reader_to_writer, 0600) < 0)
errx(EXIT_FAILURE, "HDmkfifo");
}

/* Both the writer and reader open the pipes */
if ((fd_writer_to_reader = HDopen(fifo_writer_to_reader, O_RDWR)) < 0)
errx(EXIT_FAILURE, "fifo_writer_to_reader open failed");

if ((fd_reader_to_writer = HDopen(fifo_reader_to_writer, O_RDWR)) < 0)
errx(EXIT_FAILURE, "fifo_reader_to_writer open failed");

if (writer) {
for (step = 0; step < s.nsteps; step++) {
dbgf(2, "step %d\n", step);
dbgf(2, "writer: step %d\n", step);

write_group(&s, step);
nanosleep(&s.update_interval, NULL);

/* At communication interval, notifies the reader and waits for its response */
if (step % s.csteps == 0) {
/* Bump up the value of notify to notice the reader to start to read */
notify++;
if (HDwrite(fd_writer_to_reader, &notify, sizeof(int)) < 0)
err(EXIT_FAILURE, "write failed");

/* During the wait, writer makes repeated HDF5 API calls
* to trigger EOT at approximately the correct time */
for(i = 0; i < config.max_lag + 1; i++) {
decisleep(config.tick_len);
H5Aexists(s.file, "nonexistent");
}

/* Receive the same value from the reader and verify it before
* going to the next step */
verify++;
if (HDread(fd_reader_to_writer, &notify, sizeof(int)) < 0)
err(EXIT_FAILURE, "read failed");

if (notify != verify)
errx(EXIT_FAILURE, "received message %d, expecting %d", notify, verify);
}
}
} else {
for (step = 0; step < s.nsteps;) {
dbgf(2, "step %d\n", step);
if (verify_group(&s, step))
step++;
nanosleep(&s.update_interval, NULL);
}
}
for (step = 0; step < s.nsteps; step++) {
dbgf(2, "reader: step %d\n", step);

/* At communication interval, waits for the writer to finish creation before starting verification */
if (step % s.csteps == 0) {
/* The writer should have bumped up the value of notify.
* Do the same with verify and confirm it */
verify++;

/* Receive the notify that the writer bumped up the value */
if (HDread(fd_writer_to_reader, &notify, sizeof(int)) < 0)
err(EXIT_FAILURE, "read failed");

if (notify != verify)
errx(EXIT_FAILURE, "received message %d, expecting %d", notify, verify);
}

if (s.use_vfd_swmr && s.wait_for_signal)
await_signal(s.file);
while (!verify_group(&s, step))
;

restore_signals(&oldsigs);
if (step % s.csteps == 0) {
/* Send back the same nofity value for acknowledgement to tell the writer
* move to the next step. */
if (HDwrite(fd_reader_to_writer, &notify, sizeof(int)) < 0)
err(EXIT_FAILURE, "write failed");
}
}
}

if (H5Pclose(fapl) < 0)
errx(EXIT_FAILURE, "H5Pclose(fapl)");
Expand All @@ -341,5 +401,21 @@ main(int argc, char **argv)
if (H5Fclose(s.file) < 0)
errx(EXIT_FAILURE, "H5Fclose");

/* Both the writer and reader close the named pipes */
if (HDclose(fd_writer_to_reader) < 0)
errx(EXIT_FAILURE, "HDclose");

if (HDclose(fd_reader_to_writer) < 0)
errx(EXIT_FAILURE, "HDclose");

/* Reader finishes last and deletes the named pipes */
if(!writer) {
if(HDremove(fifo_writer_to_reader) != 0)
errx(EXIT_FAILURE, "fifo_writer_to_reader deletion failed");

if(HDremove(fifo_reader_to_writer) != 0)
errx(EXIT_FAILURE, "fifo_reader_to_writer deletion failed");
}

return EXIT_SUCCESS;
}

0 comments on commit 2a4ec12

Please sign in to comment.