diff --git a/src/common/libkvs/kvs.c b/src/common/libkvs/kvs.c index 7dec935b41ce..c60f98ebc4e2 100644 --- a/src/common/libkvs/kvs.c +++ b/src/common/libkvs/kvs.c @@ -53,7 +53,7 @@ int flux_kvs_wait_version (flux_t *h, int version) if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i }", "rootseq", version))) goto done; - /* N.B. response contains (rootseq, rootdir) but we don't need it. + /* N.B. response contains (rootseq, rootref) but we don't need it. */ if (flux_future_get (f, NULL) < 0) goto done; diff --git a/src/modules/kvs/cache.c b/src/modules/kvs/cache.c index d1735113905a..74dd281b6402 100644 --- a/src/modules/kvs/cache.c +++ b/src/modules/kvs/cache.c @@ -149,41 +149,42 @@ int cache_entry_get_raw (struct cache_entry *hp, const void **data, return 0; } -int cache_entry_set_raw (struct cache_entry *hp, void *data, int len) +int cache_entry_set_raw (struct cache_entry *hp, const void *data, int len) { - if ((data && len <= 0) || (!data && len)) { + void *cpy = NULL; + + if (!hp || (data && len <= 0) || (!data && len)) { errno = EINVAL; return -1; } - - if (hp) { - if (hp->valid) { - if ((data && hp->data) || (!data && !hp->data)) - free (data); /* no-op, 'data' is assumed identical to hp->data */ - else { - /* attempt to change already valid cache entry */ - errno = EBADE; - return -1; - } - } - else { - hp->data = data; - hp->len = len; - hp->valid = true; - - if (hp->waitlist_valid) { - if (wait_runqueue (hp->waitlist_valid) < 0) { - /* set back to orig */ - hp->data = NULL; - hp->len = 0; - hp->valid = false; - return -1; - } - } + /* It should be a no-op if the entry is already set. + * However, as a sanity check, make sure proposed and existing values match. + */ + if (hp->valid) { + if (len != hp->len || memcmp (data, hp->data, len) != 0) { + errno = EBADE; + return -1; } return 0; } - errno = EINVAL; + if (len > 0) { + if (!(cpy = malloc (len))) + return -1; + memcpy (cpy, data, len); + } + hp->data = cpy; + hp->len = len; + hp->valid = true; + if (hp->waitlist_valid) { + if (wait_runqueue (hp->waitlist_valid) < 0) + goto reset_invalid; + } + return 0; +reset_invalid: + free (hp->data); + hp->data = NULL; + hp->len = 0; + hp->valid = false; return -1; } @@ -198,54 +199,6 @@ const json_t *cache_entry_get_treeobj (struct cache_entry *hp) return hp->o; } -int cache_entry_set_treeobj (struct cache_entry *hp, json_t *o) -{ - if (hp && o) { - if (hp->valid) { - if (!hp->data) { - /* attempt to change already valid cache entry */ - errno = EBADE; - return -1; - } - else - /* no-op, 'o' is assumed identical to current data */ - json_decref (o); - - assert (hp->data); - assert (hp->len > 0); - } else { - assert (!hp->data); - assert (!hp->o); - - if (treeobj_validate (o) < 0) { - errno = EINVAL; - return -1; - } - - if (!(hp->data = treeobj_encode (o))) - return -1; - - hp->len = strlen (hp->data); - hp->o = o; - hp->valid = true; - - if (hp->waitlist_valid) { - if (wait_runqueue (hp->waitlist_valid) < 0) { - /* set back to orig */ - hp->data = NULL; - hp->len = 0; - hp->o = NULL; - hp->valid = false; - return -1; - } - } - } - return 0; - } - errno = EINVAL; - return -1; -} - void cache_entry_destroy (void *arg) { struct cache_entry *hp = arg; diff --git a/src/modules/kvs/cache.h b/src/modules/kvs/cache.h index 990b6c5babbc..75ca0ff09dea 100644 --- a/src/modules/kvs/cache.h +++ b/src/modules/kvs/cache.h @@ -13,7 +13,7 @@ struct cache; /* Create/destroy cache entry. * * cache_entry_create() creates an empty cache entry. Data can be set - * in an entry via cache_entry_set_raw() or cache_entry_set_treeobj(). + * in an entry via cache_entry_set_raw(). */ struct cache_entry *cache_entry_create (void); void cache_entry_destroy (void *arg); @@ -55,12 +55,6 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp); * if it is non-NULL. If 'data' is non-NULL, 'len' must be > 0. If * 'data' is NULL, 'len' must be zero. * - * treeobj set accessor is a convenience function that will take a treeobj - * object and extract the raw data string from it and store that in - * the cache entry. The treeobj object 'o' is also cached internally for - * later retrieval. The create transfers ownership of 'o' to the - * cache entry. 'o' must be non-NULL. - * * treeobj get accessor is a convenience function that will return the * treeobj object equivalent of the raw data stored internally. If the * internal raw data is not a valid treeobj object (i.e. improperly @@ -70,21 +64,16 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp); * both set accessors. * * Generally speaking, a cache entry can only be set once. An attempt - * to set new data in a cache entry will silently succeed. A buffer - * passed to cache_entry_set_raw() will be freed for a cache entry - * that already has data stored. A treeobj object passed to - * cache_entry_set_treeobj() will be json_decref()'d for a cache entry - * that alrdady has data stored. + * to set new data in a cache entry will silently succeed. * - * cache_entry_set_raw() & cache_entry_set_treeobj() & - * cache_entry_clear_data() returns -1 on error, 0 on success + * cache_entry_set_raw() & cache_entry_clear_data() + * return -1 on error, 0 on success */ int cache_entry_get_raw (struct cache_entry *hp, const void **data, int *len); -int cache_entry_set_raw (struct cache_entry *hp, void *data, int len); +int cache_entry_set_raw (struct cache_entry *hp, const void *data, int len); const json_t *cache_entry_get_treeobj (struct cache_entry *hp); -int cache_entry_set_treeobj (struct cache_entry *hp, json_t *o); /* Arrange for message handler represented by 'wait' to be restarted * once cache entry becomes valid or not dirty at completion of a diff --git a/src/modules/kvs/commit.c b/src/modules/kvs/commit.c index 7c5dbeea306f..cfca3b1d6dc9 100644 --- a/src/modules/kvs/commit.c +++ b/src/modules/kvs/commit.c @@ -159,7 +159,7 @@ const char *commit_get_newroot_ref (commit_t *c) * blobref in the cache, any waiters for a valid cache entry would * have been satisfied when the dirty cache entry was put onto * this dirty cache list (i.e. in store_cache() below when - * cache_entry_set_treeobj() was called). + * cache_entry_set_raw() was called). */ void commit_cleanup_dirty_cache_entry (commit_t *c, struct cache_entry *hp) { @@ -206,7 +206,7 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, bool is_raw, href_t ref, struct cache_entry **hpp) { struct cache_entry *hp; - int saved_errno, rc = -1; + int saved_errno, rc; const char *xdata; char *data = NULL; int xlen, len; @@ -216,15 +216,13 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, xlen = strlen (xdata); len = base64_decode_length (xlen); if (!(data = malloc (len))) { - saved_errno = errno; flux_log_error (c->cm->h, "malloc"); - goto done; + goto error; } if (base64_decode_block (data, &len, xdata, xlen) < 0) { - free (data); - saved_errno = errno; flux_log_error (c->cm->h, "base64_decode_block"); - goto done; + errno = EPROTO; + goto error; } /* len from base64_decode_length() always > 0 b/c of NUL byte, * but len after base64_decode_block() can be zero. Adjust if @@ -233,66 +231,54 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, free (data); data = NULL; } - blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t)); } else { - if (treeobj_hash (c->cm->hash_name, o, ref, sizeof (href_t)) < 0) { - saved_errno = errno; - flux_log_error (c->cm->h, "treeobj_hash"); - goto done; + if (treeobj_validate (o) < 0 || !(data = treeobj_encode (o))) { + flux_log_error (c->cm->h, "%s: treeobj_encode", __FUNCTION__); + goto error; } + len = strlen (data); + } + if (blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t)) < 0) { + flux_log_error (c->cm->h, "%s: blobref_hash", __FUNCTION__); + goto error; } if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) { if (!(hp = cache_entry_create ())) { - saved_errno = ENOMEM; - goto done; + flux_log_error (c->cm->h, "%s: cache_entry_create", __FUNCTION__); + goto error; } cache_insert (c->cm->cache, ref, hp); } if (cache_entry_get_valid (hp)) { c->cm->noop_stores++; - if (is_raw) - free (data); rc = 0; - } else { - if (is_raw) { - if (cache_entry_set_raw (hp, data, len) < 0) { - int ret; - saved_errno = errno; - free (data); - ret = cache_remove_entry (c->cm->cache, ref); - assert (ret == 1); - goto done; - } - } - else { - json_incref (o); - if (cache_entry_set_treeobj (hp, o) < 0) { - int ret; - saved_errno = errno; - json_decref (o); - ret = cache_remove_entry (c->cm->cache, ref); - assert (ret == 1); - goto done; - } + } + else { + if (cache_entry_set_raw (hp, data, len) < 0) { + int ret; + ret = cache_remove_entry (c->cm->cache, ref); + assert (ret == 1); + goto error; } if (cache_entry_set_dirty (hp, true) < 0) { - /* cache entry now owns data, cache_remove_entry - * will decref/free object/data */ + flux_log_error (c->cm->h, "%s: cache_entry_set_dirty",__FUNCTION__); int ret; - saved_errno = errno; ret = cache_remove_entry (c->cm->cache, ref); assert (ret == 1); - goto done; + goto error; } rc = 1; } *hpp = hp; + free (data); return rc; - done: +error: + saved_errno = errno; + free (data); errno = saved_errno; - return rc; + return -1; } /* Store DIRVAL objects, converting them to DIRREFs. diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 80f2ff62370d..d146740bdc6a 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -63,11 +63,15 @@ const int max_lastuse_age = 5; */ const bool event_includes_rootdir = true; +struct kvsroot { + int seq; + href_t ref; +}; + typedef struct { int magic; struct cache *cache; /* blobref => cache_entry */ - href_t rootdir; /* current root blobref */ - int rootseq; /* current root version (for ordering) */ + struct kvsroot root; commit_mgr_t *cm; waitqueue_t *watchlist; int watchlist_lastrun_epoch; @@ -179,7 +183,6 @@ static void content_load_completion (flux_future_t *f, void *arg) int size; const char *blobref; struct cache_entry *hp; - char *datacpy = NULL; if (flux_content_load_get (f, &data, &size) < 0) { flux_log_error (ctx->h, "%s: flux_content_load_get", __FUNCTION__); @@ -205,15 +208,7 @@ static void content_load_completion (flux_future_t *f, void *arg) * timeout or eventually give up, so the KVS can continue along * its merry way. So we just log the error. */ - if (size) { - if (!(datacpy = malloc (size))) { - flux_log_error (ctx->h, "%s: malloc", __FUNCTION__); - goto done; - } - memcpy (datacpy, data, size); - } - - if (cache_entry_set_raw (hp, datacpy, size) < 0) { + if (cache_entry_set_raw (hp, data, size) < 0) { flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__); goto done; } @@ -387,12 +382,12 @@ static int content_store_request_send (kvs_ctx_t *ctx, const void *data, return rc; } -static void setroot (kvs_ctx_t *ctx, const char *rootdir, int rootseq) +static void setroot (kvs_ctx_t *ctx, const char *rootref, int rootseq) { - if (rootseq == 0 || rootseq > ctx->rootseq) { - assert (strlen (rootdir) < sizeof (href_t)); - strcpy (ctx->rootdir, rootdir); - ctx->rootseq = rootseq; + if (rootseq == 0 || rootseq > ctx->root.seq) { + assert (strlen (rootref) < sizeof (href_t)); + strcpy (ctx->root.ref, rootref); + ctx->root.seq = rootseq; /* log error on wait_runqueue(), don't error out. watchers * may miss value change, but will never get older one. * Maintains consistency model */ @@ -470,7 +465,7 @@ static void commit_apply (commit_t *c) if ((ret = commit_process (c, ctx->epoch, - ctx->rootdir)) == COMMIT_PROCESS_ERROR) { + ctx->root.ref)) == COMMIT_PROCESS_ERROR) { errnum = commit_get_errnum (c); goto done; } @@ -532,7 +527,7 @@ static void commit_apply (commit_t *c) /* else ret == COMMIT_PROCESS_FINISHED */ /* This is the transaction that finalizes the commit by replacing - * ctx->rootdir with newroot, incrementing the root seq, + * ctx->root.ref with newroot, incrementing ctx->root.seq, * and sending out the setroot event for "eventual consistency" * of other nodes. */ @@ -546,7 +541,7 @@ static void commit_apply (commit_t *c) flux_log (ctx->h, LOG_DEBUG, "aggregated %d commits (%d ops)", count, opcount); } - setroot (ctx, commit_get_newroot_ref (c), ctx->rootseq + 1); + setroot (ctx, commit_get_newroot_ref (c), ctx->root.seq + 1); setroot_event_send (ctx, fence_get_json_names (f)); } else { fence_t *f = commit_get_fence (c); @@ -655,7 +650,7 @@ static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w, ctx->watchlist_lastrun_epoch = ctx->epoch; } /* "touch" root */ - (void)cache_lookup (ctx->cache, ctx->rootdir, ctx->epoch); + (void)cache_lookup (ctx->cache, ctx->root.ref, ctx->epoch); if (cache_expire_entries (ctx->cache, ctx->epoch, max_lastuse_age) < 0) flux_log_error (ctx->h, "%s: cache_expire_entries", __FUNCTION__); @@ -720,7 +715,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w, if (!(lh = lookup_create (ctx->cache, ctx->epoch, - ctx->rootdir, + ctx->root.ref, root_ref, key, h, @@ -842,7 +837,7 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, if (!(lh = lookup_create (ctx->cache, ctx->epoch, - ctx->rootdir, + ctx->root.ref, NULL, key, h, @@ -1228,7 +1223,7 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); goto error; } - if (ctx->rootseq < rootseq) { + if (ctx->root.seq < rootseq) { if (!(wait = wait_create_msg_handler (h, w, msg, sync_request_cb, arg))) goto error; if (wait_addqueue (ctx->watchlist, wait) < 0) { @@ -1240,8 +1235,8 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, return; /* stall */ } if (flux_respond_pack (h, msg, "{ s:i s:s }", - "rootseq", ctx->rootseq, - "rootdir", ctx->rootdir) < 0) { + "rootseq", ctx->root.seq, + "rootref", ctx->root.ref) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } @@ -1261,8 +1256,8 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *w, if (flux_request_decode (msg, NULL, NULL) < 0) goto error; if (flux_respond_pack (h, msg, "{ s:i s:s }", - "rootseq", ctx->rootseq, - "rootdir", ctx->rootdir) < 0) { + "rootseq", ctx->root.seq, + "rootref", ctx->root.ref) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto error; } @@ -1273,7 +1268,7 @@ static void getroot_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, href_t rootdir) +static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, href_t rootref) { flux_future_t *f; const char *ref; @@ -1285,7 +1280,7 @@ static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, href_t rootdir) } if (flux_rpc_get_unpack (f, "{ s:i s:s }", "rootseq", rootseq, - "rootdir", &ref) < 0) { + "rootref", &ref) < 0) { saved_errno = errno; flux_log_error (ctx->h, "%s: flux_rpc_get_unpack", __FUNCTION__); goto done; @@ -1294,7 +1289,7 @@ static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, href_t rootdir) saved_errno = EPROTO; goto done; } - strcpy (rootdir, ref); + strcpy (rootref, ref); rc = 0; done: flux_future_destroy (f); @@ -1347,76 +1342,76 @@ static int error_event_send (kvs_ctx_t *ctx, json_t *names, int errnum) return rc; } -/* Alter the (rootdir, rootseq) in response to a setroot event. +/* Optimization: the current rootdir object is optionally included + * in the kvs.setroot event. Prime the local cache with it. + * If there are complications, just skip it. Not critical. + */ +static void prime_cache_with_rootdir (kvs_ctx_t *ctx, json_t *rootdir) +{ + struct cache_entry *hp; + href_t ref; + void *data = NULL; + int len; + + if (treeobj_validate (rootdir) < 0 || !treeobj_is_dir (rootdir)) { + flux_log (ctx->h, LOG_ERR, "%s: invalid rootdir", __FUNCTION__); + goto done; + } + if (!(data = treeobj_encode (rootdir))) { + flux_log_error (ctx->h, "%s: treeobj_encode", __FUNCTION__); + goto done; + } + len = strlen (data); + if (blobref_hash (ctx->hash_name, data, len, ref, sizeof (href_t)) < 0) { + flux_log_error (ctx->h, "%s: blobref_hash", __FUNCTION__); + goto done; + } + if ((hp = cache_lookup (ctx->cache, ref, ctx->epoch))) + goto done; // already in cache, possibly dirty/invalid - we don't care + if (!(hp = cache_entry_create ())) { + flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); + goto done; + } + if (cache_entry_set_raw (hp, data, len) < 0) { + flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__); + cache_entry_destroy (hp); + goto done; + } + cache_insert (ctx->cache, ref, hp); +done: + free (data); +} + +/* Alter the (rootref, rootseq) in response to a setroot event. */ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; int rootseq; - const char *rootdir; - json_t *root = NULL; + const char *rootref; + json_t *rootdir = NULL; json_t *names = NULL; if (flux_event_unpack (msg, NULL, "{ s:i s:s s:o s:o }", "rootseq", &rootseq, - "rootdir", &rootdir, + "rootref", &rootref, "names", &names, - "rootdirval", &root) < 0) { + "rootdir", &rootdir) < 0) { flux_log_error (ctx->h, "%s: flux_event_unpack", __FUNCTION__); return; } finalize_fences_bynames (ctx, names, 0); - /* Copy of root object (corresponding to rootdir blobref) was included - * in the setroot event as an optimization, since it would otherwise - * be loaded from the content store on next KVS access - immediate - * if there are watchers. Store this object in the KVS cache - * with clear dirty bit as it is already valid in the content store. + + /* Optimization: prime local cache with directory object, if provided + * in event message. Ignore failure here - object will be fetched on + * demand from content cache if not in local cache. */ - if (!json_is_null (root)) { - struct cache_entry *hp; - if ((hp = cache_lookup (ctx->cache, rootdir, ctx->epoch))) { - if (!cache_entry_get_valid (hp)) { - /* On error, bad that we can't cache new root, but - * no consistency issue by not caching. We will still - * set new root below via setroot(). - */ - if (cache_entry_set_treeobj (hp, json_incref (root)) < 0) { - flux_log_error (ctx->h, "%s: cache_entry_set_treeobj", - __FUNCTION__); - json_decref (root); - } - } - if (cache_entry_get_dirty (hp)) { - /* If it was dirty, an RPC is already in process, so - * let that RPC handle any error handling with this - * cache entry, we just log this error. - */ - if (cache_entry_set_dirty (hp, false) < 0) - flux_log_error (ctx->h, "%s: cache_entry_set_dirty", - __FUNCTION__); - } - } else { - /* On error, bad that we can't cache new root, but - * no consistency issue by not caching. We will still - * set new root below via setroot(). - */ - if (!(hp = cache_entry_create ())) { - flux_log_error (ctx->h, "%s: cache_entry_create", - __FUNCTION__); - } - else if (cache_entry_set_treeobj (hp, json_incref (root)) < 0) { - flux_log_error (ctx->h, "%s: cache_entry_set_treeobj", - __FUNCTION__); - json_decref (root); - cache_entry_destroy (hp); - } - else - cache_insert (ctx->cache, rootdir, hp); - } - } - setroot (ctx, rootdir, rootseq); + if (!json_is_null (rootdir)) + prime_cache_with_rootdir (ctx, rootdir); + + setroot (ctx, rootref, rootseq); } static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) @@ -1430,7 +1425,7 @@ static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) if (event_includes_rootdir) { struct cache_entry *hp; - if ((hp = cache_lookup (ctx->cache, ctx->rootdir, ctx->epoch))) + if ((hp = cache_lookup (ctx->cache, ctx->root.ref, ctx->epoch))) root = cache_entry_get_treeobj (hp); assert (root != NULL); // root entry is always in cache on rank 0 } @@ -1443,10 +1438,10 @@ static int setroot_event_send (kvs_ctx_t *ctx, json_t *names) root = nullobj; } if (!(msg = flux_event_pack ("kvs.setroot", "{ s:i s:s s:O s:O }", - "rootseq", ctx->rootseq, - "rootdir", ctx->rootdir, + "rootseq", ctx->root.seq, + "rootref", ctx->root.ref, "names", names, - "rootdirval", root))) { + "rootdir", root))) { saved_errno = errno; flux_log_error (ctx->h, "%s: flux_event_pack", __FUNCTION__); goto done; @@ -1541,7 +1536,7 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, "#watchers", wait_queue_length (ctx->watchlist), "#no-op stores", commit_mgr_get_noop_stores (ctx->cm), "#faults", ctx->faults, - "store revision", ctx->rootseq) < 0) { + "store revision", ctx->root.seq) < 0) { flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); goto done; } @@ -1611,85 +1606,81 @@ static void process_args (kvs_ctx_t *ctx, int ac, char **av) } } -/* Store initial rootdir in local cache, and flush to content - * cache synchronously. - * Object reference is given to this function, it will either give it - * to the cache or decref it. +/* Store initial root in local cache, and flush to content + * cache synchronously. If 'rootdir' is NULL, store an empty one. + * The corresponding blobref is written into 'ref'. */ -static int store_initial_rootdir (kvs_ctx_t *ctx, json_t *o, href_t ref) +static int store_initial_rootdir (kvs_ctx_t *ctx, const json_t *rootdir, + href_t ref) { struct cache_entry *hp; - int rc = -1; int saved_errno, ret; + void *data = NULL; + int len; + flux_future_t *f = NULL; + const char *newref; + json_t *empty_rootdir = NULL; - if (treeobj_hash (ctx->hash_name, o, ref, sizeof (href_t)) < 0) { - saved_errno = errno; - flux_log_error (ctx->h, "%s: treeobj_hash", - __FUNCTION__); - goto decref_done; + if (!rootdir) { + if (!(empty_rootdir = treeobj_create_dir ())) { + flux_log_error (ctx->h, "%s: treeobj_create_dir", __FUNCTION__); + goto error; + } + rootdir = empty_rootdir; + } + if (treeobj_validate (rootdir) < 0 || !treeobj_is_dir (rootdir)) { + errno = EINVAL; + goto error; + } + if (!(data = treeobj_encode (rootdir))) + goto error; + len = strlen (data); + if (blobref_hash (ctx->hash_name, data, len, ref, sizeof (href_t)) < 0) { + flux_log_error (ctx->h, "%s: blobref_hash", __FUNCTION__); + goto error; } if (!(hp = cache_lookup (ctx->cache, ref, ctx->epoch))) { if (!(hp = cache_entry_create ())) { - saved_errno = errno; - flux_log_error (ctx->h, "%s: cache_entry_create", - __FUNCTION__); - goto decref_done; + flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); + goto error; } cache_insert (ctx->cache, ref, hp); } if (!cache_entry_get_valid (hp)) { - const void *data; - int len; - assert (o); - if (cache_entry_set_treeobj (hp, o) < 0) { - saved_errno = errno; - flux_log_error (ctx->h, "%s: cache_entry_set_treeobj", - __FUNCTION__); - ret = cache_remove_entry (ctx->cache, ref); - assert (ret == 1); - goto decref_done; + if (cache_entry_set_raw (hp, data, len) < 0) { // makes entry valid + flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__); + goto error_uncache; } - if (cache_entry_set_dirty (hp, true) < 0) { - /* remove entry will decref object */ - saved_errno = errno; - flux_log_error (ctx->h, "%s: cache_entry_set_dirty", __FUNCTION__); - ret = cache_remove_entry (ctx->cache, ref); - assert (ret == 1); - goto done_error; - } - if (cache_entry_get_raw (hp, &data, &len) < 0) { - /* remove entry will decref object */ - saved_errno = errno; - flux_log_error (ctx->h, "%s: cache_entry_get_raw", __FUNCTION__); - ret = cache_remove_entry (ctx->cache, ref); - assert (ret == 1); - goto done_error; + if (!(f = flux_content_store (ctx->h, data, len, 0)) + || flux_content_store_get (f, &newref) < 0) { + flux_log_error (ctx->h, "%s: flux_content_store", __FUNCTION__); + goto error_uncache; } - if (content_store_request_send (ctx, data, len, true) < 0) { - /* Must clean up, don't want cache entry to be assumed - * valid. Everything here is synchronous and w/o waiters, - * so nothing should error here */ - saved_errno = errno; - flux_log_error (ctx->h, "%s: content_store_request_send", - __FUNCTION__); - ret = cache_entry_clear_dirty (hp); - assert (ret == 0); - assert (cache_entry_get_dirty (hp) == false); - ret = cache_remove_entry (ctx->cache, ref); - assert (ret == 1); - goto done_error; + /* Sanity check that content cache is using the same hash alg as KVS. + * It should suffice to do this once at startup. + */ + if (strcmp (newref, ref) != 0) { + errno = EPROTO; + flux_log_error (ctx->h, "%s: hash mismatch kvs=%s content=%s", + __FUNCTION__, ref, newref); + goto error_uncache; } - } else - json_decref (o); - rc = 0; - return rc; - -decref_done: - json_decref (o); -done_error: - if (rc < 0) - errno = saved_errno; - return rc; + } + free (data); + flux_future_destroy (f); + json_decref (empty_rootdir); + return 0; +error_uncache: + saved_errno = errno; + ret = cache_remove_entry (ctx->cache, ref); + assert (ret == 1); +error: + saved_errno = errno; + free (data); + flux_future_destroy (f); + json_decref (empty_rootdir); + errno = saved_errno; + return -1; } int mod_main (flux_t *h, int argc, char **argv) @@ -1711,19 +1702,13 @@ int mod_main (flux_t *h, int argc, char **argv) goto done; } if (ctx->rank == 0) { - json_t *rootdir; - href_t href; - - if (!(rootdir = treeobj_create_dir ())) { - flux_log_error (h, "treeobj_create_dir"); - goto done; - } + href_t rootref; - if (store_initial_rootdir (ctx, rootdir, href) < 0) { - flux_log_error (h, "storing root object"); + if (store_initial_rootdir (ctx, NULL, rootref) < 0) { + flux_log_error (h, "storing initial root object"); goto done; } - setroot (ctx, href, 0); + setroot (ctx, rootref, 0); } else { href_t href; int rootseq; diff --git a/src/modules/kvs/test/cache.c b/src/modules/kvs/test/cache.c index fbf371a11a9c..0771773d3c67 100644 --- a/src/modules/kvs/test/cache.c +++ b/src/modules/kvs/test/cache.c @@ -10,6 +10,28 @@ #include "src/modules/kvs/waitqueue.h" #include "src/modules/kvs/cache.h" +static int cache_entry_set_treeobj (struct cache_entry *hp, const json_t *o) +{ + char *s = NULL; + int saved_errno; + int rc = -1; + + if (!hp || !o || treeobj_validate (o) < 0) { + errno = EINVAL; + goto done; + } + if (!(s = treeobj_encode (o))) + goto done; + if (cache_entry_set_raw (hp, s, strlen (s)) < 0) + goto done; + rc = 0; +done: + saved_errno = errno; + free (s); + errno = saved_errno; + return rc; +} + void wait_cb (void *arg) { int *count = arg; @@ -111,18 +133,11 @@ void cache_entry_raw_tests (void) ok (cache_entry_get_valid (e) == true, "cache entry now valid after cache_entry_set_raw call"); - /* cache_entry_set_raw will free data2 */ ok (cache_entry_set_raw (e, data2, strlen (data) + 1) == 0, "cache_entry_set_raw again, silent success"); ok (cache_entry_set_raw (e, NULL, 0) < 0 && errno == EBADE, "cache_entry_set_raw fails with EBADE, changing validity type"); - o1 = treeobj_create_val ("foo", 3); - /* cache_entry_set_treeobj will json_decref o1 */ - ok (cache_entry_set_treeobj (e, o1) == 0, - "cache_entry_set_treeobj, silent success"); - o1 = NULL; - ok (cache_entry_get_raw (e, (const void **)&datatmp, &len) == 0, "raw data retrieved from cache entry"); ok (datatmp && strcmp (datatmp, data) == 0, @@ -168,7 +183,6 @@ void cache_entry_raw_tests (void) ok (cache_entry_set_raw (e, data, strlen (data) + 1) < 0 && errno == EBADE, "cache_entry_set_raw fails with EBADE, changing validity type"); - free (data); o1 = treeobj_create_val ("foo", 3); ok (cache_entry_set_treeobj (e, o1) < 0 @@ -184,90 +198,9 @@ void cache_entry_raw_tests (void) ok (len == 0, "raw data length is zero"); - cache_entry_destroy (e); /* destroys data */ - e = NULL; -} - -void cache_entry_treeobj_tests (void) -{ - struct cache_entry *e; - json_t *o1, *o2, *otest; - const json_t *otmp; - char *data; - - /* Play with one entry. - * N.B.: json ref is NOT incremented by create or get_treeobj. - */ - - /* test empty cache entry later filled with treeobj. - */ - - o1 = treeobj_create_val ("foo", 3); - o2 = treeobj_create_val ("foo", 3); - - data = strdup ("abcd"); - - ok ((e = cache_entry_create ()) != NULL, - "cache_entry_create works"); - ok (cache_entry_get_valid (e) == false, - "cache entry initially non-valid"); - ok (cache_entry_get_dirty (e) == false, - "cache entry initially not dirty"); - ok (cache_entry_set_dirty (e, true) < 0, - "cache_entry_set_dirty fails b/c entry non-valid"); - ok (cache_entry_get_dirty (e) == false, - "cache entry does not set dirty, b/c no data"); - ok (cache_entry_get_treeobj (e) == NULL, - "cache_entry_get_treeobj returns NULL, no treeobj set"); - ok (cache_entry_set_treeobj (e, o1) == 0, - "cache_entry_set_treeobj success"); - - /* cache_entry_set_treeobj will json_decref o2 */ - ok (cache_entry_set_treeobj (e, o2) == 0, - "cache_entry_set_treeobj again, silent success"); - o2 = NULL; - - ok (cache_entry_get_valid (e) == true, - "cache entry now valid after cache_entry_set_treeobj call"); - - /* cache_entry_set_raw will free data */ - ok (cache_entry_set_raw (e, data, 4) == 0, - "cache_entry_set_raw, silent success"); - data = NULL; - - ok (cache_entry_set_raw (e, NULL, 0) < 0 - && errno == EBADE, - "cache_entry_set_raw fails with EBADE, changing validity type"); - - ok (cache_entry_set_dirty (e, true) == 0, - "cache_entry_set_dirty success"); - ok (cache_entry_get_dirty (e) == true, - "cache entry succcessfully set dirty"); - - ok (cache_entry_clear_dirty (e) == 0, - "cache_entry_clear_dirty success"); - ok (cache_entry_get_dirty (e) == false, - "cache entry succcessfully now not dirty, b/c no waiters"); - - ok (cache_entry_set_dirty (e, true) == 0, - "cache_entry_set_dirty success"); - ok (cache_entry_get_dirty (e) == true, - "cache entry succcessfully set dirty"); - ok (cache_entry_force_clear_dirty (e) == 0, - "cache_entry_force_clear_dirty success"); - ok (cache_entry_get_dirty (e) == false, - "cache entry succcessfully now not dirty"); - - ok ((otmp = cache_entry_get_treeobj (e)) != NULL, - "treeobj retrieved from cache entry"); - otest = treeobj_create_val ("foo", 3); - /* XXX - json_equal takes const in jansson > 2.10 */ - ok (json_equal ((json_t *)otmp, otest) == 1, - "expected treeobj object returned"); - json_decref (otest); - - cache_entry_destroy (e); /* destroys o1 */ - e = NULL; + cache_entry_destroy (e); + free (data); + free (data2); } void cache_entry_raw_and_treeobj_tests (void) @@ -291,6 +224,7 @@ void cache_entry_raw_and_treeobj_tests (void) ok (cache_entry_get_treeobj (e) == NULL, "cache_entry_get_treeobj returns NULL for non-treeobj raw data"); cache_entry_destroy (e); + free (data); /* test cache entry filled with zero length raw data */ @@ -320,6 +254,7 @@ void cache_entry_raw_and_treeobj_tests (void) ok (json_equal ((json_t *)otmp, otest) == true, "treeobj returned from cache entry correct"); json_decref (o1); + free (data); cache_entry_destroy (e); /* test cache entry filled with treeobj and get raw data */ @@ -333,20 +268,24 @@ void cache_entry_raw_and_treeobj_tests (void) "cache_entry_set_treeobj success"); ok (cache_entry_get_raw (e, (const void **)&datatmp, &len) == 0, "cache_entry_get_raw returns success for get treeobj raw data"); - ok (datatmp && strcmp (datatmp, data) == 0, + ok (datatmp && strncmp (datatmp, data, len) == 0, "raw data matches expected string version of treeobj"); ok (datatmp && (len == strlen (data)), "raw data length matches expected length of treeobj string"); + json_decref (o1); + free (data); cache_entry_destroy (e); } -void waiter_raw_tests (void) +void waiter_tests (void) { struct cache_entry *e; char *data; wait_t *w; int count; + data = strdup ("abcd"); + /* Test cache entry waiters. * N.B. waiter is destroyed when run. */ @@ -363,7 +302,6 @@ void waiter_raw_tests (void) "cache_entry_force_clear_dirty returns error, b/c no object set"); ok (cache_entry_wait_valid (e, w) == 0, "cache_entry_wait_valid success"); - data = strdup ("abcd"); ok (cache_entry_set_raw (e, data, strlen (data) + 1) == 0, "cache_entry_set_raw success"); ok (cache_entry_get_valid (e) == true, @@ -408,7 +346,6 @@ void waiter_raw_tests (void) "waiter callback not called on force clear dirty"); cache_entry_destroy (e); /* destroys data */ - e = NULL; /* set cache entry to zero-data, should also call get valid * waiter */ @@ -428,77 +365,8 @@ void waiter_raw_tests (void) ok (count == 1, "waiter callback ran"); cache_entry_destroy (e); /* destroys data */ - e = NULL; -} - -void waiter_treeobj_tests (void) -{ - struct cache_entry *e; - json_t *o; - wait_t *w; - int count; - /* Test cache entry waiters. - * N.B. waiter is destroyed when run. - */ - count = 0; - ok ((w = wait_create (wait_cb, &count)) != NULL, - "wait_create works"); - ok ((e = cache_entry_create ()) != NULL, - "cache_entry_create created empty object"); - ok (cache_entry_get_valid (e) == false, - "cache entry invalid, adding waiter"); - ok (cache_entry_clear_dirty (e) < 0, - "cache_entry_clear_dirty returns error, b/c no object set"); - ok (cache_entry_force_clear_dirty (e) < 0, - "cache_entry_force_clear_dirty returns error, b/c no object set"); - o = treeobj_create_val ("foo", 3); - ok (cache_entry_wait_valid (e, w) == 0, - "cache_entry_wait_valid success"); - ok (cache_entry_set_treeobj (e, o) == 0, - "cache_entry_set_treeobj success"); - ok (cache_entry_get_valid (e) == true, - "cache entry set valid with one waiter"); - ok (count == 1, - "waiter callback ran"); - - count = 0; - ok ((w = wait_create (wait_cb, &count)) != NULL, - "wait_create works"); - ok (cache_entry_set_dirty (e, true) == 0, - "cache_entry_set_dirty success"); - ok (cache_entry_get_dirty (e) == true, - "cache entry set dirty, adding waiter"); - ok (cache_entry_wait_notdirty (e, w) == 0, - "cache_entry_wait_notdirty success"); - ok (cache_entry_clear_dirty (e) == 0, - "cache_entry_clear_dirty success"); - ok (cache_entry_get_dirty (e) == true, - "cache entry still dirty, b/c of a waiter"); - ok (cache_entry_set_dirty (e, false) == 0, - "cache_entry_set_dirty success"); - ok (cache_entry_get_dirty (e) == false, - "cache entry set not dirty with one waiter"); - ok (count == 1, - "waiter callback ran"); - - count = 0; - ok ((w = wait_create (wait_cb, &count)) != NULL, - "wait_create works"); - ok (cache_entry_set_dirty (e, true) == 0, - "cache_entry_set_dirty success"); - ok (cache_entry_get_dirty (e) == true, - "cache entry set dirty, adding waiter"); - ok (cache_entry_wait_notdirty (e, w) == 0, - "cache_entry_wait_notdirty success"); - ok (cache_entry_force_clear_dirty (e) == 0, - "cache_entry_force_clear_dirty success"); - ok (cache_entry_get_dirty (e) == false, - "cache entry set not dirty with one waiter"); - ok (count == 0, - "waiter callback not called on force clear dirty"); - - cache_entry_destroy (e); /* destroys o */ + free (data); } void cache_remove_entry_tests (void) @@ -541,6 +409,7 @@ void cache_remove_entry_tests (void) o = treeobj_create_val ("foobar", 6); ok (cache_entry_set_treeobj (e, o) == 0, "cache_entry_set_treeobj success"); + json_decref (o); ok (cache_entry_get_valid (e) == true, "cache entry set valid with one waiter"); ok (count == 1, @@ -553,11 +422,12 @@ void cache_remove_entry_tests (void) count = 0; ok ((w = wait_create (wait_cb, &count)) != NULL, "wait_create works"); - o = treeobj_create_val ("foobar", 6); ok ((e = cache_entry_create ()) != NULL, "cache_entry_create works"); + o = treeobj_create_val ("foobar", 6); ok (cache_entry_set_treeobj (e, o) == 0, "cache_entry_set_treeobj success"); + json_decref (o); cache_insert (cache, "remove-ref", e); ok (cache_lookup (cache, "remove-ref", 0) != NULL, "cache_lookup verify entry exists"); @@ -634,6 +504,7 @@ void cache_expiration_tests (void) "cache_entry_create works"); ok (cache_entry_set_treeobj (e3, o1) == 0, "cache_entry_set_treeobj success"); + json_decref (o1); cache_insert (cache, "xxx2", e3); ok (cache_count_entries (cache) == 2, "cache contains 2 entries after insert"); @@ -692,10 +563,8 @@ int main (int argc, char *argv[]) cache_tests (); cache_entry_basic_tests (); cache_entry_raw_tests (); - cache_entry_treeobj_tests (); cache_entry_raw_and_treeobj_tests (); - waiter_raw_tests (); - waiter_treeobj_tests (); + waiter_tests (); cache_expiration_tests (); cache_remove_entry_tests (); diff --git a/src/modules/kvs/test/commit.c b/src/modules/kvs/test/commit.c index 8f785e25894d..aecf81d1aa35 100644 --- a/src/modules/kvs/test/commit.c +++ b/src/modules/kvs/test/commit.c @@ -18,6 +18,28 @@ static int test_global = 5; +static int cache_entry_set_treeobj (struct cache_entry *hp, const json_t *o) +{ + char *s = NULL; + int saved_errno; + int rc = -1; + + if (!hp || !o || treeobj_validate (o) < 0) { + errno = EINVAL; + goto done; + } + if (!(s = treeobj_encode (o))) + goto done; + if (cache_entry_set_raw (hp, s, strlen (s)) < 0) + goto done; + rc = 0; +done: + saved_errno = errno; + free (s); + errno = saved_errno; + return rc; +} + /* convenience function */ static struct cache_entry *create_cache_entry_raw (void *data, int len) { diff --git a/src/modules/kvs/test/lookup.c b/src/modules/kvs/test/lookup.c index 34a423215c17..66ac256020ad 100644 --- a/src/modules/kvs/test/lookup.c +++ b/src/modules/kvs/test/lookup.c @@ -20,6 +20,28 @@ struct lookup_ref_data int count; }; +static int cache_entry_set_treeobj (struct cache_entry *hp, const json_t *o) +{ + char *s = NULL; + int saved_errno; + int rc = -1; + + if (!hp || !o || treeobj_validate (o) < 0) { + errno = EINVAL; + goto done; + } + if (!(s = treeobj_encode (o))) + goto done; + if (cache_entry_set_raw (hp, s, strlen (s)) < 0) + goto done; + rc = 0; +done: + saved_errno = errno; + free (s); + errno = saved_errno; + return rc; +} + /* convenience function */ static struct cache_entry *create_cache_entry_raw (void *data, int len) { @@ -725,12 +747,10 @@ void lookup_errors (void) { json_t *dirref; json_t *dir; json_t *dirref_multi; - json_t *valref_multi_overflow; struct cache *cache; lookup_t *lh; href_t dirref_ref; href_t valref_ref; - href_t valref_long_ref; href_t root_ref; ok ((cache = cache_create ()) != NULL, @@ -741,9 +761,6 @@ void lookup_errors (void) { * valref_ref * "abcd" * - * valref_long_ref - * "long", but invalid length on buffer - * * dirref_ref * "val" : val to "bar" * @@ -756,22 +773,12 @@ void lookup_errors (void) { * "dirref" : dirref to dirref_ref * "dir" : dir w/ "val" : val to "baz" * "dirref_bad" : dirref to valref_ref - * "valref_multi_overflow" : [ valref_long_ref, valref_long_ref ] * "dirref_multi" : dirref to [ dirref_ref, dirref_ref ] */ blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); cache_insert (cache, valref_ref, create_cache_entry_raw (strdup ("abcd"), 4)); - /* achu: Note that I am abusing internal knowledge that cache - * entries blindly store pointers and lengths. Obviously the - * "real" length of the buffer below is length 4, but I'm storing - * a huge number. - */ - blobref_hash ("sha1", "long", 4, valref_long_ref, sizeof (href_t)); - cache_insert (cache, valref_long_ref, - create_cache_entry_raw (strdup ("long"), INT_MAX - 32)); - dirref = treeobj_create_dir (); treeobj_insert_entry (dirref, "val", treeobj_create_val ("bar", 3)); treeobj_hash ("sha1", dirref, dirref_ref, sizeof (href_t)); @@ -790,10 +797,6 @@ void lookup_errors (void) { treeobj_insert_entry (root, "dir", dir); treeobj_insert_entry (root, "dirref_bad", treeobj_create_dirref (valref_ref)); - valref_multi_overflow = treeobj_create_valref (valref_long_ref); - treeobj_append_blobref (valref_multi_overflow, valref_long_ref); - treeobj_insert_entry (root, "valref_multi_overflow", valref_multi_overflow); - dirref_multi = treeobj_create_dirref (dirref_ref); treeobj_append_blobref (dirref_multi, dirref_ref); @@ -982,19 +985,6 @@ void lookup_errors (void) { "lookup_create on dirref_bad, in middle of path"); check (lh, ENOTRECOVERABLE, NULL, "lookup dirref_bad, in middle of path"); - /* Lookup a valref multiple blobref that points to buffers that will - * over int, should get EOVERFLOW. - */ - ok ((lh = lookup_create (cache, - 1, - root_ref, - root_ref, - "valref_multi_overflow", - NULL, - 0)) != NULL, - "lookup_create on valref_multi_overflow"); - check (lh, EOVERFLOW, NULL, "lookup valref_multi_overflow"); - /* Lookup with an invalid root_ref, should get EINVAL */ ok ((lh = lookup_create (cache, 1,