Skip to content

Commit

Permalink
Merge pull request #979 from chu11/issue813-unittest
Browse files Browse the repository at this point in the history
Add test for disabling commit-merge in kvs + additional minor cleanup patches
  • Loading branch information
garlick authored Feb 16, 2017
2 parents edc914e + ce06972 commit 0889e59
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ static void *module_thread (void *arg)
ac = argz_count (p->argz, p->argz_len);
av = xzmalloc (sizeof (av[0]) * (ac + 1));
argz_extract (p->argz, p->argz_len, av);
if (p->main(p->h, ac, av) < 0) {
if (p->main (p->h, ac, av) < 0) {
mod_main_errno = errno;
if (mod_main_errno == 0)
mod_main_errno = ECONNRESET;
Expand Down
7 changes: 7 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ check_PROGRAMS = \
kvs/watch \
kvs/watch_disconnect \
kvs/commit \
kvs/commitmerge \
kvs/basic \
request/treq

Expand Down Expand Up @@ -279,6 +280,12 @@ kvs_commit_LDADD = \
$(top_builddir)/src/modules/kvs/libflux-kvs.la \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

kvs_commitmerge_SOURCES = kvs/commitmerge.c
kvs_commitmerge_CPPFLAGS = $(test_cppflags)
kvs_commitmerge_LDADD = \
$(top_builddir)/src/modules/kvs/libflux-kvs.la \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

kvs_watch_disconnect_SOURCES = kvs/watch_disconnect.c
kvs_watch_disconnect_CPPFLAGS = $(test_cppflags)
kvs_watch_disconnect_LDADD = \
Expand Down
10 changes: 8 additions & 2 deletions t/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

/* tcommit - performance test for KVS commits */
/* commit - performance test for KVS commits */

#if HAVE_CONFIG_H
#include "config.h"
Expand Down Expand Up @@ -116,7 +116,7 @@ void *thread (void *arg)
done:
if (t->h)
flux_close (t->h);
return NULL;
return NULL;
}

int main (int argc, char *argv[])
Expand All @@ -134,6 +134,8 @@ int main (int argc, char *argv[])
case 'f':
fopt = true;
fence_nprocs = strtoul (optarg, NULL, 10);
if (!fence_nprocs)
log_msg_exit ("fence value must be > 0");
break;
case 's':
sopt = true;
Expand All @@ -146,7 +148,11 @@ int main (int argc, char *argv[])
usage ();

nthreads = strtoul (argv[optind++], NULL, 10);
if (!nthreads)
log_msg_exit ("thread count must be > 0");
count = strtoul (argv[optind++], NULL, 10);
if (!count)
log_msg_exit ("commit count must be > 0");
prefix = argv[optind++];

memset (&ts, 0, sizeof (ts));
Expand Down
266 changes: 266 additions & 0 deletions t/kvs/commitmerge.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*****************************************************************************\
* Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at
* the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS).
* LLNL-CODE-658032 All rights reserved.
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the license, or (at your option)
* any later version.
*
* Flux is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

/* commitmerge test
*
* Basic purpose of this test is to test if commit merging does not
* work properly when commit-merge is disabled in the kvs
* (commit-merge=0).
*
* A watch thread will watch a key.
* A number of commit threads will commit a value to the key.
*
* The watch should see every single change if commit merging is not enabled.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <libgen.h>
#include <pthread.h>
#include <getopt.h>
#include <inttypes.h>
#include <flux/core.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/shortjson.h"

typedef struct {
pthread_t t;
pthread_attr_t attr;
int n;
flux_t *h;
} thd_t;

typedef struct {
thd_t *t;
int lastcount;
} watch_count_t;

#define KEYSUFFIX "commitwatch-key"

#define WATCH_TIMEOUT 5

static int threadcount = -1;
static int changecount = 0;
static char *prefix = NULL;
static char *key = NULL;

static int watch_init = 0;
static pthread_cond_t watch_init_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t watch_init_lock = PTHREAD_MUTEX_INITIALIZER;

static void usage (void)
{
fprintf (stderr, "Usage: commitmerge threadcount prefix\n");
exit (1);
}

void watch_prepare_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
{
/* Tell main it can now launch commit threads */
if (!watch_init) {
pthread_mutex_lock (&watch_init_lock);
watch_init++;
pthread_cond_signal (&watch_init_cond);
pthread_mutex_unlock (&watch_init_lock);
}

if (changecount >= threadcount)
flux_reactor_stop (r);
}

static int watch_count_cb (const char *key, int val, void *arg, int errnum)
{
thd_t *t = arg;

/* First value should be empty & get ENOENT */
if (!errnum) {
//printf ("watch %s = %d\n", key, val);
changecount++;
}

if (changecount == threadcount)
kvs_unwatch (t->h, key);
return 0;
}

static void watch_timeout_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
watch_count_t *wc = arg;

/* timeout */
if (wc->lastcount == changecount)
flux_reactor_stop (r);
else
wc->lastcount = changecount;
}

void *watchthread (void *arg)
{
thd_t *t = arg;
watch_count_t wc;
flux_reactor_t *r;
flux_watcher_t *pw = NULL;
flux_watcher_t *tw = NULL;
char *json_str = NULL;
int rc;

if (!(t->h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

/* Make sure key doesn't already exist, initial value may affect
* test by chance (i.e. initial value = 0, commit 0 and thus no
* change)
*/

rc = kvs_get (t->h, key, &json_str);
if (rc < 0 && errno != ENOENT)
log_err_exit ("kvs_get");

if (!rc) {
if (kvs_unlink (t->h, key) < 0)
log_err_exit ("kvs_unlink");
if (kvs_commit (t->h) < 0)
log_err_exit ("kvs_commit");
}

r = flux_get_reactor (t->h);

if (kvs_watch_int (t->h, key, watch_count_cb, t) < 0)
log_err_exit ("kvs_watch_int %s", key);

pw = flux_prepare_watcher_create (r, watch_prepare_cb, NULL);

wc.t = t;
wc.lastcount = -1;

/* So test won't hang if there's a bug */
tw = flux_timer_watcher_create (r,
WATCH_TIMEOUT,
WATCH_TIMEOUT,
watch_timeout_cb,
&wc);

flux_watcher_start (pw);
flux_watcher_start (tw);

if (flux_reactor_run (r, 0) < 0)
log_err_exit ("flux_reactor_run");

if (json_str)
free (json_str);
flux_watcher_destroy (pw);
flux_watcher_destroy (tw);
flux_close (t->h);
return NULL;
}

void *committhread (void *arg)
{
thd_t *t = arg;

if (!(t->h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

if (kvs_put_int (t->h, key, t->n) < 0)
log_err_exit ("%s", key);

if (kvs_commit (t->h) < 0)
log_err_exit ("kvs_commit");

flux_close (t->h);
return NULL;
}

int main (int argc, char *argv[])
{
thd_t *thd;
int i, rc;

log_init (basename (argv[0]));

if (argc - optind != 2)
usage ();

threadcount = strtoul (argv[optind++], NULL, 10);
if (!threadcount)
log_msg_exit ("thread count must be > 0");
prefix = argv[optind++];

key = xasprintf ("%s.%s", prefix, KEYSUFFIX);

/* +1 for watch thread */
thd = xzmalloc (sizeof (*thd) * (threadcount + 1));

/* start watch thread */
thd[threadcount].n = threadcount;
if ((rc = pthread_attr_init (&thd[threadcount].attr)))
log_errn (rc, "pthread_attr_init");
if ((rc = pthread_create (&thd[threadcount].t,
&thd[threadcount].attr,
watchthread,
&thd[threadcount])))
log_errn (rc, "pthread_create");

/* Wait for watch thread to setup */
pthread_mutex_lock (&watch_init_lock);
while (!watch_init)
pthread_cond_wait (&watch_init_cond, &watch_init_lock);
pthread_mutex_unlock (&watch_init_lock);

/* start commit threads */
for (i = 0; i < threadcount; i++) {
thd[i].n = i;
if ((rc = pthread_attr_init (&thd[i].attr)))
log_errn (rc, "pthread_attr_init");
if ((rc = pthread_create (&thd[i].t, &thd[i].attr, committhread, &thd[i])))
log_errn (rc, "pthread_create");
}

/* +1 for watch thread */
for (i = 0; i < (threadcount + 1); i++) {
if ((rc = pthread_join (thd[i].t, NULL)))
log_errn (rc, "pthread_join");
}

printf("%d\n", changecount);

free (thd);
free (key);

log_fini ();

return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
11 changes: 11 additions & 0 deletions t/t1000-kvs-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -613,4 +613,15 @@ test_expect_success 'kvs: copy-tokvs and copy-fromkvs work' '
test_cmp random.data reread.data
'

# All tests below assume commit-merge=0

# commit-merge option test
test_expect_success 'kvs: commit-merge disabling works' '
THREADS=64 &&
flux module remove -r 0 kvs &&
flux module load -r 0 kvs commit-merge=0 &&
OUTPUT=`${FLUX_BUILD_DIR}/t/kvs/commitmerge ${THREADS} $(basename ${SHARNESS_TEST_FILE})`
test "$OUTPUT" = "${THREADS}"
'

test_done

0 comments on commit 0889e59

Please sign in to comment.