Skip to content

Commit

Permalink
Merge pull request #1314 from chu11/kvscleanup9
Browse files Browse the repository at this point in the history
kvs: general refactoring and cleanup
  • Loading branch information
garlick authored Jan 9, 2018
2 parents 6cd7cf7 + 24e13de commit a4d438a
Show file tree
Hide file tree
Showing 7 changed files with 1,476 additions and 1,255 deletions.
89 changes: 82 additions & 7 deletions src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@
#include "commit.h"
#include "kvs_util.h"

#define FENCE_READY_MASK 0x01

struct commit_mgr {
struct cache *cache;
const char *namespace;
const char *hash_name;
int noop_stores; /* for kvs.stats.get, etc.*/
zhash_t *fences;
bool iterating_fences;
zlist_t *removelist;
zlist_t *ready;
flux_t *h;
void *aux;
Expand Down Expand Up @@ -940,6 +944,11 @@ commit_mgr_t *commit_mgr_create (struct cache *cache,
saved_errno = ENOMEM;
goto error;
}
cm->iterating_fences = false;
if (!(cm->removelist = zlist_new ())) {
saved_errno = ENOMEM;
goto error;
}
cm->h = h;
cm->aux = aux;
return cm;
Expand All @@ -957,22 +966,41 @@ void commit_mgr_destroy (commit_mgr_t *cm)
zhash_destroy (&cm->fences);
if (cm->ready)
zlist_destroy (&cm->ready);
if (cm->removelist)
zlist_destroy (&cm->removelist);
free (cm);
}
}

int commit_mgr_add_fence (commit_mgr_t *cm, fence_t *f)
{
json_t *names;
json_t *name;

if (!(name = json_array_get (fence_get_json_names (f), 0))) {
/* Don't modify hash while iterating */
if (cm->iterating_fences) {
errno = EAGAIN;
goto error;
}

if (!(names = fence_get_json_names (f))) {
errno = EINVAL;
goto error;
}
if (json_array_size (names) != 1) {
errno = EINVAL;
goto error;
}
if (!(name = json_array_get (names, 0))) {
errno = EINVAL;
goto error;
}
if (zhash_insert (cm->fences, json_string_value (name), f) < 0) {
errno = EEXIST;
goto error;
}
/* initial fence aux int to 0 */
fence_set_aux_int (f, 0);
zhash_freefn (cm->fences,
json_string_value (name),
(zhash_free_fn *)fence_destroy);
Expand All @@ -986,28 +1014,54 @@ fence_t *commit_mgr_lookup_fence (commit_mgr_t *cm, const char *name)
return zhash_lookup (cm->fences, name);
}

int commit_mgr_iter_fences (commit_mgr_t *cm, commit_fence_f cb, void *data)
int commit_mgr_iter_not_ready_fences (commit_mgr_t *cm, commit_fence_f cb,
void *data)
{
fence_t *f;
char *name;
int rc = -1;

cm->iterating_fences = true;

f = zhash_first (cm->fences);
while (f) {
if (cb (f, data) < 0)
goto done;
if (!(fence_get_aux_int (f) & FENCE_READY_MASK)) {
if (cb (f, data) < 0)
goto done;
}

f = zhash_next (cm->fences);
}

cm->iterating_fences = false;

while ((name = zlist_pop (cm->removelist))) {
commit_mgr_remove_fence (cm, name);
free (name);
}

rc = 0;
done:
cm->iterating_fences = false;
return rc;
}

int commit_mgr_process_fence_request (commit_mgr_t *cm, fence_t *f)
int commit_mgr_process_fence_request (commit_mgr_t *cm, const char *name)
{
fence_t *f;

if (!(f = commit_mgr_lookup_fence (cm, name))) {
errno = EINVAL;
return -1;
}

if (fence_count_reached (f)) {
commit_t *c;
int aux_int = fence_get_aux_int (f);

/* fence is already ready */
if (aux_int & FENCE_READY_MASK)
return 0;

if (!(c = commit_create (f, cm)))
return -1;
Expand All @@ -1017,6 +1071,8 @@ int commit_mgr_process_fence_request (commit_mgr_t *cm, fence_t *f)
errno = ENOMEM;
return -1;
}
/* we use this flag to indicate if a fence is "ready" */
fence_set_aux_int (f, aux_int | FENCE_READY_MASK);
zlist_freefn (cm->ready, c, (zlist_free_fn *)commit_destroy, true);
}

Expand Down Expand Up @@ -1044,9 +1100,28 @@ void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c)
zlist_remove (cm->ready, c);
}

void commit_mgr_remove_fence (commit_mgr_t *cm, const char *name)
int commit_mgr_remove_fence (commit_mgr_t *cm, const char *name)
{
zhash_delete (cm->fences, name);
/* it's dangerous to remove if we're in the middle of an
* interation, so save name for removal later.
*/
if (cm->iterating_fences) {
char *str = strdup (name);

if (!str) {
errno = ENOMEM;
return -1;
}

if (zlist_append (cm->removelist, str) < 0) {
free (str);
errno = ENOMEM;
return -1;
}
}
else
zhash_delete (cm->fences, name);
return 0;
}

int commit_mgr_get_noop_stores (commit_mgr_t *cm)
Expand Down
11 changes: 6 additions & 5 deletions src/modules/kvs/commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,19 @@ int commit_mgr_add_fence (commit_mgr_t *cm, fence_t *f);
/* Lookup a fence previously stored via commit_mgr_add_fence(), via name */
fence_t *commit_mgr_lookup_fence (commit_mgr_t *cm, const char *name);

/* Iterate through all stored fences
* - do not call commit_mgr_remove_fence() within the callback, is unsafe
/* Iterate through all not-ready fences
* - this is typically called during a needed cleanup path
*/
int commit_mgr_iter_fences (commit_mgr_t *cm, commit_fence_f cb, void *data);
int commit_mgr_iter_not_ready_fences (commit_mgr_t *cm, commit_fence_f cb,
void *data);

/* commit_mgr_process_fence_request() should be called once per fence
* request, after fence_add_request_data() has been called.
*
* If conditions are correct, will internally create at commit_t and
* store it to a queue of ready to process commits.
*/
int commit_mgr_process_fence_request (commit_mgr_t *cm, fence_t *f);
int commit_mgr_process_fence_request (commit_mgr_t *cm, const char *name);

/* returns true if there are commits ready for processing and are not
* blocked, false if not.
Expand All @@ -145,7 +146,7 @@ commit_t *commit_mgr_get_ready_commit (commit_mgr_t *cm);
void commit_mgr_remove_commit (commit_mgr_t *cm, commit_t *c);

/* remove a fence from the commit manager */
void commit_mgr_remove_fence (commit_mgr_t *cm, const char *name);
int commit_mgr_remove_fence (commit_mgr_t *cm, const char *name);

int commit_mgr_get_noop_stores (commit_mgr_t *cm);
void commit_mgr_clear_noop_stores (commit_mgr_t *cm);
Expand Down
12 changes: 12 additions & 0 deletions src/modules/kvs/fence.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct fence {
json_t *ops;
json_t *names;
int flags;
int aux_int;
};

void fence_destroy (fence_t *f)
Expand Down Expand Up @@ -72,6 +73,7 @@ fence_t *fence_create (const char *name, int nprocs, int flags)
}
f->nprocs = nprocs;
f->flags = flags;
f->aux_int = 0;
if (name) {
if (!(s = json_string (name))) {
saved_errno = ENOMEM;
Expand Down Expand Up @@ -224,6 +226,16 @@ int fence_merge (fence_t *dest, fence_t *src)
return -1;
}

int fence_get_aux_int (fence_t *f)
{
return f->aux_int;
}

void fence_set_aux_int (fence_t *f, int n)
{
f->aux_int = n;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
5 changes: 5 additions & 0 deletions src/modules/kvs/fence.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ int fence_iter_request_copies (fence_t *f, fence_msg_cb cb, void *data);
*/
int fence_merge (fence_t *dest, fence_t *src);

/* Auxiliary convenience data
*/
int fence_get_aux_int (fence_t *f);
void fence_set_aux_int (fence_t *f, int n);

#endif /* !_FLUX_KVS_FENCE_H */

/*
Expand Down
Loading

0 comments on commit a4d438a

Please sign in to comment.