Skip to content

Commit

Permalink
Merge pull request #1227 from chu11/issue1202-part1
Browse files Browse the repository at this point in the history
KVS: Support reading valref objects with multiple blobrefs
  • Loading branch information
garlick authored Oct 17, 2017
2 parents 9f03dda + a1233f6 commit eebb5e7
Show file tree
Hide file tree
Showing 6 changed files with 678 additions and 127 deletions.
104 changes: 76 additions & 28 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ typedef struct {
const char *hash_name;
} kvs_ctx_t;

struct kvs_cb_data {
kvs_ctx_t *ctx;
wait_t *wait;
int errnum;
};

static int setroot_event_send (kvs_ctx_t *ctx, json_t *names);
static int error_event_send (kvs_ctx_t *ctx, json_t *names, int errnum);
static void commit_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
Expand Down Expand Up @@ -444,15 +450,9 @@ static void setroot (kvs_ctx_t *ctx, const char *rootdir, int rootseq)
}
}

struct commit_cb_data {
kvs_ctx_t *ctx;
wait_t *wait;
int errnum;
};

static int commit_load_cb (commit_t *c, const char *ref, void *data)
{
struct commit_cb_data *cbd = data;
struct kvs_cb_data *cbd = data;
bool stall;

/* is_raw flag is always false on commit loads, we will never
Expand All @@ -474,7 +474,7 @@ static int commit_load_cb (commit_t *c, const char *ref, void *data)
*/
static int commit_cache_cb (commit_t *c, struct cache_entry *hp, void *data)
{
struct commit_cb_data *cbd = data;
struct kvs_cb_data *cbd = data;
void *storedata;
int storedatalen = 0;
bool is_raw;
Expand Down Expand Up @@ -529,7 +529,7 @@ static void commit_apply (commit_t *c)
}

if (ret == COMMIT_PROCESS_LOAD_MISSING_REFS) {
struct commit_cb_data cbd;
struct kvs_cb_data cbd;

if (!(wait = wait_create ((wait_cb_f)commit_apply, c))) {
errnum = errno;
Expand All @@ -556,7 +556,7 @@ static void commit_apply (commit_t *c)
goto stall;
}
else if (ret == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES) {
struct commit_cb_data cbd;
struct kvs_cb_data cbd;

if (!(wait = wait_create ((wait_cb_f)commit_apply, c))) {
errnum = errno;
Expand Down Expand Up @@ -714,6 +714,22 @@ static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w,
flux_log_error (ctx->h, "%s: cache_expire_entries", __FUNCTION__);
}

static int lookup_load_cb (lookup_t *lh, const char *ref,
bool raw_data, void *data)
{
struct kvs_cb_data *cbd = data;
bool stall;

if (load (cbd->ctx, ref, raw_data, cbd->wait, &stall) < 0) {
cbd->errnum = errno;
flux_log_error (cbd->ctx->h, "%s: load", __FUNCTION__);
return -1;
}
/* if not stalling, logic issue within code */
assert (stall);
return 0;
}

static void get_request_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -769,8 +785,16 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w,
assert (ret == 0);
}
else {
int err;

lh = arg;

/* error in prior load(), waited for in flight rpcs to complete */
if ((err = lookup_get_aux_errnum (lh))) {
errno = err;
goto done;
}

ctx = lookup_get_aux_data (lh);
assert (ctx);

Expand All @@ -779,20 +803,28 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w,
}

if (!lookup (lh)) {
const char *missing_ref;
bool ref_raw, stall;

missing_ref = lookup_get_missing_ref (lh, &ref_raw);
assert (missing_ref);
struct kvs_cb_data cbd;

if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, lh)))
goto done;
if (load (ctx, missing_ref, ref_raw, wait, &stall) < 0) {
flux_log_error (h, "%s: load", __FUNCTION__);

cbd.ctx = ctx;
cbd.wait = wait;
cbd.errnum = 0;

if (lookup_iter_missing_refs (lh, lookup_load_cb, &cbd) < 0) {
errno = cbd.errnum;

/* rpcs already in flight, stall for them to complete */
if (wait_get_usecount (wait) > 0) {
lookup_set_aux_errnum (lh, cbd.errnum);
goto stall;
}

goto done;
}
/* if not stalling, logic issue within code */
assert (stall);

assert (wait_get_usecount (wait) > 0);
goto stall;
}
if (lookup_get_errnum (lh) != 0) {
Expand Down Expand Up @@ -875,8 +907,16 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w,
assert (ret == 0);
}
else {
int err;

lh = arg;

/* error in prior load(), waited for in flight rpcs to complete */
if ((err = lookup_get_aux_errnum (lh))) {
errno = err;
goto done;
}

ctx = lookup_get_aux_data (lh);
assert (ctx);

Expand All @@ -887,20 +927,28 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w,
}

if (!lookup (lh)) {
const char *missing_ref;
bool ref_raw, stall;

missing_ref = lookup_get_missing_ref (lh, &ref_raw);
assert (missing_ref);
struct kvs_cb_data cbd;

if (!(wait = wait_create_msg_handler (h, w, msg, watch_request_cb, lh)))
goto done;
if (load (ctx, missing_ref, ref_raw, wait, &stall) < 0) {
flux_log_error (h, "%s: load", __FUNCTION__);

cbd.ctx = ctx;
cbd.wait = wait;
cbd.errnum = 0;

if (lookup_iter_missing_refs (lh, lookup_load_cb, &cbd) < 0) {
errno = cbd.errnum;

/* rpcs already in flight, stall for them to complete */
if (wait_get_usecount (wait) > 0) {
lookup_set_aux_errnum (lh, cbd.errnum);
goto stall;
}

goto done;
}
/* if not stalling, logic issue within code */
assert (stall);

assert (wait_get_usecount (wait) > 0);
goto stall;
}
if (lookup_get_errnum (lh) != 0) {
Expand Down
Loading

0 comments on commit eebb5e7

Please sign in to comment.