diff --git a/src/broker/module.c b/src/broker/module.c index 9cdfd059129b..e2e54887c6f8 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -160,7 +160,7 @@ static void *module_thread (void *arg) ac = argz_count (p->argz, p->argz_len); av = xzmalloc (sizeof (av[0]) * (ac + 1)); argz_extract (p->argz, p->argz_len, av); - if (p->main(p->h, ac, av) < 0) { + if (p->main (p->h, ac, av) < 0) { mod_main_errno = errno; if (mod_main_errno == 0) mod_main_errno = ECONNRESET; diff --git a/t/Makefile.am b/t/Makefile.am index 812d96b2145c..5d60c8b6294c 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -164,6 +164,7 @@ check_PROGRAMS = \ kvs/watch \ kvs/watch_disconnect \ kvs/commit \ + kvs/commitmerge \ kvs/basic \ request/treq @@ -279,6 +280,12 @@ kvs_commit_LDADD = \ $(top_builddir)/src/modules/kvs/libflux-kvs.la \ $(test_ldadd) $(LIBDL) $(LIBUTIL) +kvs_commitmerge_SOURCES = kvs/commitmerge.c +kvs_commitmerge_CPPFLAGS = $(test_cppflags) +kvs_commitmerge_LDADD = \ + $(top_builddir)/src/modules/kvs/libflux-kvs.la \ + $(test_ldadd) $(LIBDL) $(LIBUTIL) + kvs_watch_disconnect_SOURCES = kvs/watch_disconnect.c kvs_watch_disconnect_CPPFLAGS = $(test_cppflags) kvs_watch_disconnect_LDADD = \ diff --git a/t/kvs/commit.c b/t/kvs/commit.c index 3eaf6eee2a29..957789005662 100644 --- a/t/kvs/commit.c +++ b/t/kvs/commit.c @@ -22,7 +22,7 @@ * See also: http://www.gnu.org/licenses/ \*****************************************************************************/ -/* tcommit - performance test for KVS commits */ +/* commit - performance test for KVS commits */ #if HAVE_CONFIG_H #include "config.h" @@ -116,7 +116,7 @@ void *thread (void *arg) done: if (t->h) flux_close (t->h); - return NULL; + return NULL; } int main (int argc, char *argv[]) @@ -134,6 +134,8 @@ int main (int argc, char *argv[]) case 'f': fopt = true; fence_nprocs = strtoul (optarg, NULL, 10); + if (!fence_nprocs) + log_msg_exit ("fence value must be > 0"); break; case 's': sopt = true; @@ -146,7 +148,11 @@ int main (int argc, char *argv[]) usage (); nthreads = strtoul (argv[optind++], NULL, 10); + if (!nthreads) + log_msg_exit ("thread count must be > 0"); count = strtoul (argv[optind++], NULL, 10); + if (!count) + log_msg_exit ("commit count must be > 0"); prefix = argv[optind++]; memset (&ts, 0, sizeof (ts)); diff --git a/t/kvs/commitmerge.c b/t/kvs/commitmerge.c new file mode 100644 index 000000000000..427c5c6cc208 --- /dev/null +++ b/t/kvs/commitmerge.c @@ -0,0 +1,266 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +/* commitmerge test + * + * Basic purpose of this test is to test if commit merging does not + * work properly when commit-merge is disabled in the kvs + * (commit-merge=0). + * + * A watch thread will watch a key. + * A number of commit threads will commit a value to the key. + * + * The watch should see every single change if commit merging is not enabled. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "src/common/libutil/log.h" +#include "src/common/libutil/xzmalloc.h" +#include "src/common/libutil/shortjson.h" + +typedef struct { + pthread_t t; + pthread_attr_t attr; + int n; + flux_t *h; +} thd_t; + +typedef struct { + thd_t *t; + int lastcount; +} watch_count_t; + +#define KEYSUFFIX "commitwatch-key" + +#define WATCH_TIMEOUT 5 + +static int threadcount = -1; +static int changecount = 0; +static char *prefix = NULL; +static char *key = NULL; + +static int watch_init = 0; +static pthread_cond_t watch_init_cond = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t watch_init_lock = PTHREAD_MUTEX_INITIALIZER; + +static void usage (void) +{ + fprintf (stderr, "Usage: commitmerge threadcount prefix\n"); + exit (1); +} + +void watch_prepare_cb (flux_reactor_t *r, flux_watcher_t *w, + int revents, void *arg) +{ + /* Tell main it can now launch commit threads */ + if (!watch_init) { + pthread_mutex_lock (&watch_init_lock); + watch_init++; + pthread_cond_signal (&watch_init_cond); + pthread_mutex_unlock (&watch_init_lock); + } + + if (changecount >= threadcount) + flux_reactor_stop (r); +} + +static int watch_count_cb (const char *key, int val, void *arg, int errnum) +{ + thd_t *t = arg; + + /* First value should be empty & get ENOENT */ + if (!errnum) { + //printf ("watch %s = %d\n", key, val); + changecount++; + } + + if (changecount == threadcount) + kvs_unwatch (t->h, key); + return 0; +} + +static void watch_timeout_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + watch_count_t *wc = arg; + + /* timeout */ + if (wc->lastcount == changecount) + flux_reactor_stop (r); + else + wc->lastcount = changecount; +} + +void *watchthread (void *arg) +{ + thd_t *t = arg; + watch_count_t wc; + flux_reactor_t *r; + flux_watcher_t *pw = NULL; + flux_watcher_t *tw = NULL; + char *json_str = NULL; + int rc; + + if (!(t->h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + + /* Make sure key doesn't already exist, initial value may affect + * test by chance (i.e. initial value = 0, commit 0 and thus no + * change) + */ + + rc = kvs_get (t->h, key, &json_str); + if (rc < 0 && errno != ENOENT) + log_err_exit ("kvs_get"); + + if (!rc) { + if (kvs_unlink (t->h, key) < 0) + log_err_exit ("kvs_unlink"); + if (kvs_commit (t->h) < 0) + log_err_exit ("kvs_commit"); + } + + r = flux_get_reactor (t->h); + + if (kvs_watch_int (t->h, key, watch_count_cb, t) < 0) + log_err_exit ("kvs_watch_int %s", key); + + pw = flux_prepare_watcher_create (r, watch_prepare_cb, NULL); + + wc.t = t; + wc.lastcount = -1; + + /* So test won't hang if there's a bug */ + tw = flux_timer_watcher_create (r, + WATCH_TIMEOUT, + WATCH_TIMEOUT, + watch_timeout_cb, + &wc); + + flux_watcher_start (pw); + flux_watcher_start (tw); + + if (flux_reactor_run (r, 0) < 0) + log_err_exit ("flux_reactor_run"); + + if (json_str) + free (json_str); + flux_watcher_destroy (pw); + flux_watcher_destroy (tw); + flux_close (t->h); + return NULL; +} + +void *committhread (void *arg) +{ + thd_t *t = arg; + + if (!(t->h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + + if (kvs_put_int (t->h, key, t->n) < 0) + log_err_exit ("%s", key); + + if (kvs_commit (t->h) < 0) + log_err_exit ("kvs_commit"); + + flux_close (t->h); + return NULL; +} + +int main (int argc, char *argv[]) +{ + thd_t *thd; + int i, rc; + + log_init (basename (argv[0])); + + if (argc - optind != 2) + usage (); + + threadcount = strtoul (argv[optind++], NULL, 10); + if (!threadcount) + log_msg_exit ("thread count must be > 0"); + prefix = argv[optind++]; + + key = xasprintf ("%s.%s", prefix, KEYSUFFIX); + + /* +1 for watch thread */ + thd = xzmalloc (sizeof (*thd) * (threadcount + 1)); + + /* start watch thread */ + thd[threadcount].n = threadcount; + if ((rc = pthread_attr_init (&thd[threadcount].attr))) + log_errn (rc, "pthread_attr_init"); + if ((rc = pthread_create (&thd[threadcount].t, + &thd[threadcount].attr, + watchthread, + &thd[threadcount]))) + log_errn (rc, "pthread_create"); + + /* Wait for watch thread to setup */ + pthread_mutex_lock (&watch_init_lock); + while (!watch_init) + pthread_cond_wait (&watch_init_cond, &watch_init_lock); + pthread_mutex_unlock (&watch_init_lock); + + /* start commit threads */ + for (i = 0; i < threadcount; i++) { + thd[i].n = i; + if ((rc = pthread_attr_init (&thd[i].attr))) + log_errn (rc, "pthread_attr_init"); + if ((rc = pthread_create (&thd[i].t, &thd[i].attr, committhread, &thd[i]))) + log_errn (rc, "pthread_create"); + } + + /* +1 for watch thread */ + for (i = 0; i < (threadcount + 1); i++) { + if ((rc = pthread_join (thd[i].t, NULL))) + log_errn (rc, "pthread_join"); + } + + printf("%d\n", changecount); + + free (thd); + free (key); + + log_fini (); + + return 0; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index 55b1c3deaeed..bfd70f3dfb92 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -613,4 +613,15 @@ test_expect_success 'kvs: copy-tokvs and copy-fromkvs work' ' test_cmp random.data reread.data ' +# All tests below assume commit-merge=0 + +# commit-merge option test +test_expect_success 'kvs: commit-merge disabling works' ' + THREADS=64 && + flux module remove -r 0 kvs && + flux module load -r 0 kvs commit-merge=0 && + OUTPUT=`${FLUX_BUILD_DIR}/t/kvs/commitmerge ${THREADS} $(basename ${SHARNESS_TEST_FILE})` + test "$OUTPUT" = "${THREADS}" +' + test_done