From 2745310e19fc67007078c4d49e205062f5be6f5b Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 23 Oct 2014 13:59:29 -0700 Subject: [PATCH] tkvswatch: add a test kvs watch modifying own key This is a test for issue #81 --- src/test/tkvswatch.c | 105 ++++++++++++++++++++++++++++++++----------- t/t1000-kvs-basic.t | 9 +++- 2 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/test/tkvswatch.c b/src/test/tkvswatch.c index c4716dbe44f7..7693db6bb474 100644 --- a/src/test/tkvswatch.c +++ b/src/test/tkvswatch.c @@ -33,20 +33,6 @@ #if HAVE_CONFIG_H #include "config.h" #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include @@ -101,10 +87,11 @@ static void wait_ready (void) /* expect val: {-1,0,1,...,(changes - 1)} * count will therefore run 0...changes. */ -static int watch_cb (const char *k, int val, void *arg, int errnum) +static int mt_watch_cb (const char *k, int val, void *arg, int errnum) { + thd_t *t = arg; if (errnum == 0 && val + 1 == changes) - return -1; + flux_reactor_stop (t->h); return 0; } @@ -113,14 +100,14 @@ void *thread (void *arg) thd_t *t = arg; if (!(t->h = flux_api_open ())) { - err ("%d: cmb_init", t->n); + err ("%d: flux_api_open", t->n); goto done; } signal_ready (); /* The first kvs.watch reply is handled synchronously, then other kvs.watch * replies will arrive asynchronously and be handled by the reactor. */ - if (kvs_watch_int (t->h, key, watch_cb, t) < 0) { + if (kvs_watch_int (t->h, key, mt_watch_cb, t) < 0) { err ("%d: kvs_watch_int", t->n); goto done; } @@ -135,26 +122,34 @@ void *thread (void *arg) return NULL; } -int main (int argc, char *argv[]) +void usage (void) +{ + fprintf (stderr, +"Usage: tkvswatch mt nthreads changes key\n" +" selfmod key\n" +); + exit (1); + +} + +void test_mt (int argc, char **argv) { thd_t *thd; int i, rc; flux_t h; - log_init (basename (argv[0])); - - if (argc != 4) { + if (argc != 3) { fprintf (stderr, "Usage: tkvswatch nthreads changes key\n"); exit (1); } - nthreads = strtoul (argv[1], NULL, 10); - changes = strtoul (argv[2], NULL, 10); - key = argv[3]; + nthreads = strtoul (argv[0], NULL, 10); + changes = strtoul (argv[1], NULL, 10); + key = argv[2]; thd = xzmalloc (sizeof (*thd) * nthreads); if (!(h = flux_api_open ())) - err_exit ("cmb_init"); + err_exit ("flux_api_open"); if (kvs_put_int (h, key, -1) < 0) err_exit ("kvs_put_int %s", key); @@ -185,9 +180,65 @@ int main (int argc, char *argv[]) free (thd); flux_api_close (h); +} - log_fini (); +static int selfmod_watch_cb (const char *key, int val, void *arg, int errnum) +{ + msg ("%s: value = %d errnum = %d", __FUNCTION__, val, errnum); + flux_t h = arg; + if (kvs_put_int (h, key, val + 1) < 0) + err_exit ("%s: kvs_put_int", __FUNCTION__); + if (kvs_commit (h) < 0) + err_exit ("%s: kvs_commit", __FUNCTION__); + return (val == 0 ? -1 : 0); +} + +void test_selfmod (int argc, char **argv) +{ + flux_t h; + char *key; + + if (argc != 1) { + fprintf (stderr, "Usage: selfmod key\n"); + exit (1); + } + key = argv[0]; + if (!(h = flux_api_open ())) + err_exit ("flux_api_open"); + + if (kvs_put_int (h, key, -1) < 0) + err_exit ("kvs_put_int"); + if (kvs_commit (h) < 0) + err_exit ("kvs_commit"); + if (kvs_watch_int (h, key, selfmod_watch_cb, h) < 0) + err_exit ("kvs_watch_int"); + + msg ("reactor: start"); + flux_reactor_start (h); + msg ("reactor: end"); + + flux_api_close (h); +} + +int main (int argc, char *argv[]) +{ + char *cmd; + + if (argc == 1) + usage (); + cmd = argv[1]; + + log_init (basename (argv[0])); + + if (!strcmp (cmd, "mt")) + test_mt (argc - 2, argv + 2); + else if (!strcmp (cmd, "selfmod")) + test_selfmod (argc - 2, argv + 2); + else + usage (); + + log_fini (); return 0; } diff --git a/t/t1000-kvs-basic.t b/t/t1000-kvs-basic.t index ebbea5cce910..a4c0111b65bb 100755 --- a/t/t1000-kvs-basic.t +++ b/t/t1000-kvs-basic.t @@ -229,8 +229,13 @@ test_expect_success 'kvs: tcommit: start 100 API threads each doing 50 put,fence ${FLUX_BUILD_DIR}/src/test/tcommit --fence 100 50 \ $(basename ${SHARNESS_TEST_FILE}) ' -test_expect_success 'kvs: tkvswatch: multi-threaded kvs watch program' ' - ${FLUX_BUILD_DIR}/src/test/tkvswatch 100 100 TEST.a && +test_expect_success 'kvs: tkvswatch-mt: multi-threaded kvs watch program' ' + ${FLUX_BUILD_DIR}/src/test/tkvswatch mt 100 100 TEST.a && + flux kvs unlink TEST.a +' + +test_expect_success 'kvs: tkvswatch-selfmod: watch callback modifies watched key' ' + ${FLUX_BUILD_DIR}/src/test/tkvswatch selfmod TEST.a && flux kvs unlink TEST.a '