Skip to content

Commit

Permalink
tkvswatch: add a test kvs watch modifying own key
Browse files Browse the repository at this point in the history
This is a test for issue flux-framework#81
  • Loading branch information
garlick authored and grondo committed Oct 31, 2014
1 parent 11133c3 commit 2745310
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 29 deletions.
105 changes: 78 additions & 27 deletions src/test/tkvswatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,6 @@
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <sys/types.h>
#include <sys/time.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <getopt.h>
#include <assert.h>
#include <string.h>
#include <zmq.h>
#include <czmq.h>
#include <libgen.h>
#include <pthread.h>
#include <json.h>
#include <flux/core.h>

Expand Down Expand Up @@ -101,10 +87,11 @@ static void wait_ready (void)
/* expect val: {-1,0,1,...,(changes - 1)}
* count will therefore run 0...changes.
*/
static int watch_cb (const char *k, int val, void *arg, int errnum)
static int mt_watch_cb (const char *k, int val, void *arg, int errnum)
{
thd_t *t = arg;
if (errnum == 0 && val + 1 == changes)
return -1;
flux_reactor_stop (t->h);
return 0;
}

Expand All @@ -113,14 +100,14 @@ void *thread (void *arg)
thd_t *t = arg;

if (!(t->h = flux_api_open ())) {
err ("%d: cmb_init", t->n);
err ("%d: flux_api_open", t->n);
goto done;
}
signal_ready ();
/* The first kvs.watch reply is handled synchronously, then other kvs.watch
* replies will arrive asynchronously and be handled by the reactor.
*/
if (kvs_watch_int (t->h, key, watch_cb, t) < 0) {
if (kvs_watch_int (t->h, key, mt_watch_cb, t) < 0) {
err ("%d: kvs_watch_int", t->n);
goto done;
}
Expand All @@ -135,26 +122,34 @@ void *thread (void *arg)
return NULL;
}

int main (int argc, char *argv[])
void usage (void)
{
fprintf (stderr,
"Usage: tkvswatch mt nthreads changes key\n"
" selfmod key\n"
);
exit (1);

}

void test_mt (int argc, char **argv)
{
thd_t *thd;
int i, rc;
flux_t h;

log_init (basename (argv[0]));

if (argc != 4) {
if (argc != 3) {
fprintf (stderr, "Usage: tkvswatch nthreads changes key\n");
exit (1);
}
nthreads = strtoul (argv[1], NULL, 10);
changes = strtoul (argv[2], NULL, 10);
key = argv[3];
nthreads = strtoul (argv[0], NULL, 10);
changes = strtoul (argv[1], NULL, 10);
key = argv[2];

thd = xzmalloc (sizeof (*thd) * nthreads);

if (!(h = flux_api_open ()))
err_exit ("cmb_init");
err_exit ("flux_api_open");

if (kvs_put_int (h, key, -1) < 0)
err_exit ("kvs_put_int %s", key);
Expand Down Expand Up @@ -185,9 +180,65 @@ int main (int argc, char *argv[])
free (thd);

flux_api_close (h);
}

log_fini ();
static int selfmod_watch_cb (const char *key, int val, void *arg, int errnum)
{
msg ("%s: value = %d errnum = %d", __FUNCTION__, val, errnum);

flux_t h = arg;
if (kvs_put_int (h, key, val + 1) < 0)
err_exit ("%s: kvs_put_int", __FUNCTION__);
if (kvs_commit (h) < 0)
err_exit ("%s: kvs_commit", __FUNCTION__);
return (val == 0 ? -1 : 0);
}

void test_selfmod (int argc, char **argv)
{
flux_t h;
char *key;

if (argc != 1) {
fprintf (stderr, "Usage: selfmod key\n");
exit (1);
}
key = argv[0];
if (!(h = flux_api_open ()))
err_exit ("flux_api_open");

if (kvs_put_int (h, key, -1) < 0)
err_exit ("kvs_put_int");
if (kvs_commit (h) < 0)
err_exit ("kvs_commit");
if (kvs_watch_int (h, key, selfmod_watch_cb, h) < 0)
err_exit ("kvs_watch_int");

msg ("reactor: start");
flux_reactor_start (h);
msg ("reactor: end");

flux_api_close (h);
}

int main (int argc, char *argv[])
{
char *cmd;

if (argc == 1)
usage ();
cmd = argv[1];

log_init (basename (argv[0]));

if (!strcmp (cmd, "mt"))
test_mt (argc - 2, argv + 2);
else if (!strcmp (cmd, "selfmod"))
test_selfmod (argc - 2, argv + 2);
else
usage ();

log_fini ();
return 0;
}

Expand Down
9 changes: 7 additions & 2 deletions t/t1000-kvs-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,13 @@ test_expect_success 'kvs: tcommit: start 100 API threads each doing 50 put,fence
${FLUX_BUILD_DIR}/src/test/tcommit --fence 100 50 \
$(basename ${SHARNESS_TEST_FILE})
'
test_expect_success 'kvs: tkvswatch: multi-threaded kvs watch program' '
${FLUX_BUILD_DIR}/src/test/tkvswatch 100 100 TEST.a &&
test_expect_success 'kvs: tkvswatch-mt: multi-threaded kvs watch program' '
${FLUX_BUILD_DIR}/src/test/tkvswatch mt 100 100 TEST.a &&
flux kvs unlink TEST.a
'

test_expect_success 'kvs: tkvswatch-selfmod: watch callback modifies watched key' '
${FLUX_BUILD_DIR}/src/test/tkvswatch selfmod TEST.a &&
flux kvs unlink TEST.a
'

Expand Down

0 comments on commit 2745310

Please sign in to comment.