From dad5b37ba10c5930eccb53e6f1a789c834091eee Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 25 Sep 2017 13:55:27 -0700 Subject: [PATCH 1/9] modules/kvs/cache: Refactor internal structure Change internal json_t *o field to void *data, so the internally cached data is not type specific. --- src/modules/kvs/cache.c | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/modules/kvs/cache.c b/src/modules/kvs/cache.c index daaebbfd8323..1b1bb6b15b83 100644 --- a/src/modules/kvs/cache.c +++ b/src/modules/kvs/cache.c @@ -51,7 +51,7 @@ struct cache_entry { waitqueue_t *waitlist_notdirty; waitqueue_t *waitlist_valid; - json_t *o; /* value object */ + void *data; /* value object/data */ int lastuse_epoch; /* time of last use for cache expiry */ uint8_t dirty:1; }; @@ -68,23 +68,23 @@ struct cache_entry *cache_entry_create (json_t *o) return NULL; } if (o) - hp->o = o; + hp->data = o; return hp; } bool cache_entry_get_valid (struct cache_entry *hp) { - return (hp && hp->o != NULL); + return (hp && hp->data != NULL); } bool cache_entry_get_dirty (struct cache_entry *hp) { - return (hp && hp->o && hp->dirty); + return (hp && hp->data && hp->dirty); } int cache_entry_set_dirty (struct cache_entry *hp, bool val) { - if (hp && hp->o) { + if (hp && hp->data) { if ((val && hp->dirty) || (!val && !hp->dirty)) ; /* no-op */ else if (val && !hp->dirty) @@ -106,7 +106,7 @@ int cache_entry_set_dirty (struct cache_entry *hp, bool val) int cache_entry_clear_dirty (struct cache_entry *hp) { - if (hp && hp->o) { + if (hp && hp->data) { if (hp->dirty && (!hp->waitlist_notdirty || !wait_queue_length (hp->waitlist_notdirty))) @@ -118,7 +118,7 @@ int cache_entry_clear_dirty (struct cache_entry *hp) int cache_entry_force_clear_dirty (struct cache_entry *hp) { - if (hp && hp->o) { + if (hp && hp->data) { if (hp->dirty) { if (hp->waitlist_notdirty) { wait_queue_destroy (hp->waitlist_notdirty); @@ -133,28 +133,28 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp) json_t *cache_entry_get_json (struct cache_entry *hp) { - if (!hp || !hp->o) + if (!hp || !hp->data) return NULL; - return hp->o; + return hp->data; } int cache_entry_set_json (struct cache_entry *hp, json_t *o) { if (hp) { - if ((o && hp->o) || (!o && !hp->o)) { - json_decref (o); /* no-op, 'o' is assumed identical to hp->o */ - } else if (o && !hp->o) { - hp->o = o; + if ((o && hp->data) || (!o && !hp->data)) { + json_decref (o); /* no-op, 'o' is assumed identical to hp->data */ + } else if (o && !hp->data) { + hp->data = o; if (hp->waitlist_valid) { if (wait_runqueue (hp->waitlist_valid) < 0) { /* set back to orig */ - hp->o = NULL; + hp->data = NULL; return -1; } } - } else if (!o && hp->o) { - json_decref (hp->o); - hp->o = NULL; + } else if (!o && hp->data) { + json_decref (hp->data); + hp->data = NULL; } return 0; } @@ -165,8 +165,8 @@ void cache_entry_destroy (void *arg) { struct cache_entry *hp = arg; if (hp) { - if (hp->o) - json_decref (hp->o); + if (hp->data) + json_decref (hp->data); if (hp->waitlist_notdirty) wait_queue_destroy (hp->waitlist_notdirty); if (hp->waitlist_valid) @@ -300,7 +300,7 @@ int cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep, hp = zhash_lookup (cache->zh, ref); if (cache_entry_get_valid (hp)) { /* must pass JSON_ENCODE_ANY, object could be anything */ - char *s = json_dumps (hp->o, JSON_ENCODE_ANY); + char *s = json_dumps (hp->data, JSON_ENCODE_ANY); int obj_size; if (!s) { saved_errno = ENOMEM; From 1ed68cc08934254594a52a8fe480d599e75a654e Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 22 Sep 2017 11:37:45 -0700 Subject: [PATCH 2/9] modules/kvs/cache: Refactor cache_entry_create Convert cache_entry_create() to not accept any parameters, it now only creates an empty cache entry. Create new function cache_entry_create_json(), that takes an optional json object to hold the cache entry data. If the user passes in NULL, it behaves identically to calling cache_entry_create(). Adjust callers of cache_entry_create() appropriately throughout. Add unit tests and coverage appropriately. --- src/modules/kvs/cache.c | 10 ++++- src/modules/kvs/cache.h | 6 ++- src/modules/kvs/commit.c | 2 +- src/modules/kvs/kvs.c | 9 +++-- src/modules/kvs/test/cache.c | 71 ++++++++++++++++++++++++++++------- src/modules/kvs/test/commit.c | 54 +++++++++++++------------- src/modules/kvs/test/lookup.c | 42 ++++++++++----------- 7 files changed, 125 insertions(+), 69 deletions(-) diff --git a/src/modules/kvs/cache.c b/src/modules/kvs/cache.c index 1b1bb6b15b83..dce7ecc373ab 100644 --- a/src/modules/kvs/cache.c +++ b/src/modules/kvs/cache.c @@ -60,13 +60,21 @@ struct cache { zhash_t *zh; }; -struct cache_entry *cache_entry_create (json_t *o) +struct cache_entry *cache_entry_create (void) { struct cache_entry *hp = calloc (1, sizeof (*hp)); if (!hp) { errno = ENOMEM; return NULL; } + return hp; +} + +struct cache_entry *cache_entry_create_json (json_t *o) +{ + struct cache_entry *hp = cache_entry_create (); + if (!hp) + return NULL; if (o) hp->data = o; return hp; diff --git a/src/modules/kvs/cache.h b/src/modules/kvs/cache.h index 389f2ccf9497..e06dfa920657 100644 --- a/src/modules/kvs/cache.h +++ b/src/modules/kvs/cache.h @@ -11,9 +11,11 @@ struct cache; /* Create/destroy cache entry. - * If non-NULL, create transfers ownership of 'o' to the cache entry. + * In cache_entry_create_json(), create transfers ownership of 'o' to + * the cache entry. If 'o' is NULL, calls cache_entry_create(). */ -struct cache_entry *cache_entry_create (json_t *o); +struct cache_entry *cache_entry_create (void); +struct cache_entry *cache_entry_create_json (json_t *o); void cache_entry_destroy (void *arg); /* Return true if cache entry contains valid json. diff --git a/src/modules/kvs/commit.c b/src/modules/kvs/commit.c index cbe0c05398ca..fa50ba20eaff 100644 --- a/src/modules/kvs/commit.c +++ b/src/modules/kvs/commit.c @@ -197,7 +197,7 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, goto done; } if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) { - if (!(hp = cache_entry_create (NULL))) { + if (!(hp = cache_entry_create ())) { saved_errno = ENOMEM; goto done; } diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index ddb013363574..4fd89e92a590 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -252,7 +252,7 @@ static int load (kvs_ctx_t *ctx, const href_t ref, wait_t *wait, bool *stall) /* Create an incomplete hash entry if none found. */ if (!hp) { - if (!(hp = cache_entry_create (NULL))) { + if (!(hp = cache_entry_create ())) { flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); return -1; } @@ -1349,8 +1349,9 @@ static void setroot_event_cb (flux_t *h, flux_msg_handler_t *w, * no consistency issue by not caching. We will still * set new root below via setroot(). */ - if (!(hp = cache_entry_create (json_incref (root)))) { - flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); + if (!(hp = cache_entry_create_json (json_incref (root)))) { + flux_log_error (ctx->h, "%s: cache_entry_create_json", + __FUNCTION__); json_decref (root); return; } @@ -1570,7 +1571,7 @@ static int store_initial_rootdir (kvs_ctx_t *ctx, json_t *o, href_t ref) goto decref_done; } if (!(hp = cache_lookup (ctx->cache, ref, ctx->epoch))) { - if (!(hp = cache_entry_create (NULL))) { + if (!(hp = cache_entry_create ())) { saved_errno = errno; flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); goto decref_done; diff --git a/src/modules/kvs/test/cache.c b/src/modules/kvs/test/cache.c index 144da1bbe8bb..513f1b71e962 100644 --- a/src/modules/kvs/test/cache.c +++ b/src/modules/kvs/test/cache.c @@ -53,9 +53,9 @@ void cache_entry_tests (void) * N.B.: json ref is NOT incremented by create or get_json. */ - /* test empty cache entry */ + /* test empty cache entry created by cache_entry_create() */ - ok ((e = cache_entry_create (NULL)) != NULL, + ok ((e = cache_entry_create ()) != NULL, "cache_entry_create works"); ok (cache_entry_get_valid (e) == false, "cache entry initially non-valid"); @@ -68,21 +68,65 @@ void cache_entry_tests (void) cache_entry_destroy (e); e = NULL; + /* test empty cache entry created by cache_entry_create_json() */ + + ok ((e = cache_entry_create_json (NULL)) != NULL, + "cache_entry_create_json w/ NULL input works"); + ok (cache_entry_get_valid (e) == false, + "cache entry initially non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry initially not dirty"); + ok (cache_entry_set_dirty (e, true) < 0, + "cache_entry_set_dirty fails b/c entry non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry does not set dirty, b/c no data"); + ok ((otmp = cache_entry_get_json (e)) == NULL, + "cache_entry_get_json returns NULL, no json set"); + cache_entry_destroy (e); + e = NULL; + + /* test empty cache entry created by cache_entry_create_json(), + * later filled with data. + */ + + o1 = json_object (); + json_object_set_new (o1, "foo", json_integer (42)); + + ok ((e = cache_entry_create_json (NULL)) != NULL, + "cache_entry_create_json w/ NULL input works"); + ok (cache_entry_get_valid (e) == false, + "cache entry initially non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry initially not dirty"); + ok (cache_entry_set_dirty (e, true) < 0, + "cache_entry_set_dirty fails b/c entry non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry does not set dirty, b/c no data"); + ok ((otmp = cache_entry_get_json (e)) == NULL, + "cache_entry_get_json returns NULL, no json set"); + ok (cache_entry_set_json (e, o1) == 0, + "cache_entry_set_json success"); + ok (cache_entry_get_valid (e) == true, + "cache entry now valid after cache_entry_set_json call"); + cache_entry_destroy (e); /* destroys o1 */ + e = NULL; + /* test cache entry filled with json initially */ o1 = json_object (); json_object_set_new (o1, "foo", json_integer (42)); - ok ((e = cache_entry_create (o1)) != NULL, - "cache_entry_create works"); + + ok ((e = cache_entry_create_json (o1)) != NULL, + "cache_entry_create_json w/ non-NULL input works"); ok (cache_entry_get_valid (e) == true, "cache entry initially valid"); ok (cache_entry_get_dirty (e) == false, "cache entry initially not dirty"); - ok (cache_entry_set_dirty (e, true) == 0, "cache_entry_set_dirty success"); ok (cache_entry_get_dirty (e) == true, "cache entry succcessfully set dirty"); + ok (cache_entry_clear_dirty (e) == 0, "cache_entry_clear_dirty returns 0, b/c no waiters"); ok (cache_entry_get_dirty (e) == false, @@ -110,6 +154,7 @@ void cache_entry_tests (void) "cache entry no longer has json object"); cache_entry_destroy (e); /* destroys o1 */ + e = NULL; } void waiter_tests (void) @@ -125,7 +170,7 @@ void waiter_tests (void) count = 0; ok ((w = wait_create (wait_cb, &count)) != NULL, "wait_create works"); - ok ((e = cache_entry_create (NULL)) != NULL, + ok ((e = cache_entry_create ()) != NULL, "cache_entry_create created empty object"); ok (cache_entry_get_valid (e) == false, "cache entry invalid, adding waiter"); @@ -192,7 +237,7 @@ void cache_remove_entry_tests (void) ok ((cache = cache_create ()) != NULL, "cache_create works"); - ok ((e = cache_entry_create (NULL)) != NULL, + ok ((e = cache_entry_create ()) != NULL, "cache_entry_create works"); cache_insert (cache, "remove-ref", e); ok (cache_lookup (cache, "remove-ref", 0) != NULL, @@ -207,7 +252,7 @@ void cache_remove_entry_tests (void) count = 0; ok ((w = wait_create (wait_cb, &count)) != NULL, "wait_create works"); - ok ((e = cache_entry_create (NULL)) != NULL, + ok ((e = cache_entry_create ()) != NULL, "cache_entry_create created empty object"); cache_insert (cache, "remove-ref", e); ok (cache_lookup (cache, "remove-ref", 0) != NULL, @@ -234,8 +279,8 @@ void cache_remove_entry_tests (void) ok ((w = wait_create (wait_cb, &count)) != NULL, "wait_create works"); o = json_string ("foobar"); - ok ((e = cache_entry_create (o)) != NULL, - "cache_entry_create created empty object"); + ok ((e = cache_entry_create_json (o)) != NULL, + "cache_entry_create_json created entry"); cache_insert (cache, "remove-ref", e); ok (cache_lookup (cache, "remove-ref", 0) != NULL, "cache_lookup verify entry exists"); @@ -278,7 +323,7 @@ void cache_expiration_tests (void) "cache contains 0 entries"); /* first test w/ entry w/o json object */ - ok ((e1 = cache_entry_create (NULL)) != NULL, + ok ((e1 = cache_entry_create ()) != NULL, "cache_entry_create works"); cache_insert (cache, "xxx1", e1); ok (cache_count_entries (cache) == 1, @@ -314,8 +359,8 @@ void cache_expiration_tests (void) /* second test w/ entry with json object */ o1 = json_object (); json_object_set_new (o1, "foo", json_integer (42)); - ok ((e3 = cache_entry_create (o1)) != NULL, - "cache_entry_create works"); + ok ((e3 = cache_entry_create_json (o1)) != NULL, + "cache_entry_create_json works"); cache_insert (cache, "xxx2", e3); ok (cache_count_entries (cache) == 2, "cache contains 2 entries after insert"); diff --git a/src/modules/kvs/test/commit.c b/src/modules/kvs/test/commit.c index 400dd05ff279..5f4547f5d4da 100644 --- a/src/modules/kvs/test/commit.c +++ b/src/modules/kvs/test/commit.c @@ -55,8 +55,8 @@ struct cache *create_cache_with_empty_rootdir (href_t ref) "cache_create works"); ok (kvs_util_json_hash ("sha1", rootdir, ref) == 0, "kvs_util_json_hash worked"); - ok ((hp = cache_entry_create (rootdir)) != NULL, - "cache_entry_create works"); + ok ((hp = cache_entry_create_json (rootdir)) != NULL, + "cache_entry_create_json works"); cache_insert (cache, ref, hp); return cache; } @@ -606,7 +606,7 @@ void commit_basic_root_not_dir (void) ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -647,8 +647,8 @@ int rootref_cb (commit_t *c, const char *ref, void *data) ok ((rootdir = treeobj_create_dir ()) != NULL, "treeobj_create_dir works"); - ok ((hp = cache_entry_create (rootdir)) != NULL, - "cache_entry_create works"); + ok ((hp = cache_entry_create_json (rootdir)) != NULL, + "cache_entry_create_json works"); cache_insert (rd->cache, ref, hp); @@ -733,8 +733,8 @@ int missingref_cb (commit_t *c, const char *ref, void *data) ok (strcmp (ref, md->dir_ref) == 0, "missing reference is what we expect it to be"); - ok ((hp = cache_entry_create (md->dir)) != NULL, - "cache_entry_create works"); + ok ((hp = cache_entry_create_json (md->dir)) != NULL, + "cache_entry_create_json works"); cache_insert (md->cache, ref, hp); @@ -779,7 +779,7 @@ void commit_process_missing_ref (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -872,7 +872,7 @@ void commit_process_error_callbacks (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -890,7 +890,7 @@ void commit_process_error_callbacks (void) { /* insert cache entry now, want don't want missing refs on next * commit_process call */ - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); @@ -946,7 +946,7 @@ void commit_process_error_callbacks_partway (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dir", treeobj_create_dirref (dir_ref)); @@ -954,7 +954,7 @@ void commit_process_error_callbacks_partway (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1000,7 +1000,7 @@ void commit_process_invalid_operation (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1040,7 +1040,7 @@ void commit_process_invalid_hash (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "foobar", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1094,7 +1094,7 @@ void commit_process_follow_link (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dir", treeobj_create_dirref (dir_ref)); @@ -1103,7 +1103,7 @@ void commit_process_follow_link (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, @@ -1160,7 +1160,7 @@ void commit_process_dirval_test (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1217,7 +1217,7 @@ void commit_process_delete_test (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dir", treeobj_create_dirref (dir_ref)); @@ -1225,7 +1225,7 @@ void commit_process_delete_test (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1271,7 +1271,7 @@ void commit_process_delete_nosubdir_test (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1324,7 +1324,7 @@ void commit_process_delete_filevalinpath_test (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dir", treeobj_create_dirref (dir_ref)); @@ -1332,7 +1332,7 @@ void commit_process_delete_filevalinpath_test (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1385,7 +1385,7 @@ void commit_process_bad_dirrefs (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); dirref = treeobj_create_dirref (dir_ref); treeobj_append_blobref (dirref, dir_ref); @@ -1396,7 +1396,7 @@ void commit_process_bad_dirrefs (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1446,7 +1446,7 @@ void commit_process_big_fileval (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); @@ -1544,7 +1544,7 @@ void commit_process_giant_dir (void) { ok (kvs_util_json_hash ("sha1", dir, dir_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, dir_ref, cache_entry_create (dir)); + cache_insert (cache, dir_ref, cache_entry_create_json (dir)); root = treeobj_create_dir (); treeobj_insert_entry (dir, "dir", treeobj_create_dirref (dir_ref)); @@ -1552,7 +1552,7 @@ void commit_process_giant_dir (void) { ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, "kvs_util_json_hash worked"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); diff --git a/src/modules/kvs/test/lookup.c b/src/modules/kvs/test/lookup.c index 6340efd1fabc..d191810af1f4 100644 --- a/src/modules/kvs/test/lookup.c +++ b/src/modules/kvs/test/lookup.c @@ -309,11 +309,11 @@ void lookup_root (void) { opaque_data = get_json_base64_string ("abcd"); kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); root = treeobj_create_dir (); kvs_util_json_hash ("sha1", root, root_ref); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* flags = 0, should error EISDIR */ ok ((lh = lookup_create (cache, @@ -397,7 +397,7 @@ void lookup_basic (void) { opaque_data = get_json_base64_string ("abcd"); kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); dir = treeobj_create_dir (); treeobj_insert_entry (dir, "val", treeobj_create_val ("bar", 3)); @@ -409,13 +409,13 @@ void lookup_basic (void) { treeobj_insert_entry (dirref, "symlink", treeobj_create_symlink ("baz")); kvs_util_json_hash ("sha1", dirref, dirref_ref); - cache_insert (cache, dirref_ref, cache_entry_create (dirref)); + cache_insert (cache, dirref_ref, cache_entry_create_json (dirref)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dirref", treeobj_create_dirref (dirref_ref)); kvs_util_json_hash ("sha1", root, root_ref); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* lookup dir via dirref */ ok ((lh = lookup_create (cache, @@ -585,12 +585,12 @@ void lookup_errors (void) { opaque_data = get_json_base64_string ("abcd"); kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); dirref = treeobj_create_dir (); treeobj_insert_entry (dirref, "val", treeobj_create_val ("bar", 3)); kvs_util_json_hash ("sha1", dirref, dirref_ref); - cache_insert (cache, dirref_ref, cache_entry_create (dirref)); + cache_insert (cache, dirref_ref, cache_entry_create_json (dirref)); dir = treeobj_create_dir (); treeobj_insert_entry (dir, "val", treeobj_create_val ("baz", 3)); @@ -616,7 +616,7 @@ void lookup_errors (void) { treeobj_insert_entry (root, "valref_multi", valref_multi); kvs_util_json_hash ("sha1", root, root_ref); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* Lookup non-existent field. Not ENOENT - caller of lookup * decides what to do with entry not found */ @@ -918,12 +918,12 @@ void lookup_links (void) { opaque_data = get_json_base64_string ("abcd"); kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); dirref3 = treeobj_create_dir (); treeobj_insert_entry (dirref3, "val", treeobj_create_val ("baz", 3)); kvs_util_json_hash ("sha1", dirref3, dirref3_ref); - cache_insert (cache, dirref3_ref, cache_entry_create (dirref3)); + cache_insert (cache, dirref3_ref, cache_entry_create_json (dirref3)); dir = treeobj_create_dir (); treeobj_insert_entry (dir, "val", treeobj_create_val ("bar", 3)); @@ -935,7 +935,7 @@ void lookup_links (void) { treeobj_insert_entry (dirref2, "dirref", treeobj_create_dirref (dirref3_ref)); treeobj_insert_entry (dirref2, "symlink", treeobj_create_symlink ("dirref2.val")); kvs_util_json_hash ("sha1", dirref2, dirref2_ref); - cache_insert (cache, dirref2_ref, cache_entry_create (dirref2)); + cache_insert (cache, dirref2_ref, cache_entry_create_json (dirref2)); dirref1 = treeobj_create_dir (); treeobj_insert_entry (dirref1, "link2dirref", treeobj_create_symlink ("dirref2")); @@ -944,13 +944,13 @@ void lookup_links (void) { treeobj_insert_entry (dirref1, "link2dir", treeobj_create_symlink ("dirref2.dir")); treeobj_insert_entry (dirref1, "link2symlink", treeobj_create_symlink ("dirref2.symlink")); kvs_util_json_hash ("sha1", dirref1, dirref1_ref); - cache_insert (cache, dirref1_ref, cache_entry_create (dirref1)); + cache_insert (cache, dirref1_ref, cache_entry_create_json (dirref1)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dirref1", treeobj_create_dirref (dirref1_ref)); treeobj_insert_entry (root, "dirref2", treeobj_create_dirref (dirref2_ref)); kvs_util_json_hash ("sha1", root, root_ref); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* lookup val, follow two links */ ok ((lh = lookup_create (cache, @@ -1120,18 +1120,18 @@ void lookup_alt_root (void) { dirref1 = treeobj_create_dir (); treeobj_insert_entry (dirref1, "val", treeobj_create_val ("foo", 3)); kvs_util_json_hash ("sha1", dirref1, dirref1_ref); - cache_insert (cache, dirref1_ref, cache_entry_create (dirref1)); + cache_insert (cache, dirref1_ref, cache_entry_create_json (dirref1)); dirref2 = treeobj_create_dir (); treeobj_insert_entry (dirref2, "val", treeobj_create_val ("bar", 3)); kvs_util_json_hash ("sha1", dirref2, dirref2_ref); - cache_insert (cache, dirref2_ref, cache_entry_create (dirref2)); + cache_insert (cache, dirref2_ref, cache_entry_create_json (dirref2)); root = treeobj_create_dir (); treeobj_insert_entry (root, "dirref1", treeobj_create_dirref (dirref1_ref)); treeobj_insert_entry (root, "dirref2", treeobj_create_dirref (dirref2_ref)); kvs_util_json_hash ("sha1", root, root_ref); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* lookup val, alt root-ref dirref1_ref */ ok ((lh = lookup_create (cache, @@ -1195,7 +1195,7 @@ void lookup_stall_root (void) { "lookup_create stalltest \".\""); check_stall (lh, EAGAIN, root_ref, "root \".\" stall"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* lookup root ".", should succeed */ check (lh, 0, root, "root \".\" #1"); @@ -1281,12 +1281,12 @@ void lookup_stall (void) { "lookup_create stalltest dirref1.val"); check_stall (lh, EAGAIN, root_ref, "dirref1.val stall #1"); - cache_insert (cache, root_ref, cache_entry_create (root)); + cache_insert (cache, root_ref, cache_entry_create_json (root)); /* next call to lookup, should stall */ check_stall (lh, EAGAIN, dirref1_ref, "dirref1.val stall #2"); - cache_insert (cache, dirref1_ref, cache_entry_create (dirref1)); + cache_insert (cache, dirref1_ref, cache_entry_create_json (dirref1)); /* final call to lookup, should succeed */ test = treeobj_create_val ("foo", 3); @@ -1317,7 +1317,7 @@ void lookup_stall (void) { "lookup_create stalltest symlink.val"); check_stall (lh, EAGAIN, dirref2_ref, "symlink.val stall"); - cache_insert (cache, dirref2_ref, cache_entry_create (dirref2)); + cache_insert (cache, dirref2_ref, cache_entry_create_json (dirref2)); /* lookup symlink.val, should succeed */ test = treeobj_create_val ("bar", 3); @@ -1348,7 +1348,7 @@ void lookup_stall (void) { "lookup_create stalltest dirref1.valref"); check_stall (lh, EAGAIN, valref_ref, "dirref1.valref stall"); - cache_insert (cache, valref_ref, cache_entry_create (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); /* lookup dirref1.valref, should succeed */ test = treeobj_create_val ("abcd", 4); From 4a0c7e7d20d395b6adee05d558afdeaad6d969f0 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 25 Sep 2017 15:30:53 -0700 Subject: [PATCH 3/9] modules/kvs/cache: Support cache data type Support type field in cache entries, so different types of data can be stored in the cache entry. Add new functions cache_entry_type() and cache_entry_is_type_json() to retrieve/test type. Add unit tests appropriately. --- src/modules/kvs/cache.c | 31 ++++++++++++++++++++++++---- src/modules/kvs/cache.h | 34 ++++++++++++++++++++++++++---- src/modules/kvs/test/cache.c | 40 ++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/src/modules/kvs/cache.c b/src/modules/kvs/cache.c index dce7ecc373ab..f462e064a59c 100644 --- a/src/modules/kvs/cache.c +++ b/src/modules/kvs/cache.c @@ -52,6 +52,7 @@ struct cache_entry { waitqueue_t *waitlist_notdirty; waitqueue_t *waitlist_valid; void *data; /* value object/data */ + cache_data_type_t type; /* what does data point to */ int lastuse_epoch; /* time of last use for cache expiry */ uint8_t dirty:1; }; @@ -67,6 +68,7 @@ struct cache_entry *cache_entry_create (void) errno = ENOMEM; return NULL; } + hp->type = CACHE_DATA_TYPE_NONE; return hp; } @@ -77,9 +79,25 @@ struct cache_entry *cache_entry_create_json (json_t *o) return NULL; if (o) hp->data = o; + hp->type = CACHE_DATA_TYPE_JSON; return hp; } +int cache_entry_type (struct cache_entry *hp, cache_data_type_t *t) +{ + if (hp) { + if (t) + (*t) = hp->type; + return 0; + } + return -1; +} + +bool cache_entry_is_type_json (struct cache_entry *hp) +{ + return (hp && hp->type == CACHE_DATA_TYPE_JSON); +} + bool cache_entry_get_valid (struct cache_entry *hp) { return (hp && hp->data != NULL); @@ -141,14 +159,16 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp) json_t *cache_entry_get_json (struct cache_entry *hp) { - if (!hp || !hp->data) + if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_JSON) return NULL; return hp->data; } int cache_entry_set_json (struct cache_entry *hp, json_t *o) { - if (hp) { + if (hp + && (hp->type == CACHE_DATA_TYPE_NONE + || hp->type == CACHE_DATA_TYPE_JSON)) { if ((o && hp->data) || (!o && !hp->data)) { json_decref (o); /* no-op, 'o' is assumed identical to hp->data */ } else if (o && !hp->data) { @@ -164,6 +184,7 @@ int cache_entry_set_json (struct cache_entry *hp, json_t *o) json_decref (hp->data); hp->data = NULL; } + hp->type = CACHE_DATA_TYPE_JSON; return 0; } return -1; @@ -173,8 +194,10 @@ void cache_entry_destroy (void *arg) { struct cache_entry *hp = arg; if (hp) { - if (hp->data) - json_decref (hp->data); + if (hp->data) { + if (hp->type == CACHE_DATA_TYPE_JSON) + json_decref (hp->data); + } if (hp->waitlist_notdirty) wait_queue_destroy (hp->waitlist_notdirty); if (hp->waitlist_valid) diff --git a/src/modules/kvs/cache.h b/src/modules/kvs/cache.h index e06dfa920657..0b02e21470fb 100644 --- a/src/modules/kvs/cache.h +++ b/src/modules/kvs/cache.h @@ -6,18 +6,39 @@ #include "src/common/libutil/tstat.h" #include "waitqueue.h" +typedef enum { + CACHE_DATA_TYPE_NONE, + CACHE_DATA_TYPE_JSON, +} cache_data_type_t; + struct cache_entry; struct cache; /* Create/destroy cache entry. - * In cache_entry_create_json(), create transfers ownership of 'o' to - * the cache entry. If 'o' is NULL, calls cache_entry_create(). + * + * cache_entry_create() creates an entry, setting the cache entry type + * to CACHE_DATA_TYPE_NONE. + * + * cache_entry_create_json() creates an entry, setting the cache entry + * type to CACHE_DATA_TYPE_JSON. The create transfers ownership of 'o' to + * the cache entry. If 'o' is NULL, no data is set, but the type is + * still set to CACHE_DATA_TYPE_JSON and only json can be used for the entry. */ struct cache_entry *cache_entry_create (void); struct cache_entry *cache_entry_create_json (json_t *o); void cache_entry_destroy (void *arg); +/* Return what data type is stored in the cache entry or will be + * stored in the cache entry. CACHE_DATA_TYPE_NONE means it has not + * yet been determined. Returns 0 on success, -1 on error. + * + * For convenience, cache_entry_is_type_json() checks specifically if + * an entry is of type json. + */ +int cache_entry_type (struct cache_entry *hp, cache_data_type_t *t); +bool cache_entry_is_type_json (struct cache_entry *hp); + /* Return true if cache entry contains valid json. * False would indicate that a load RPC is in progress. */ @@ -48,8 +69,13 @@ int cache_entry_clear_dirty (struct cache_entry *hp); int cache_entry_force_clear_dirty (struct cache_entry *hp); /* Accessors for cache entry data. - * If non-NULL, set transfers ownership of 'o' to the cache entry. - * An invalid->valid transition runs the entry's wait queue, if any. + * + * json get accessor must have type of CACHE_DATA_TYPE_JSON to + * retrieve json object. json set accessor must have type of + * CACHE_DATA_TYPE_NONE or CACHE_DATA_TYPE_JSON to set json object. + * After setting, the type is converted to CACHE_DATA_TYPE_JSON. If + * non-NULL, set transfers ownership of 'o' to the cache entry. An + * invalid->valid transition runs the entry's wait queue, if any. * cache_entry_set_json() returns -1 on error, 0 on success */ json_t *cache_entry_get_json (struct cache_entry *hp); diff --git a/src/modules/kvs/test/cache.c b/src/modules/kvs/test/cache.c index 513f1b71e962..e33cf85dc83f 100644 --- a/src/modules/kvs/test/cache.c +++ b/src/modules/kvs/test/cache.c @@ -42,8 +42,11 @@ void cache_entry_tests (void) { struct cache_entry *e; json_t *otmp, *o1, *o2; + cache_data_type_t t; /* corner case tests */ + ok (cache_entry_type (NULL, NULL) < 0, + "cache_entry_type fails with bad input"); ok (cache_entry_set_json (NULL, NULL) < 0, "cache_entry_set_json fails with bad input"); cache_entry_destroy (NULL); @@ -57,6 +60,12 @@ void cache_entry_tests (void) ok ((e = cache_entry_create ()) != NULL, "cache_entry_create works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_NONE, + "cache_entry_type returns NONE"); + ok (cache_entry_is_type_json (e) == false, + "cache_entry_is_type_json returns false"); ok (cache_entry_get_valid (e) == false, "cache entry initially non-valid"); ok (cache_entry_get_dirty (e) == false, @@ -72,6 +81,12 @@ void cache_entry_tests (void) ok ((e = cache_entry_create_json (NULL)) != NULL, "cache_entry_create_json w/ NULL input works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_JSON, + "cache_entry_type returns JSON"); + ok (cache_entry_is_type_json (e) == true, + "cache_entry_is_type_json returns true"); ok (cache_entry_get_valid (e) == false, "cache entry initially non-valid"); ok (cache_entry_get_dirty (e) == false, @@ -85,15 +100,20 @@ void cache_entry_tests (void) cache_entry_destroy (e); e = NULL; - /* test empty cache entry created by cache_entry_create_json(), - * later filled with data. + /* test empty cache entry created, later filled with data. */ o1 = json_object (); json_object_set_new (o1, "foo", json_integer (42)); - ok ((e = cache_entry_create_json (NULL)) != NULL, - "cache_entry_create_json w/ NULL input works"); + ok ((e = cache_entry_create ()) != NULL, + "cache_entry_create works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_NONE, + "cache_entry_type returns NONE"); + ok (cache_entry_is_type_json (e) == false, + "cache_entry_is_type_json returns false"); ok (cache_entry_get_valid (e) == false, "cache entry initially non-valid"); ok (cache_entry_get_dirty (e) == false, @@ -106,8 +126,14 @@ void cache_entry_tests (void) "cache_entry_get_json returns NULL, no json set"); ok (cache_entry_set_json (e, o1) == 0, "cache_entry_set_json success"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_JSON, + "cache_entry_type returns JSON"); ok (cache_entry_get_valid (e) == true, "cache entry now valid after cache_entry_set_json call"); + ok (cache_entry_is_type_json (e) == true, + "cache_entry_is_type_json returns true"); cache_entry_destroy (e); /* destroys o1 */ e = NULL; @@ -118,6 +144,12 @@ void cache_entry_tests (void) ok ((e = cache_entry_create_json (o1)) != NULL, "cache_entry_create_json w/ non-NULL input works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_JSON, + "cache_entry_type returns JSON"); + ok (cache_entry_is_type_json (e) == true, + "cache_entry_is_type_json returns true"); ok (cache_entry_get_valid (e) == true, "cache entry initially valid"); ok (cache_entry_get_dirty (e) == false, From 31db337239762b728ebee5d53a96ac8bdd5ea3c4 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 22 Sep 2017 14:03:28 -0700 Subject: [PATCH 4/9] modules/kvs/cache: Add raw cache entry support Add support for cache entries that support storing raw data. Included in this patch is new function cache_entry_create_raw(), cache_entry_is_type_raw(), cache_entry_get_raw(), and cache_entry_set_raw(). Add unit tests appropriately. --- src/modules/kvs/cache.c | 91 ++++++++++++++++++-- src/modules/kvs/cache.h | 50 ++++++++--- src/modules/kvs/test/cache.c | 161 +++++++++++++++++++++++++++++++++-- 3 files changed, 275 insertions(+), 27 deletions(-) diff --git a/src/modules/kvs/cache.c b/src/modules/kvs/cache.c index f462e064a59c..fe0eeb1f97b6 100644 --- a/src/modules/kvs/cache.c +++ b/src/modules/kvs/cache.c @@ -52,6 +52,7 @@ struct cache_entry { waitqueue_t *waitlist_notdirty; waitqueue_t *waitlist_valid; void *data; /* value object/data */ + int len; cache_data_type_t type; /* what does data point to */ int lastuse_epoch; /* time of last use for cache expiry */ uint8_t dirty:1; @@ -83,6 +84,25 @@ struct cache_entry *cache_entry_create_json (json_t *o) return hp; } +struct cache_entry *cache_entry_create_raw (void *data, int len) +{ + struct cache_entry *hp; + + if ((data && len <= 0) || (!data && len)) { + errno = EINVAL; + return NULL; + } + + if (!(hp = cache_entry_create ())) + return NULL; + if (data) { + hp->data = data; + hp->len = len; + } + hp->type = CACHE_DATA_TYPE_RAW; + return hp; +} + int cache_entry_type (struct cache_entry *hp, cache_data_type_t *t) { if (hp) { @@ -98,6 +118,11 @@ bool cache_entry_is_type_json (struct cache_entry *hp) return (hp && hp->type == CACHE_DATA_TYPE_JSON); } +bool cache_entry_is_type_raw (struct cache_entry *hp) +{ + return (hp && hp->type == CACHE_DATA_TYPE_RAW); +} + bool cache_entry_get_valid (struct cache_entry *hp) { return (hp && hp->data != NULL); @@ -190,6 +215,49 @@ int cache_entry_set_json (struct cache_entry *hp, json_t *o) return -1; } +void *cache_entry_get_raw (struct cache_entry *hp, int *len) +{ + if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_RAW) + return NULL; + if (len) + (*len) = hp->len; + return hp->data; +} + +int cache_entry_set_raw (struct cache_entry *hp, void *data, int len) +{ + if ((data && len <= 0) || (!data && len)) { + errno = EINVAL; + return -1; + } + + if (hp + && (hp->type == CACHE_DATA_TYPE_NONE + || hp->type == CACHE_DATA_TYPE_RAW)) { + if ((data && hp->data) || (!data && !hp->data)) { + free (data); /* no-op, 'data' is assumed identical to hp->data */ + } else if (data && !hp->data) { + hp->data = data; + hp->len = len; + if (hp->waitlist_valid) { + if (wait_runqueue (hp->waitlist_valid) < 0) { + /* set back to orig */ + hp->data = NULL; + hp->len = 0; + return -1; + } + } + } else if (!data && hp->data) { + free (hp->data); + hp->data = NULL; + hp->len = 0; + } + hp->type = CACHE_DATA_TYPE_RAW; + return 0; + } + return -1; +} + void cache_entry_destroy (void *arg) { struct cache_entry *hp = arg; @@ -197,6 +265,8 @@ void cache_entry_destroy (void *arg) if (hp->data) { if (hp->type == CACHE_DATA_TYPE_JSON) json_decref (hp->data); + else if (hp->type == CACHE_DATA_TYPE_RAW) + free (hp->data); } if (hp->waitlist_notdirty) wait_queue_destroy (hp->waitlist_notdirty); @@ -330,15 +400,20 @@ int cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep, while ((ref = zlist_pop (keys))) { hp = zhash_lookup (cache->zh, ref); if (cache_entry_get_valid (hp)) { - /* must pass JSON_ENCODE_ANY, object could be anything */ - char *s = json_dumps (hp->data, JSON_ENCODE_ANY); - int obj_size; - if (!s) { - saved_errno = ENOMEM; - goto cleanup; + int obj_size = 0; + + if (hp->type == CACHE_DATA_TYPE_JSON) { + /* must pass JSON_ENCODE_ANY, object could be anything */ + char *s = json_dumps (hp->data, JSON_ENCODE_ANY); + if (!s) { + saved_errno = ENOMEM; + goto cleanup; + } + obj_size = strlen (s); + free (s); } - obj_size = strlen (s); - free (s); + else if (hp->type == CACHE_DATA_TYPE_RAW) + obj_size = hp->len; size += obj_size; tstat_push (ts, obj_size); } else diff --git a/src/modules/kvs/cache.h b/src/modules/kvs/cache.h index 0b02e21470fb..46b86bc9d9e7 100644 --- a/src/modules/kvs/cache.h +++ b/src/modules/kvs/cache.h @@ -9,6 +9,7 @@ typedef enum { CACHE_DATA_TYPE_NONE, CACHE_DATA_TYPE_JSON, + CACHE_DATA_TYPE_RAW, } cache_data_type_t; struct cache_entry; @@ -21,12 +22,20 @@ struct cache; * to CACHE_DATA_TYPE_NONE. * * cache_entry_create_json() creates an entry, setting the cache entry - * type to CACHE_DATA_TYPE_JSON. The create transfers ownership of 'o' to - * the cache entry. If 'o' is NULL, no data is set, but the type is - * still set to CACHE_DATA_TYPE_JSON and only json can be used for the entry. + * type to CACHE_DATA_TYPE_JSON. The create transfers ownership of + * 'o' to the cache entry. On destroy, json_decref() will be called + * on 'o'. If 'o' is NULL, no data is set, but the type is still set + * to CACHE_DATA_TYPE_JSON and only json can be used for the entry. + * + * cache_entry_create_raw() creates an entry, setting the cache entry + * type to CACHE_DATA_TYPE_RAW. The create transfers ownership of + * 'data' to the cache entry. On destroy, free() will be called on + * 'data'. If 'data' is NULL, no data is set, but the type is still + * set to CACHE_DATA_TYPE_RAW and only raw can be used for the entry. */ struct cache_entry *cache_entry_create (void); struct cache_entry *cache_entry_create_json (json_t *o); +struct cache_entry *cache_entry_create_raw (void *data, int len); void cache_entry_destroy (void *arg); /* Return what data type is stored in the cache entry or will be @@ -34,12 +43,14 @@ void cache_entry_destroy (void *arg); * yet been determined. Returns 0 on success, -1 on error. * * For convenience, cache_entry_is_type_json() checks specifically if - * an entry is of type json. + * an entry is of type json. cache_entry_is_type_raw() checks + * specifically if an entry is of type raw. */ int cache_entry_type (struct cache_entry *hp, cache_data_type_t *t); bool cache_entry_is_type_json (struct cache_entry *hp); +bool cache_entry_is_type_raw (struct cache_entry *hp); -/* Return true if cache entry contains valid json. +/* Return true if cache entry contains valid json or raw data. * False would indicate that a load RPC is in progress. */ bool cache_entry_get_valid (struct cache_entry *hp); @@ -71,16 +82,33 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp); /* Accessors for cache entry data. * * json get accessor must have type of CACHE_DATA_TYPE_JSON to - * retrieve json object. json set accessor must have type of - * CACHE_DATA_TYPE_NONE or CACHE_DATA_TYPE_JSON to set json object. - * After setting, the type is converted to CACHE_DATA_TYPE_JSON. If - * non-NULL, set transfers ownership of 'o' to the cache entry. An - * invalid->valid transition runs the entry's wait queue, if any. - * cache_entry_set_json() returns -1 on error, 0 on success + * retrieve json object. + * + * raw get accessor must have type of CACHE_DATA_TYPE_RAW to + * retrieve raw data. + * + * json set accessor must have type of CACHE_DATA_TYPE_NONE or + * CACHE_DATA_TYPE_JSON to set json object. After setting, the type + * is converted to CACHE_DATA_TYPE_JSON. If non-NULL, set transfers + * ownership of 'o' to the cache entry. + * + * raw set accessor must have type of CACHE_DATA_TYPE_NONE or + * CACHE_DATA_TYPE_RAW to set raw data. After setting, the type is + * converted to CACHE_DATA_TYPE_RAW. If non-NULL, set transfers + * ownership of 'data' to the cache entry. + * + * An invalid->valid transition runs the entry's wait queue, if any in + * both set accessors. + * + * cache_entry_set_json() & cache_entry_set_raw() returns -1 on error, + * 0 on success */ json_t *cache_entry_get_json (struct cache_entry *hp); int cache_entry_set_json (struct cache_entry *hp, json_t *o); +void *cache_entry_get_raw (struct cache_entry *hp, int *len); +int cache_entry_set_raw (struct cache_entry *hp, void *data, int len); + /* Arrange for message handler represented by 'wait' to be restarted * once cache entry becomes valid or not dirty at completion of a * load or store RPC. diff --git a/src/modules/kvs/test/cache.c b/src/modules/kvs/test/cache.c index e33cf85dc83f..92458cc5966e 100644 --- a/src/modules/kvs/test/cache.c +++ b/src/modules/kvs/test/cache.c @@ -38,23 +38,27 @@ void cache_tests (void) cache_destroy (cache); } -void cache_entry_tests (void) +void cache_entry_basic_tests (void) { struct cache_entry *e; - json_t *otmp, *o1, *o2; + json_t *otmp; cache_data_type_t t; /* corner case tests */ + ok (cache_entry_create_raw (NULL, 5) == NULL, + "cache_entry_create_raw fails with bad input"); ok (cache_entry_type (NULL, NULL) < 0, "cache_entry_type fails with bad input"); ok (cache_entry_set_json (NULL, NULL) < 0, "cache_entry_set_json fails with bad input"); cache_entry_destroy (NULL); diag ("cache_entry_destroy accept NULL arg"); - - /* Play with one entry. - * N.B.: json ref is NOT incremented by create or get_json. - */ + ok ((e = cache_entry_create_raw (NULL, 0)) != NULL, + "cache_entry_create_raw success"); + ok (cache_entry_set_raw (e, "abcd", -1) < 0, + "cache_entry_set_raw fails with bad input"); + cache_entry_destroy (e); + e = NULL; /* test empty cache entry created by cache_entry_create() */ @@ -66,6 +70,8 @@ void cache_entry_tests (void) "cache_entry_type returns NONE"); ok (cache_entry_is_type_json (e) == false, "cache_entry_is_type_json returns false"); + ok (cache_entry_is_type_raw (e) == false, + "cache_entry_is_type_raw returns false"); ok (cache_entry_get_valid (e) == false, "cache entry initially non-valid"); ok (cache_entry_get_dirty (e) == false, @@ -74,8 +80,21 @@ void cache_entry_tests (void) "cache_entry_set_dirty fails b/c entry non-valid"); ok ((otmp = cache_entry_get_json (e)) == NULL, "cache_entry_get_json returns NULL, no json set"); + ok (cache_entry_get_raw (e, NULL) == NULL, + "cache_entry_get_json returns NULL, no json set"); cache_entry_destroy (e); e = NULL; +} + +void cache_entry_json_tests (void) +{ + struct cache_entry *e; + json_t *otmp, *o1, *o2; + cache_data_type_t t; + + /* Play with one entry. + * N.B.: json ref is NOT incremented by create or get_json. + */ /* test empty cache entry created by cache_entry_create_json() */ @@ -100,7 +119,7 @@ void cache_entry_tests (void) cache_entry_destroy (e); e = NULL; - /* test empty cache entry created, later filled with data. + /* test empty cache entry, later filled with json. */ o1 = json_object (); @@ -134,6 +153,8 @@ void cache_entry_tests (void) "cache entry now valid after cache_entry_set_json call"); ok (cache_entry_is_type_json (e) == true, "cache_entry_is_type_json returns true"); + ok (cache_entry_set_raw (e, "abcd", 4) < 0, + "cache_entry_set_raw fails on cache entry with type json"); cache_entry_destroy (e); /* destroys o1 */ e = NULL; @@ -189,6 +210,128 @@ void cache_entry_tests (void) e = NULL; } +void cache_entry_raw_tests (void) +{ + struct cache_entry *e; + json_t *o1; + cache_data_type_t t; + char *data; + int len; + + /* test empty cache entry created by cache_entry_create_raw() */ + + ok ((e = cache_entry_create_raw (NULL, 0)) != NULL, + "cache_entry_create_raw w/ NULL input works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_RAW, + "cache_entry_type returns RAW"); + ok (cache_entry_is_type_raw (e) == true, + "cache_entry_is_type_raw returns true"); + ok (cache_entry_get_valid (e) == false, + "cache entry initially non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry initially not dirty"); + ok (cache_entry_set_dirty (e, true) < 0, + "cache_entry_set_dirty fails b/c entry non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry does not set dirty, b/c no data"); + ok ((data = cache_entry_get_raw (e, NULL)) == NULL, + "cache_entry_get_raw returns NULL, no data set"); + cache_entry_destroy (e); + e = NULL; + + /* test empty cache entry, later filled with raw data. + */ + + data = strdup ("abcd"); + + ok ((e = cache_entry_create ()) != NULL, + "cache_entry_create works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_NONE, + "cache_entry_type returns NONE"); + ok (cache_entry_is_type_raw (e) == false, + "cache_entry_is_type_raw returns false"); + ok (cache_entry_get_valid (e) == false, + "cache entry initially non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry initially not dirty"); + ok (cache_entry_set_dirty (e, true) < 0, + "cache_entry_set_dirty fails b/c entry non-valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry does not set dirty, b/c no data"); + ok (cache_entry_get_raw (e, NULL) == NULL, + "cache_entry_get_raw returns NULL, no data set"); + ok (cache_entry_set_raw (e, data, strlen (data) + 1) == 0, + "cache_entry_set_raw success"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_RAW, + "cache_entry_type returns RAW"); + ok (cache_entry_is_type_raw (e) == true, + "cache_entry_is_type_raw returns true"); + ok (cache_entry_get_valid (e) == true, + "cache entry now valid after cache_entry_set_raw call"); + o1 = json_object (); + json_object_set_new (o1, "foo", json_integer (42)); + ok (cache_entry_set_json (e, o1) < 0, + "cache_entry_set_json fails on cache entry with type raw"); + json_decref (o1); + cache_entry_destroy (e); /* destroys data */ + e = NULL; + + /* test cache entry filled with raw data initially */ + + data = strdup ("abcd"); + + ok ((e = cache_entry_create_raw (data, strlen (data) + 1)) != NULL, + "cache_entry_create_json w/ non-NULL input works"); + ok (cache_entry_type (e, &t) == 0, + "cache_entry_type success"); + ok (t == CACHE_DATA_TYPE_RAW, + "cache_entry_type returns RAW"); + ok (cache_entry_is_type_raw (e) == true, + "cache_entry_is_type_raw returns true"); + ok (cache_entry_get_valid (e) == true, + "cache entry initially valid"); + ok (cache_entry_get_dirty (e) == false, + "cache entry initially not dirty"); + ok (cache_entry_set_dirty (e, true) == 0, + "cache_entry_set_dirty success"); + ok (cache_entry_get_dirty (e) == true, + "cache entry succcessfully set dirty"); + + ok (cache_entry_clear_dirty (e) == 0, + "cache_entry_clear_dirty returns 0, b/c no waiters"); + ok (cache_entry_get_dirty (e) == false, + "cache entry succcessfully now not dirty"); + + ok (cache_entry_set_dirty (e, true) == 0, + "cache_entry_set_dirty success"); + ok (cache_entry_get_dirty (e) == true, + "cache entry succcessfully set dirty"); + ok (cache_entry_force_clear_dirty (e) == 0, + "cache_entry_force_clear_dirty returns 0"); + ok (cache_entry_get_dirty (e) == false, + "cache entry succcessfully now not dirty"); + + ok ((data = cache_entry_get_raw (e, &len)) != NULL, + "raw data retrieved from cache entry"); + ok (strcmp (data, "abcd") == 0, + "raw data matches expected string"); + ok (len == strlen (data) + 1, + "raw data length matches expected length"); + ok (cache_entry_set_raw (e, NULL, 0) == 0, + "cache_entry_set_raw success"); + ok (cache_entry_get_raw (e, NULL) == NULL, + "cache entry no longer has data object"); + + cache_entry_destroy (e); /* destroys data */ + e = NULL; +} + void waiter_tests (void) { struct cache_entry *e; @@ -454,7 +597,9 @@ int main (int argc, char *argv[]) plan (NO_PLAN); cache_tests (); - cache_entry_tests (); + cache_entry_basic_tests (); + cache_entry_json_tests (); + cache_entry_raw_tests (); waiter_tests (); cache_expiration_tests (); cache_remove_entry_tests (); From 8218587e7bdccfa99a8a1d3aae1b16efdbbb2c4a Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 19 Sep 2017 16:45:09 -0700 Subject: [PATCH 5/9] modules/kvs/lookup: Update for raw opaque data Update internal lookup API to handle valref references pointing to raw unencoded data in the content store. Most notably, missing references have an additional boolean indicating if data is raw, so that on data load it can be handled appropriately. In addition, when returning data pointed to by a valref, add checks to ensure the data is raw in the KVS cache. Adjust lookup unit tests appropriately. --- src/modules/kvs/lookup.c | 78 +++++++++++++++++++++++------------ src/modules/kvs/lookup.h | 8 +++- src/modules/kvs/test/lookup.c | 77 ++++++++++++++-------------------- 3 files changed, 88 insertions(+), 75 deletions(-) diff --git a/src/modules/kvs/lookup.c b/src/modules/kvs/lookup.c index 2ebaa028d514..f249ad1e29c8 100644 --- a/src/modules/kvs/lookup.c +++ b/src/modules/kvs/lookup.c @@ -79,6 +79,7 @@ struct lookup { /* potential return values from lookup */ json_t *val; /* value of lookup */ const char *missing_ref; /* on stall, missing ref to load */ + bool missing_ref_raw; /* if true, missing ref points to raw data */ int errnum; /* errnum if error */ /* API internal */ @@ -224,6 +225,7 @@ static bool walk (lookup_t *lh) /* Get directory of dirent */ if (treeobj_is_dirref (wl->dirent)) { + struct cache_entry *hp; const char *refstr; int refcount; @@ -243,13 +245,23 @@ static bool walk (lookup_t *lh) goto error; } - if (!(dir = cache_lookup_and_get_json (lh->cache, - refstr, - lh->current_epoch))) { + if (!(hp = cache_lookup (lh->cache, refstr, lh->current_epoch)) + || !cache_entry_get_valid (hp)) { lh->missing_ref = refstr; + lh->missing_ref_raw = false; goto stall; } - + if (!(dir = cache_entry_get_json (hp))) { + /* dirref pointed to non json error, special case when + * root_dirent is bad, is EINVAL from user. + */ + flux_log (lh->h, LOG_ERR, "dirref points to non-json"); + if (wl->depth == 0 && wl->dirent == lh->root_dirent) + lh->errnum = EINVAL; + else + lh->errnum = EPERM; + goto error; + } if (!treeobj_is_dir (dir)) { /* dirref pointed to non-dir error, special case when * root_dirent is bad, is EINVAL from user. @@ -414,6 +426,7 @@ lookup_t *lookup_create (struct cache *cache, lh->val = NULL; lh->missing_ref = NULL; + lh->missing_ref_raw = false; lh->errnum = 0; if (!(lh->root_dirent = treeobj_create_dirref (lh->root_ref))) { @@ -487,14 +500,17 @@ json_t *lookup_get_value (lookup_t *lh) return NULL; } -const char *lookup_get_missing_ref (lookup_t *lh) +const char *lookup_get_missing_ref (lookup_t *lh, bool *ref_raw) { if (lh && lh->magic == LOOKUP_MAGIC && (lh->state == LOOKUP_STATE_CHECK_ROOT || lh->state == LOOKUP_STATE_WALK - || lh->state == LOOKUP_STATE_VALUE)) + || lh->state == LOOKUP_STATE_VALUE)) { + if (ref_raw) + (*ref_raw) = lh->missing_ref_raw; return lh->missing_ref; + } return NULL; } @@ -569,7 +585,7 @@ bool lookup (lookup_t *lh) { json_t *valtmp = NULL; const char *reftmp; - const char *strtmp; + struct cache_entry *hp; int refcount; if (!lh || lh->magic != LOOKUP_MAGIC) { @@ -592,18 +608,25 @@ bool lookup (lookup_t *lh) lh->errnum = EISDIR; goto done; } - valtmp = cache_lookup_and_get_json (lh->cache, - lh->root_ref, - lh->current_epoch); - if (!valtmp) { + if (!(hp = cache_lookup (lh->cache, + lh->root_ref, + lh->current_epoch)) + || !cache_entry_get_valid (hp)) { lh->state = LOOKUP_STATE_CHECK_ROOT; lh->missing_ref = lh->root_ref; + lh->missing_ref_raw = false; goto stall; } - if (!treeobj_is_dir (valtmp)) { + if (!(valtmp = cache_entry_get_json (hp))) { + flux_log (lh->h, LOG_ERR, "root_ref points to non-json"); lh->errnum = EINVAL; goto done; } + if (!treeobj_is_dir (valtmp)) { + /* root_ref points to not dir */ + lh->errnum = EPERM; + goto done; + } lh->val = json_incref (valtmp); } goto done; @@ -652,13 +675,17 @@ bool lookup (lookup_t *lh) lh->errnum = errno; goto done; } - valtmp = cache_lookup_and_get_json (lh->cache, - reftmp, - lh->current_epoch); - if (!valtmp) { + if (!(hp = cache_lookup (lh->cache, reftmp, lh->current_epoch)) + || !cache_entry_get_valid (hp)) { lh->missing_ref = reftmp; + lh->missing_ref_raw = false; goto stall; } + if (!(valtmp = cache_entry_get_json (hp))) { + flux_log (lh->h, LOG_ERR, "dirref points to non-json"); + lh->errnum = EPERM; + goto done; + } if (!treeobj_is_dir (valtmp)) { /* dirref points to not dir */ lh->errnum = EPERM; @@ -666,6 +693,9 @@ bool lookup (lookup_t *lh) } lh->val = json_incref (valtmp); } else if (treeobj_is_valref (lh->wdirent)) { + void *valdata; + int len; + if ((lh->flags & FLUX_KVS_READLINK)) { lh->errnum = EINVAL; goto done; @@ -688,22 +718,18 @@ bool lookup (lookup_t *lh) lh->errnum = errno; goto done; } - valtmp = cache_lookup_and_get_json (lh->cache, - reftmp, - lh->current_epoch); - if (!valtmp) { + if (!(hp = cache_lookup (lh->cache, reftmp, lh->current_epoch)) + || !cache_entry_get_valid (hp)) { lh->missing_ref = reftmp; + lh->missing_ref_raw = true; goto stall; } - if (!json_is_string (valtmp)) { - /* valref points to non-string */ + if (!(valdata = cache_entry_get_raw (hp, &len))) { + flux_log (lh->h, LOG_ERR, "valref points to non-raw data"); lh->errnum = EPERM; goto done; } - - /* Place base64 opaque data into treeobj val object */ - strtmp = json_string_value (valtmp); - if (!(lh->val = treeobj_create_val_base64 (strtmp))) { + if (!(lh->val = treeobj_create_val (valdata, len))) { lh->errnum = errno; goto done; } diff --git a/src/modules/kvs/lookup.h b/src/modules/kvs/lookup.h index 80cae39cbfc2..ad18713708a2 100644 --- a/src/modules/kvs/lookup.h +++ b/src/modules/kvs/lookup.h @@ -34,8 +34,12 @@ int lookup_get_errnum (lookup_t *lh); json_t *lookup_get_value (lookup_t *lh); /* Get missing ref after a lookup stall, missing reference can then be - * used to load reference into the KVS cache */ -const char *lookup_get_missing_ref (lookup_t *lh); + * used to load reference into the KVS cache + * + * If the missing references points to raw data, 'ref_raw' will be set + * to true, otherwise false. + */ +const char *lookup_get_missing_ref (lookup_t *lh, bool *ref_raw); /* Convenience function to get cache from earlier instantiation. * Convenient if replaying RPC and don't have it presently. diff --git a/src/modules/kvs/test/lookup.c b/src/modules/kvs/test/lookup.c index d191810af1f4..25ad6ce6c6c0 100644 --- a/src/modules/kvs/test/lookup.c +++ b/src/modules/kvs/test/lookup.c @@ -10,7 +10,7 @@ #include "src/modules/kvs/lookup.h" #include "src/modules/kvs/kvs_util.h" #include "src/modules/kvs/types.h" -#include "src/common/libutil/base64.h" +#include "src/common/libutil/blobref.h" void basic_api (void) { @@ -116,7 +116,7 @@ void basic_api_errors (void) "lookup_get_errnum returns EINVAL b/c lookup not yet started"); ok (lookup_get_value (lh) == NULL, "lookup_get_value fails b/c lookup not yet started"); - ok (lookup_get_missing_ref (lh) == NULL, + ok (lookup_get_missing_ref (lh, NULL) == NULL, "lookup_get_missing_ref fails b/c lookup not yet started"); ok (lookup_validate (NULL) == false, @@ -127,7 +127,7 @@ void basic_api_errors (void) "lookup_get_errnum returns EINVAL on NULL pointer"); ok (lookup_get_value (NULL) == NULL, "lookup_get_value fails on NULL pointer"); - ok (lookup_get_missing_ref (NULL) == NULL, + ok (lookup_get_missing_ref (NULL, NULL) == NULL, "lookup_get_missing_ref fails on NULL pointer"); ok (lookup_get_cache (NULL) == NULL, "lookup_get_cache fails on NULL pointer"); @@ -162,7 +162,7 @@ void basic_api_errors (void) "lookup_get_errnum returns EINVAL on bad pointer"); ok (lookup_get_value (lh) == NULL, "lookup_get_value fails on bad pointer"); - ok (lookup_get_missing_ref (lh) == NULL, + ok (lookup_get_missing_ref (lh, NULL) == NULL, "lookup_get_missing_ref fails on bad pointer"); ok (lookup_get_cache (lh) == NULL, "lookup_get_cache fails on bad pointer"); @@ -188,26 +188,12 @@ void basic_api_errors (void) cache_destroy (cache); } -json_t *get_json_base64_string (const char *s) -{ - int len, xlen; - char *xdata; - json_t *rv; - - len = strlen (s); - xlen = base64_encode_length (len); - xdata = malloc (xlen); - base64_encode_block (xdata, &xlen, s, len); - rv = json_string (xdata); - free (xdata); - return rv; -} - void check_common (lookup_t *lh, bool lookup_result, int get_errnum_result, json_t *get_value_result, const char *missing_ref_result, + bool missing_ref_raw, const char *msg, bool destroy_lookup) { @@ -236,8 +222,9 @@ void check_common (lookup_t *lh, } if (missing_ref_result) { const char *missing_ref; + bool ref_raw; - ok ((missing_ref = lookup_get_missing_ref (lh)) != NULL, + ok ((missing_ref = lookup_get_missing_ref (lh, &ref_raw)) != NULL, "%s: lookup_get_missing_ref returns expected non-NULL result", msg); if (missing_ref) { @@ -247,9 +234,12 @@ void check_common (lookup_t *lh, else { ok (false, "%s: missing ref returned matched expectation", msg); } + + ok (ref_raw == missing_ref_raw, + "%s: lookup_get_missing_ref returned expected ref_raw", msg); } else { - ok (lookup_get_missing_ref (lh) == NULL, + ok (lookup_get_missing_ref (lh, NULL) == NULL, "%s: lookup_get_missing_ref returns NULL as expected", msg); } @@ -267,6 +257,7 @@ void check (lookup_t *lh, get_errnum_result, get_value_result, NULL, + false, /* doesn't matter */ msg, true); } @@ -274,6 +265,7 @@ void check (lookup_t *lh, void check_stall (lookup_t *lh, int get_errnum_result, const char *missing_ref_result, + bool missing_ref_raw, const char *msg) { check_common (lh, @@ -281,6 +273,7 @@ void check_stall (lookup_t *lh, get_errnum_result, NULL, missing_ref_result, + missing_ref_raw, msg, false); } @@ -288,7 +281,6 @@ void check_stall (lookup_t *lh, /* lookup tests on root dir */ void lookup_root (void) { json_t *root; - json_t *opaque_data; json_t *test; struct cache *cache; lookup_t *lh; @@ -307,9 +299,8 @@ void lookup_root (void) { * treeobj dir, no entries */ - opaque_data = get_json_base64_string ("abcd"); - kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); + blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); + cache_insert (cache, valref_ref, cache_entry_create_raw (strdup ("abcd"), 4)); root = treeobj_create_dir (); kvs_util_json_hash ("sha1", root, root_ref); @@ -369,7 +360,6 @@ void lookup_basic (void) { json_t *root; json_t *dirref; json_t *dir; - json_t *opaque_data; json_t *test; struct cache *cache; lookup_t *lh; @@ -395,9 +385,8 @@ void lookup_basic (void) { * "dirref" : dirref to dirref_ref */ - opaque_data = get_json_base64_string ("abcd"); - kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); + blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); + cache_insert (cache, valref_ref, cache_entry_create_raw (strdup ("abcd"), 4)); dir = treeobj_create_dir (); treeobj_insert_entry (dir, "val", treeobj_create_val ("bar", 3)); @@ -549,7 +538,6 @@ void lookup_errors (void) { json_t *root; json_t *dirref; json_t *dir; - json_t *opaque_data; json_t *valref_multi; json_t *dirref_multi; struct cache *cache; @@ -583,9 +571,8 @@ void lookup_errors (void) { * "valref_multi" : valref to [ valref_ref, valref_ref ] */ - opaque_data = get_json_base64_string ("abcd"); - kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); + blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); + cache_insert (cache, valref_ref, cache_entry_create_raw (strdup ("abcd"), 4)); dirref = treeobj_create_dir (); treeobj_insert_entry (dirref, "val", treeobj_create_val ("bar", 3)); @@ -797,7 +784,7 @@ void lookup_errors (void) { "lookup_create on dirref_bad, in middle of path"); check (lh, EPERM, NULL, "lookup dirref_bad, in middle of path"); - /* Lookup a valref that doesn't point to a base64 string, should get EPERM */ + /* Lookup a valref that doesn't point to raw data, should get EPERM */ ok ((lh = lookup_create (cache, 1, root_ref, @@ -876,7 +863,6 @@ void lookup_links (void) { json_t *dirref2; json_t *dirref3; json_t *dir; - json_t *opaque_data; json_t *test; struct cache *cache; lookup_t *lh; @@ -916,9 +902,8 @@ void lookup_links (void) { * "dirref2" : dirref to "dirref2_ref */ - opaque_data = get_json_base64_string ("abcd"); - kvs_util_json_hash ("sha1", opaque_data, valref_ref); - cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); + blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); + cache_insert (cache, valref_ref, cache_entry_create_raw (strdup ("abcd"), 4)); dirref3 = treeobj_create_dir (); treeobj_insert_entry (dirref3, "val", treeobj_create_val ("baz", 3)); @@ -1193,7 +1178,7 @@ void lookup_stall_root (void) { NULL, FLUX_KVS_READDIR)) != NULL, "lookup_create stalltest \".\""); - check_stall (lh, EAGAIN, root_ref, "root \".\" stall"); + check_stall (lh, EAGAIN, root_ref, false, "root \".\" stall"); cache_insert (cache, root_ref, cache_entry_create_json (root)); @@ -1219,7 +1204,6 @@ void lookup_stall (void) { json_t *root; json_t *dirref1; json_t *dirref2; - json_t *opaque_data; json_t *test; struct cache *cache; lookup_t *lh; @@ -1250,8 +1234,7 @@ void lookup_stall (void) { * */ - opaque_data = get_json_base64_string ("abcd"); - kvs_util_json_hash ("sha1", opaque_data, valref_ref); + blobref_hash ("sha1", "abcd", 4, valref_ref, sizeof (href_t)); dirref1 = treeobj_create_dir (); treeobj_insert_entry (dirref1, "val", treeobj_create_val ("foo", 3)); @@ -1279,12 +1262,12 @@ void lookup_stall (void) { NULL, 0)) != NULL, "lookup_create stalltest dirref1.val"); - check_stall (lh, EAGAIN, root_ref, "dirref1.val stall #1"); + check_stall (lh, EAGAIN, root_ref, false, "dirref1.val stall #1"); cache_insert (cache, root_ref, cache_entry_create_json (root)); /* next call to lookup, should stall */ - check_stall (lh, EAGAIN, dirref1_ref, "dirref1.val stall #2"); + check_stall (lh, EAGAIN, dirref1_ref, false, "dirref1.val stall #2"); cache_insert (cache, dirref1_ref, cache_entry_create_json (dirref1)); @@ -1315,7 +1298,7 @@ void lookup_stall (void) { NULL, 0)) != NULL, "lookup_create stalltest symlink.val"); - check_stall (lh, EAGAIN, dirref2_ref, "symlink.val stall"); + check_stall (lh, EAGAIN, dirref2_ref, false, "symlink.val stall"); cache_insert (cache, dirref2_ref, cache_entry_create_json (dirref2)); @@ -1346,9 +1329,9 @@ void lookup_stall (void) { NULL, 0)) != NULL, "lookup_create stalltest dirref1.valref"); - check_stall (lh, EAGAIN, valref_ref, "dirref1.valref stall"); + check_stall (lh, EAGAIN, valref_ref, true, "dirref1.valref stall"); - cache_insert (cache, valref_ref, cache_entry_create_json (opaque_data)); + cache_insert (cache, valref_ref, cache_entry_create_raw (strdup ("abcd"), 4)); /* lookup dirref1.valref, should succeed */ test = treeobj_create_val ("abcd", 4); From df34e214b70f021497ec3c6ab3ec89feb999a92e Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 26 Sep 2017 17:27:58 -0700 Subject: [PATCH 6/9] libkvs/treeobj: Remove treeobj_create_val_base64 --- src/common/libkvs/test/treeobj.c | 28 ---------------------------- src/common/libkvs/treeobj.c | 20 ++------------------ src/common/libkvs/treeobj.h | 4 +--- 3 files changed, 3 insertions(+), 49 deletions(-) diff --git a/src/common/libkvs/test/treeobj.c b/src/common/libkvs/test/treeobj.c index 8df135d03f07..f6418da7a3d4 100644 --- a/src/common/libkvs/test/treeobj.c +++ b/src/common/libkvs/test/treeobj.c @@ -196,33 +196,6 @@ void test_val (void) json_decref (val2); } -void test_val_base64 (void) -{ - json_t *val; - char *base64 = "NDI="; /* 42 w/o ending NUL byte*/ - char *outbuf; - int outlen; - - ok ((val = treeobj_create_val_base64 (base64)) != NULL, - "treeobj_create_val_base64 works"); - diag_json (val); - ok (treeobj_is_val (val), - "treeobj_is_value returns true"); - ok (treeobj_decode_val (val, (void **)&outbuf, &outlen) == 0, - "treeobj_decode_val works"); - ok (outlen == 2, - "and returned correct size"); - ok (memcmp ("42", outbuf, 2) == 0, - "and returned correct data"); - free (outbuf); - - errno = 0; - ok (treeobj_create_val_base64 (NULL) == NULL && errno == EINVAL, - "treeobj_create_val_base64 NULL fails "); - - json_decref (val); -} - void test_dirref (void) { json_t *dirref; @@ -563,7 +536,6 @@ int main(int argc, char** argv) test_valref (); test_val (); - test_val_base64 (); test_dirref (); test_dir (); test_copy_dir (); diff --git a/src/common/libkvs/treeobj.c b/src/common/libkvs/treeobj.c index ba7cbae57de2..10e8629da109 100644 --- a/src/common/libkvs/treeobj.c +++ b/src/common/libkvs/treeobj.c @@ -380,30 +380,14 @@ json_t *treeobj_create_val (const void *data, int len) } base64_encode_block (xdata, &xlen, data, len); - if (!(obj = treeobj_create_val_base64 (xdata))) - goto done; - -done: - free (xdata); - return obj; -} - -json_t *treeobj_create_val_base64 (const char *data) -{ - json_t *obj = NULL; - - if (!data) { - errno = EINVAL; - return NULL; - } - if (!(obj = json_pack ("{s:i s:s s:s}", "ver", treeobj_version, "type", "val", - "data", data))) { + "data", xdata))) { errno = ENOMEM; goto done; } done: + free (xdata); return obj; } diff --git a/src/common/libkvs/treeobj.h b/src/common/libkvs/treeobj.h index a3acd284d11e..5aaa425ec29f 100644 --- a/src/common/libkvs/treeobj.h +++ b/src/common/libkvs/treeobj.h @@ -9,13 +9,11 @@ /* Create a treeobj * valref, dirref: if blobref is NULL, treeobj_append_blobref() * must be called before object is valid. - * val & val_base64: copies argument (caller retains ownership) - * val_base64: user supplies base64 string + * val: copies argument (caller retains ownership) * Return JSON object on success, NULL on failure with errno set. */ json_t *treeobj_create_symlink (const char *target); json_t *treeobj_create_val (const void *data, int len); -json_t *treeobj_create_val_base64 (const char *data); json_t *treeobj_create_valref (const char *blobref); json_t *treeobj_create_dir (void); json_t *treeobj_create_dirref (const char *blobref); From 4fdefb9ce9478e8b06b88a31b063114418b419bb Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 21 Sep 2017 15:10:57 -0700 Subject: [PATCH 7/9] modules/kvs/commit: Update for raw opaque data Update internal commit API to handle valref references pointing to raw unencoded data in the content store. Most notably, create cache entries with raw data when appropriate, so it will ultimately get flushed to the content store as raw unencoded data. --- src/modules/kvs/commit.c | 77 +++++++++++++++++++++++++++-------- src/modules/kvs/test/commit.c | 61 +++++++++++++++++++++++---- 2 files changed, 113 insertions(+), 25 deletions(-) diff --git a/src/modules/kvs/commit.c b/src/modules/kvs/commit.c index fa50ba20eaff..20d016ef5864 100644 --- a/src/modules/kvs/commit.c +++ b/src/modules/kvs/commit.c @@ -36,6 +36,7 @@ #include #include +#include "src/common/libutil/base64.h" #include "src/common/libkvs/treeobj.h" #include "commit.h" @@ -182,19 +183,43 @@ static void cleanup_dirty_cache_list (commit_t *c) /* Store object 'o' under key 'ref' in local cache. * Object reference is still owned by the caller. + * 'is_raw' indicates this data is a json string w/ base64 value and + * should be flushed to the content store as raw data. * Returns -1 on error, 0 on success entry already there, 1 on success * entry needs to be flushed to content store */ static int store_cache (commit_t *c, int current_epoch, json_t *o, - href_t ref, struct cache_entry **hpp) + bool is_raw, href_t ref, struct cache_entry **hpp) { struct cache_entry *hp; int saved_errno, rc = -1; - - if (kvs_util_json_hash (c->cm->hash_name, o, ref) < 0) { - saved_errno = errno; - flux_log_error (c->cm->h, "kvs_util_json_hash"); - goto done; + const char *xdata; + char *data = NULL; + int xlen, len; + + if (is_raw) { + xdata = json_string_value (o); + xlen = strlen (xdata); + len = base64_decode_length (xlen); + if (!(data = malloc (len))) { + saved_errno = errno; + flux_log_error (c->cm->h, "malloc"); + goto done; + } + if (base64_decode_block (data, &len, xdata, xlen) < 0) { + free (data); + saved_errno = errno; + flux_log_error (c->cm->h, "base64_decode_block"); + goto done; + } + blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t)); + } + else { + if (kvs_util_json_hash (c->cm->hash_name, o, ref) < 0) { + saved_errno = errno; + flux_log_error (c->cm->h, "kvs_util_json_hash"); + goto done; + } } if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) { if (!(hp = cache_entry_create ())) { @@ -205,20 +230,34 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, } if (cache_entry_get_valid (hp)) { c->cm->noop_stores++; + if (is_raw) + free (data); rc = 0; } else { - json_incref (o); - if (cache_entry_set_json (hp, o) < 0) { - int ret; - saved_errno = errno; - json_decref (o); - ret = cache_remove_entry (c->cm->cache, ref); - assert (ret == 1); - goto done; + if (is_raw) { + if (cache_entry_set_raw (hp, data, len) < 0) { + int ret; + saved_errno = errno; + free (data); + ret = cache_remove_entry (c->cm->cache, ref); + assert (ret == 1); + goto done; + } + } + else { + json_incref (o); + if (cache_entry_set_json (hp, o) < 0) { + int ret; + saved_errno = errno; + json_decref (o); + ret = cache_remove_entry (c->cm->cache, ref); + assert (ret == 1); + goto done; + } } if (cache_entry_set_dirty (hp, true) < 0) { - /* cache entry now owns a reference, cache_remove_entry - * will decref object */ + /* cache entry now owns data, cache_remove_entry + * will decref/free object/data */ int ret; saved_errno = errno; ret = cache_remove_entry (c->cm->cache, ref); @@ -264,7 +303,8 @@ static int commit_unroll (commit_t *c, int current_epoch, json_t *dir) if (treeobj_is_dir (dir_entry)) { if (commit_unroll (c, current_epoch, dir_entry) < 0) /* depth first */ return -1; - if ((ret = store_cache (c, current_epoch, dir_entry, ref, &hp)) < 0) + if ((ret = store_cache (c, current_epoch, dir_entry, + false, ref, &hp)) < 0) return -1; if (ret) { if (zlist_push (c->item_callback_list, hp) < 0) { @@ -291,7 +331,7 @@ static int commit_unroll (commit_t *c, int current_epoch, json_t *dir) return -1; if (size > BLOBREF_MAX_STRING_SIZE) { if ((ret = store_cache (c, current_epoch, val_data, - ref, &hp)) < 0) + true, ref, &hp)) < 0) return -1; if (ret) { if (zlist_push (c->item_callback_list, hp) < 0) { @@ -592,6 +632,7 @@ commit_process_t commit_process (commit_t *c, else if ((sret = store_cache (c, current_epoch, c->rootcpy, + false, c->newroot, &hp)) < 0) c->errnum = errno; diff --git a/src/modules/kvs/test/commit.c b/src/modules/kvs/test/commit.c index 5f4547f5d4da..5b912c3eca00 100644 --- a/src/modules/kvs/test/commit.c +++ b/src/modules/kvs/test/commit.c @@ -371,7 +371,7 @@ void commit_basic_tests (void) cache_destroy (cache); } -int cache_count_cb (commit_t *c, struct cache_entry *hp, void *data) +int cache_count_dirty_cb (commit_t *c, struct cache_entry *hp, void *data) { int *count = data; if (cache_entry_get_dirty (hp)) { @@ -438,7 +438,7 @@ void commit_basic_commit_process_test (void) ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); - ok (commit_iter_dirty_cache_entries (c, cache_count_cb, &count) == 0, + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, "commit_iter_dirty_cache_entries works for dirty cache entries"); ok (count == 1, @@ -484,7 +484,7 @@ void commit_basic_commit_process_test_multiple_fences (void) ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); - ok (commit_iter_dirty_cache_entries (c, cache_count_cb, &count) == 0, + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, "commit_iter_dirty_cache_entries works for dirty cache entries"); ok (count == 1, @@ -509,7 +509,7 @@ void commit_basic_commit_process_test_multiple_fences (void) count = 0; - ok (commit_iter_dirty_cache_entries (c, cache_count_cb, &count) == 0, + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, "commit_iter_dirty_cache_entries works for dirty cache entries"); /* why two? 1 for root (new dir added), 1 for dir.key2 (a new dir) */ @@ -561,7 +561,7 @@ void commit_basic_commit_process_test_multiple_fences_merge (void) ok (commit_process (c, 1, rootref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); - ok (commit_iter_dirty_cache_entries (c, cache_count_cb, &count) == 0, + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, "commit_iter_dirty_cache_entries works for dirty cache entries"); /* why three? 1 for root, 1 for foo.key1 (a new dir), and 1 for @@ -1420,6 +1420,16 @@ void commit_process_bad_dirrefs (void) { cache_destroy (cache); } +int cache_count_raw_cb (commit_t *c, struct cache_entry *hp, void *data) +{ + int *count = data; + if (cache_entry_is_type_raw (hp)) { + if (count) + (*count)++; + } + return 0; +} + void commit_process_big_fileval (void) { struct cache *cache; commit_mgr_t *cm; @@ -1429,6 +1439,7 @@ void commit_process_big_fileval (void) { const char *newroot; int bigstrsize = BLOBREF_MAX_STRING_SIZE * 2; char bigstr[bigstrsize]; + int count; int i; ok ((cache = cache_create ()) != NULL, @@ -1451,11 +1462,42 @@ void commit_process_big_fileval (void) { ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, "commit_mgr_create works"); + /* first commit a small value, to make sure it isn't type raw in + * the cache */ + + create_ready_commit (cm, "fence1", "val", "smallstr", 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, + "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); + + count = 0; + ok (commit_iter_dirty_cache_entries (c, cache_count_raw_cb, &count) == 0, + "commit_iter_dirty_cache_entries works for dirty cache entries"); + + ok (count == 0, + "correct number of cache entries were raw"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_FINISHED, + "commit_process returns COMMIT_PROCESS_FINISHED"); + + ok ((newroot = commit_get_newroot_ref (c)) != NULL, + "commit_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "val", "smallstr"); + + commit_mgr_remove_commit (cm, c); + + /* next commit a big value, to make sure it is flagged raw in the + * cache */ + memset (bigstr, '\0', bigstrsize); for (i = 0; i < bigstrsize - 1; i++) bigstr[i] = 'a'; - create_ready_commit (cm, "fence1", "val", bigstr, 0); + create_ready_commit (cm, "fence2", "val", bigstr, 0); ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, "commit_mgr_get_ready_commit returns ready commit"); @@ -1463,9 +1505,14 @@ void commit_process_big_fileval (void) { ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); - ok (commit_iter_dirty_cache_entries (c, cache_noop_cb, NULL) == 0, + count = 0; + ok (commit_iter_dirty_cache_entries (c, cache_count_raw_cb, &count) == 0, "commit_iter_dirty_cache_entries works for dirty cache entries"); + /* this entry should be raw, b/c large val converted into valref */ + ok (count == 1, + "correct number of cache entries were raw"); + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_FINISHED, "commit_process returns COMMIT_PROCESS_FINISHED"); From 9428d33cbf2984461f7d8028e686088ae5e5216b Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 27 Sep 2017 06:56:35 -0700 Subject: [PATCH 8/9] modules/kvs: Update main KVS module for raw data Update lookup, commit, and KVS cache calls to handle potential for data being raw or json. The major change being valref treeobj objects pointing to raw unencoded data in the content store. Fixes #1187 --- src/modules/kvs/kvs.c | 125 ++++++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 34 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 4fd89e92a590..56616a78e599 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -38,6 +38,7 @@ #include #include +#include "src/common/libutil/base64.h" #include "src/common/libutil/blobref.h" #include "src/common/libutil/monotime.h" #include "src/common/libutil/tstat.h" @@ -168,7 +169,6 @@ static kvs_ctx_t *getctx (flux_t *h) static void content_load_completion (flux_future_t *f, void *arg) { kvs_ctx_t *ctx = arg; - json_t *o; const void *data; int size; const char *blobref; @@ -179,10 +179,6 @@ static void content_load_completion (flux_future_t *f, void *arg) goto done; } blobref = flux_future_aux_get (f, "ref"); - if (!(o = json_loads ((char *)data, JSON_DECODE_ANY, NULL))) { - flux_log_error (ctx->h, "%s: json_loads", __FUNCTION__); - goto done; - } /* should be impossible for lookup to fail, cache entry created * earlier, and cache_expire_entries() could not have removed it * b/c it is not yet valid. But check and log incase there is @@ -193,22 +189,45 @@ static void content_load_completion (flux_future_t *f, void *arg) goto done; } - /* This is a pretty terrible error case, where we've loaded an - * object from the content store, but can't put it in the cache. + /* If cache_entry_set_json() or cache_entry_set_raw() fail, it's a + * pretty terrible error case, where we've loaded an object from + * the content store, but can't put it in the cache. * * If there was a waiter on this cache entry waiting for it to be * valid, the load() will ultimately hang. The caller will * timeout or eventually give up, so the KVS can continue along - * its merry way. So we just log this error. + * its merry way. So we just log the error. * * If this is the result of a synchronous call to load(), there * should be no waiters on this cache entry. load() will handle * this error scenario appropriately. */ - if (cache_entry_set_json (hp, o) < 0) { - flux_log_error (ctx->h, "%s: cache_entry_set_json", __FUNCTION__); - goto done; + if (cache_entry_is_type_raw (hp)) { + char *datacpy; + + if (!(datacpy = malloc (size))) { + flux_log_error (ctx->h, "%s: malloc", __FUNCTION__); + goto done; + } + memcpy (datacpy, data, size); + + if (cache_entry_set_raw (hp, datacpy, size) < 0) { + flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__); + goto done; + } } + else { + json_t *o; + if (!(o = json_loads ((char *)data, JSON_DECODE_ANY, NULL))) { + flux_log_error (ctx->h, "%s: json_loads", __FUNCTION__); + goto done; + } + if (cache_entry_set_json (hp, o) < 0) { + flux_log_error (ctx->h, "%s: cache_entry_set_json", __FUNCTION__); + goto done; + } + } + done: flux_future_destroy (f); } @@ -241,8 +260,12 @@ static int content_load_request_send (kvs_ctx_t *ctx, const href_t ref) return -1; } -/* Return 0 on success, -1 on error. Set stall variable appropriately */ -static int load (kvs_ctx_t *ctx, const href_t ref, wait_t *wait, bool *stall) +/* Return 0 on success, -1 on error. is_raw indicates if data being + * loaded is raw data, so we know how to place it in the cache. Set + * stall variable appropriately + */ +static int load (kvs_ctx_t *ctx, const href_t ref, bool is_raw, wait_t *wait, + bool *stall) { struct cache_entry *hp = cache_lookup (ctx->cache, ref, ctx->epoch); int saved_errno, ret; @@ -252,9 +275,19 @@ static int load (kvs_ctx_t *ctx, const href_t ref, wait_t *wait, bool *stall) /* Create an incomplete hash entry if none found. */ if (!hp) { - if (!(hp = cache_entry_create ())) { - flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__); - return -1; + if (is_raw) { + if (!(hp = cache_entry_create_raw (NULL, 0))) { + flux_log_error (ctx->h, "%s: cache_entry_create_raw", + __FUNCTION__); + return -1; + } + } + else { + if (!(hp = cache_entry_create_json (NULL))) { + flux_log_error (ctx->h, "%s: cache_entry_create_json", + __FUNCTION__); + return -1; + } } cache_insert (ctx->cache, ref, hp); if (content_load_request_send (ctx, ref) < 0) { @@ -353,20 +386,30 @@ static void content_store_completion (flux_future_t *f, void *arg) (void)content_store_get (f, arg); } -static int content_store_request_send (kvs_ctx_t *ctx, json_t *val, - bool now) +/* is_raw indicates if void *data is json or raw data. 'len' is + * ignored if it is json. + */ +static int content_store_request_send (kvs_ctx_t *ctx, void *data, int len, + bool is_raw, bool now) { flux_future_t *f; - char *data = NULL; + char *dataout = NULL; + char *dataout_cpy = NULL; int size; int saved_errno, rc = -1; - if (!(data = kvs_util_json_dumps (val))) - goto error; - - size = strlen (data) + 1; + if (is_raw) { + dataout = data; + size = len; + } + else { + if (!(dataout_cpy = kvs_util_json_dumps ((json_t *)data))) + goto error; + dataout = dataout_cpy; + size = strlen (dataout) + 1; + } - if (!(f = flux_content_store (ctx->h, data, size, 0))) + if (!(f = flux_content_store (ctx->h, dataout, size, 0))) goto error; if (now) { if (content_store_get (f, ctx) < 0) @@ -380,7 +423,7 @@ static int content_store_request_send (kvs_ctx_t *ctx, json_t *val, rc = 0; error: - free (data); + free (dataout_cpy); return rc; } @@ -410,7 +453,10 @@ static int commit_load_cb (commit_t *c, const char *ref, void *data) struct commit_cb_data *cbd = data; bool stall; - if (load (cbd->ctx, ref, cbd->wait, &stall) < 0) { + /* is_raw flag is always false on commit loads, we will never + * load raw data from the content store, only tree objects. + */ + if (load (cbd->ctx, ref, false, cbd->wait, &stall) < 0) { cbd->errnum = errno; flux_log_error (cbd->ctx->h, "%s: load", __FUNCTION__); return -1; @@ -427,11 +473,22 @@ static int commit_load_cb (commit_t *c, const char *ref, void *data) static int commit_cache_cb (commit_t *c, struct cache_entry *hp, void *data) { struct commit_cb_data *cbd = data; + void *storedata; + int storedatalen = 0; + bool is_raw; assert (cache_entry_get_dirty (hp)); + is_raw = cache_entry_is_type_raw (hp); + if (is_raw) + storedata = cache_entry_get_raw (hp, &storedatalen); + else + storedata = cache_entry_get_json (hp); + if (content_store_request_send (cbd->ctx, - cache_entry_get_json (hp), + storedata, + storedatalen, + is_raw, false) < 0) { cbd->errnum = errno; flux_log_error (cbd->ctx->h, "%s: content_store_request_send", @@ -721,14 +778,14 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w, if (!lookup (lh)) { const char *missing_ref; - bool stall; + bool ref_raw, stall; - missing_ref = lookup_get_missing_ref (lh); + missing_ref = lookup_get_missing_ref (lh, &ref_raw); assert (missing_ref); if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, lh))) goto done; - if (load (ctx, missing_ref, wait, &stall) < 0) { + if (load (ctx, missing_ref, ref_raw, wait, &stall) < 0) { flux_log_error (h, "%s: load", __FUNCTION__); goto done; } @@ -829,14 +886,14 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, if (!lookup (lh)) { const char *missing_ref; - bool stall; + bool ref_raw, stall; - missing_ref = lookup_get_missing_ref (lh); + missing_ref = lookup_get_missing_ref (lh, &ref_raw); assert (missing_ref); if (!(wait = wait_create_msg_handler (h, w, msg, watch_request_cb, lh))) goto done; - if (load (ctx, missing_ref, wait, &stall) < 0) { + if (load (ctx, missing_ref, ref_raw, wait, &stall) < 0) { flux_log_error (h, "%s: load", __FUNCTION__); goto done; } @@ -1595,7 +1652,7 @@ static int store_initial_rootdir (kvs_ctx_t *ctx, json_t *o, href_t ref) assert (ret == 1); goto done_error; } - if (content_store_request_send (ctx, o, true) < 0) { + if (content_store_request_send (ctx, o, 0, false, true) < 0) { /* Must clean up, don't want cache entry to be assumed * valid. Everything here is synchronous and w/o waiters, * so nothing should error here */ From 3e3c6ed00a366fda96664a8fa23f85810d1a0db3 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 28 Sep 2017 11:58:40 -0700 Subject: [PATCH 9/9] test/kvs: Add kvs raw unit tests Add tests to ensure data written to content store in raw form via the kvs can be read outside of the KVS. Also verify that valref tree objects can be put into the KVS to point to data already in the content store. --- t/t1002-kvs-extra.t | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/t/t1002-kvs-extra.t b/t/t1002-kvs-extra.t index a596f3812e50..2c96799c04f5 100755 --- a/t/t1002-kvs-extra.t +++ b/t/t1002-kvs-extra.t @@ -270,6 +270,30 @@ test_expect_success 'kvs: kvsdir_get_size works' ' test "$OUTPUT" = "3" ' +# kvs reads/writes of raw data to/from content store work + +# largevalhash includes string quotes and NUL char at end +largeval="abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" +largevalhash="sha1-79da8e5c9dbe65c6460377d3f09b8f535ceb7d9d" + +test_expect_success 'kvs: large put stores raw data into content store' ' + flux kvs unlink -Rf $TEST && + flux kvs put $TEST.largeval=$largeval && + ${KVSBASIC} get-treeobj $TEST.largeval | grep -q \"valref\" && + ${KVSBASIC} get-treeobj $TEST.largeval | grep -q ${largevalhash} && + flux content load ${largevalhash} | grep $largeval +' + +# TODO - convert to using "flux content store", see issue1216 +test_expect_success 'kvs: valref that points to content store data can be read' ' + flux kvs unlink -Rf $TEST && + flux kvs put $TEST.largeval=$largeval && + ${KVSBASIC} put-treeobj $TEST.largeval2="{\"data\":[\"${largevalhash}\"],\"type\":\"valref\",\"ver\":1}" && + flux kvs get $TEST.largeval2 | grep $largeval +' + +# dtree tests + test_expect_success 'kvs: store 16x3 directory tree' ' ${FLUX_BUILD_DIR}/t/kvs/dtree -h3 -w16 --prefix $TEST.dtree '