Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: modify kvs_watch callback type to return integer #77

Merged
merged 6 commits into from
Oct 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/bindings/lua/flux-lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ static int l_msghandler_newindex (lua_State *L)
return (0);
}

static void l_kvswatcher (const char *key, json_object *val, void *arg, int errnum)
static int l_kvswatcher (const char *key, json_object *val, void *arg, int errnum)
{
int rc;
int t;
Expand Down Expand Up @@ -903,6 +903,7 @@ static void l_kvswatcher (const char *key, json_object *val, void *arg, int errn
}
/* Reset stack */
lua_settop (L, 0);
return 0;
}

static int l_kvswatcher_remove (lua_State *L)
Expand Down
5 changes: 3 additions & 2 deletions src/common/libzio/kz.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,15 @@ int kz_close (kz_t kz)
return rc;
}

void kvswatch_cb (const char *key, kvsdir_t dir, void *arg, int errnum)
int kvswatch_cb (const char *key, kvsdir_t dir, void *arg, int errnum)
{
kz_t kz = arg;

if (errnum != 0 && errnum != ENOENT)
flux_reactor_stop (kz->h);
return -1;
else if (errnum == 0 && kz->ready_cb)
kz->ready_cb (kz, kz->ready_arg);
return 0;
}

int kz_set_ready_cb (kz_t kz, kz_ready_f ready_cb, void *arg)
Expand Down
14 changes: 7 additions & 7 deletions src/modules/kvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

typedef struct kvsdir_struct *kvsdir_t;

typedef void (KVSSetF(const char *key, json_object *val, void *arg,int errnum));
typedef void (KVSSetDirF(const char *key, kvsdir_t dir, void *arg, int errnum));
typedef void (KVSSetStringF(const char *key, const char *val, void *arg, int errnum));
typedef void (KVSSetIntF(const char *key, int val, void *arg, int errnum));
typedef void (KVSSetInt64F(const char *key, int64_t val, void *arg,int errnum));
typedef void (KVSSetDoubleF(const char *key, double val, void *arg,int errnum));
typedef void (KVSSetBooleanF(const char *key, bool val, void *arg, int errnum));
typedef int (KVSSetF(const char *key, json_object *val, void *arg,int errnum));
typedef int (KVSSetDirF(const char *key, kvsdir_t dir, void *arg, int errnum));
typedef int (KVSSetStringF(const char *key, const char *val, void *arg, int errnum));
typedef int (KVSSetIntF(const char *key, int val, void *arg, int errnum));
typedef int (KVSSetInt64F(const char *key, int64_t val, void *arg,int errnum));
typedef int (KVSSetDoubleF(const char *key, double val, void *arg,int errnum));
typedef int (KVSSetBooleanF(const char *key, bool val, void *arg, int errnum));

/* Destroy a kvsdir object returned from kvs_get_dir() or kvsdir_get_dir()
*/
Expand Down
25 changes: 15 additions & 10 deletions src/modules/kvs/libkvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,55 +489,57 @@ int kvs_get_boolean (flux_t h, const char *key, bool *valp)
** WATCH
**/

static void dispatch_watch (flux_t h, kvs_watcher_t *wp, const char *key,
static int dispatch_watch (flux_t h, kvs_watcher_t *wp, const char *key,
json_object *val)
{
int errnum = val ? 0 : ENOENT;
int rc = -1;

switch (wp->type) {
case WATCH_STRING: {
KVSSetStringF *set = (KVSSetStringF *)wp->set;
const char *s = val ? json_object_get_string (val) : NULL;
set (key, s, wp->arg, errnum);
rc = set (key, s, wp->arg, errnum);
break;
}
case WATCH_INT: {
KVSSetIntF *set = (KVSSetIntF *)wp->set;
int i = val ? json_object_get_int (val) : 0;
set (key, i, wp->arg, errnum);
rc = set (key, i, wp->arg, errnum);
break;
}
case WATCH_INT64: {
KVSSetInt64F *set = (KVSSetInt64F *)wp->set;
int64_t i = val ? json_object_get_int64 (val) : 0;
set (key, i, wp->arg, errnum);
rc = set (key, i, wp->arg, errnum);
break;
}
case WATCH_DOUBLE: {
KVSSetDoubleF *set = (KVSSetDoubleF *)wp->set;
double d = val ? json_object_get_double (val) : 0;
set (key, d, wp->arg, errnum);
rc = set (key, d, wp->arg, errnum);
break;
}
case WATCH_BOOLEAN: {
KVSSetBooleanF *set = (KVSSetBooleanF *)wp->set;
bool b = val ? json_object_get_boolean (val) : false;
set (key, b, wp->arg, errnum);
rc = set (key, b, wp->arg, errnum);
break;
}
case WATCH_DIR: {
KVSSetDirF *set = (KVSSetDirF *)wp->set;
kvsdir_t dir = val ? kvsdir_alloc (h, key, val) : NULL;
set (key, dir, wp->arg, errnum);
rc = set (key, dir, wp->arg, errnum);
if (dir)
kvsdir_destroy (dir);
break;
}
case WATCH_OBJECT: {
wp->set (key, val, wp->arg, errnum);
rc = wp->set (key, val, wp->arg, errnum);
break;
}
}
return rc;
}

static int watch_rep_cb (flux_t h, int typemask, zmsg_t **zmsg, void *arg)
Expand All @@ -547,20 +549,23 @@ static int watch_rep_cb (flux_t h, int typemask, zmsg_t **zmsg, void *arg)
json_object_iter iter;
kvs_watcher_t *wp;
bool match = false;
int rc = 0;

if (flux_msg_decode (*zmsg, NULL, &reply) == 0 && reply != NULL) {
json_object_object_foreachC (reply, iter) {
if ((wp = zhash_lookup (ctx->watchers, iter.key))) {
dispatch_watch (h, wp, iter.key, iter.val);
rc = dispatch_watch (h, wp, iter.key, iter.val);
match = true;
if (rc < 0)
break;
}
}
}
if (reply)
json_object_put (reply);
if (match)
zmsg_destroy (zmsg);
return 0;
return rc;
}

static kvs_watcher_t *add_watcher (flux_t h, const char *key, watch_type_t type,
Expand Down
10 changes: 6 additions & 4 deletions src/modules/live/live.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,26 +492,28 @@ static void manage_subscriptions (ctx_t *ctx)
}
}

static void max_idle_cb (const char *key, int val, void *arg, int errnum)
static int max_idle_cb (const char *key, int val, void *arg, int errnum)
{
ctx_t *ctx = arg;

if (errnum != ENOENT && errnum != 0)
return;
return 0;
if (errnum == ENOENT)
val = default_max_idle;
ctx->max_idle = val;
return 0;
}

static void slow_idle_cb (const char *key, int val, void *arg, int errnum)
static int slow_idle_cb (const char *key, int val, void *arg, int errnum)
{
ctx_t *ctx = arg;

if (errnum != ENOENT && errnum != 0)
return;
return 0;
if (errnum == ENOENT)
val = default_slow_idle;
ctx->slow_idle = val;
return 0;
}

static int goodbye_request_cb (flux_t h, int typemask, zmsg_t **zmsg, void *arg)
Expand Down
3 changes: 2 additions & 1 deletion src/modules/modctl/modctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ static int installmod (ctx_t *ctx, const char *name)
/* This function is called whenver conf.modctl.seq is updated by master.
* It syncs the set of loaded modules with the KVS.
*/
static void conf_cb (const char *path, int seq, void *arg, int errnum)
static int conf_cb (const char *path, int seq, void *arg, int errnum)
{
ctx_t *ctx = arg;
kvsitr_t itr;
Expand Down Expand Up @@ -332,6 +332,7 @@ static void conf_cb (const char *path, int seq, void *arg, int errnum)
lsmod_reduce (ctx, seq);
done:
Jput (lsmod);
return 0;
}

static int seq_incr (flux_t h)
Expand Down
7 changes: 3 additions & 4 deletions src/test/tkvswatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,11 @@ static void wait_ready (void)
/* expect val: {-1,0,1,...,(changes - 1)}
* count will therefore run 0...changes.
*/
static void watch_cb (const char *k, int val, void *arg, int errnum)
static int watch_cb (const char *k, int val, void *arg, int errnum)
{
thd_t *t = arg;

if (errnum == 0 && val + 1 == changes)
flux_reactor_stop (t->h);
return -1;
return 0;
}

void *thread (void *arg)
Expand Down