diff --git a/src/modules/kvs/commit.c b/src/modules/kvs/commit.c index 6a77c7ee826b..951ed1f5ed78 100644 --- a/src/modules/kvs/commit.c +++ b/src/modules/kvs/commit.c @@ -226,6 +226,13 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o, flux_log_error (c->cm->h, "base64_decode_block"); goto done; } + /* len from base64_decode_length() always > 0 b/c of NUL byte, + * but len after base64_decode_block() can be zero. Adjust if + * necessary. */ + if (!len) { + free (data); + data = NULL; + } blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t)); } else { @@ -369,6 +376,133 @@ static int commit_unroll (commit_t *c, int current_epoch, json_t *dir) return 0; } +static int commit_val_data_to_cache (commit_t *c, int current_epoch, + json_t *val, href_t ref) +{ + struct cache_entry *hp; + json_t *val_data; + int ret; + + if (!(val_data = treeobj_get_data (val))) + return -1; + + if ((ret = store_cache (c, current_epoch, val_data, + true, ref, &hp)) < 0) + return -1; + + if (ret) { + if (zlist_push (c->dirty_cache_entries_list, hp) < 0) { + commit_cleanup_dirty_cache_entry (c, hp); + errno = ENOMEM; + return -1; + } + } + + return 0; +} + +static int commit_append (commit_t *c, int current_epoch, json_t *dirent, + json_t *dir, const char *final_name) +{ + json_t *entry; + + if (!treeobj_is_val (dirent)) { + errno = EPROTO; + return -1; + } + + entry = treeobj_get_entry (dir, final_name); + + if (!entry) { + /* entry not found, treat like normal insertion */ + if (treeobj_insert_entry (dir, final_name, dirent) < 0) + return -1; + } + else if (treeobj_is_valref (entry)) { + href_t ref; + json_t *cpy; + + /* treeobj is valref, so we need to append the new data's + * blobref to this tree object. Before doing so, we must save + * off the new data to the cache and mark it dirty for + * flushing later (if necessary) + * + * Note that we make a copy of the original entry and + * re-insert it into the directory. We do not want to + * accidentally alter any json object pointers that may be + * sitting in the KVS cache. + */ + + if (commit_val_data_to_cache (c, current_epoch, + dirent, ref) < 0) + return -1; + + if (!(cpy = treeobj_deep_copy (entry))) + return -1; + + if (treeobj_append_blobref (cpy, ref) < 0) { + json_decref (cpy); + return -1; + } + + if (treeobj_insert_entry (dir, final_name, cpy) < 0) { + json_decref (cpy); + return -1; + } + } + else if (treeobj_is_val (entry)) { + json_t *tmp; + href_t ref1, ref2; + + /* treeobj entry is val, so we need to convert the treeobj + * into a valref first. Then the procedure is basically the + * same as the treeobj valref case above. + */ + + if (commit_val_data_to_cache (c, current_epoch, + entry, ref1) < 0) + return -1; + + if (commit_val_data_to_cache (c, current_epoch, + dirent, ref2) < 0) + return -1; + + if (!(tmp = treeobj_create_valref (ref1))) + return -1; + + if (treeobj_append_blobref (tmp, ref2) < 0) { + json_decref (tmp); + return -1; + } + + if (treeobj_insert_entry (dir, final_name, tmp) < 0) { + json_decref (tmp); + return -1; + } + } + else if (treeobj_is_symlink (entry)) { + /* Could use EPERM - operation not permitted, but want to + * avoid confusion with "common" errnos, we'll use this one + * instead. */ + errno = EOPNOTSUPP; + return -1; + } + else if (treeobj_is_dir (entry) + || treeobj_is_dirref (entry)) { + errno = EISDIR; + return -1; + } + else { + char *s = json_dumps (entry, 0); + flux_log (c->cm->h, LOG_ERR, "%s: corrupt treeobj: %s", + __FUNCTION__, s); + free (s); + errno = ENOTRECOVERABLE; + return -1; + } + return 0; +} + /* link (key, dirent) into directory 'dir'. */ static int commit_link_dirent (commit_t *c, int current_epoch, @@ -509,12 +643,22 @@ static int commit_link_dirent (commit_t *c, int current_epoch, name = next; dir = subdir; } - /* This is the final path component of the key. Add it to the directory. + /* This is the final path component of the key. Add/modify/delete + * it in the directory. */ if (!json_is_null (dirent)) { - if (treeobj_insert_entry (dir, name, dirent) < 0) { - saved_errno = errno; - goto done; + if (flags & FLUX_KVS_APPEND) { + if (commit_append (c, current_epoch, dirent, dir, name) < 0) { + saved_errno = errno; + goto done; + } + } + else { + /* if not append, it's a normal insertion */ + if (treeobj_insert_entry (dir, name, dirent) < 0) { + saved_errno = errno; + goto done; + } } } else { diff --git a/src/modules/kvs/test/commit.c b/src/modules/kvs/test/commit.c index ed96fa187955..a31736ef7297 100644 --- a/src/modules/kvs/test/commit.c +++ b/src/modules/kvs/test/commit.c @@ -18,6 +18,22 @@ static int test_global = 5; +/* convenience function */ +static struct cache_entry *create_cache_entry_raw (void *data, int len) +{ + struct cache_entry *hp; + int ret; + + assert (data); + assert (len); + + hp = cache_entry_create (); + assert (hp); + ret = cache_entry_set_raw (hp, data, len); + assert (ret == 0); + return hp; +} + /* convenience function */ static struct cache_entry *create_cache_entry_json (json_t *o) { @@ -1753,6 +1769,209 @@ void commit_process_giant_dir (void) cache_destroy (cache); } +void commit_process_append (void) +{ + struct cache *cache; + int count = 0; + commit_mgr_t *cm; + commit_t *c; + json_t *root; + href_t valref_ref; + href_t root_ref; + const char *newroot; + + ok ((cache = cache_create ()) != NULL, + "cache_create works"); + + /* This root is + * + * valref_ref + * "ABCD" + * + * root_ref + * "val" : val to "abcd" + * "valref" : valref to valref_ref + */ + + blobref_hash ("sha1", "ABCD", 4, valref_ref, sizeof (href_t)); + cache_insert (cache, valref_ref, create_cache_entry_raw (strdup ("ABCD"), 4)); + + root = treeobj_create_dir (); + treeobj_insert_entry (root, "val", treeobj_create_val ("abcd", 4)); + treeobj_insert_entry (root, "valref", treeobj_create_val ("ABCD", 4)); + + ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, + "kvs_util_json_hash worked"); + + cache_insert (cache, root_ref, create_cache_entry_json (root)); + + ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, + "commit_mgr_create works"); + + /* + * first test, append to a treeobj val + */ + + create_ready_commit (cm, "fence1", "val", "efgh", FLUX_KVS_APPEND, 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, + "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); + + count = 0; + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, + "commit_iter_dirty_cache_entries works for dirty cache entries"); + + /* 3 dirty entries, raw "abcd", raw "efgh", and a new root b/c val + * has been changed into a valref. */ + ok (count == 3, + "correct number of cache entries were dirty"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_FINISHED, + "commit_process returns COMMIT_PROCESS_FINISHED"); + + ok ((newroot = commit_get_newroot_ref (c)) != NULL, + "commit_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "val", "abcdefgh"); + + commit_mgr_remove_commit (cm, c); + + /* + * second test, append to a treeobj valref + */ + + create_ready_commit (cm, "fence2", "valref", "EFGH", FLUX_KVS_APPEND, 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, + "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); + + count = 0; + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, + "commit_iter_dirty_cache_entries works for dirty cache entries"); + + /* 2 dirty entries, raw "EFGH", and a new root b/c valref has an + * additional blobref */ + ok (count == 2, + "correct number of cache entries were dirty"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_FINISHED, + "commit_process returns COMMIT_PROCESS_FINISHED"); + + ok ((newroot = commit_get_newroot_ref (c)) != NULL, + "commit_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "valref", "ABCDEFGH"); + + commit_mgr_remove_commit (cm, c); + + /* + * third test, append to a non-existent value, it's like an insert + */ + + create_ready_commit (cm, "fence3", "newval", "foobar", FLUX_KVS_APPEND, 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_DIRTY_CACHE_ENTRIES, + "commit_process returns COMMIT_PROCESS_DIRTY_CACHE_ENTRIES"); + + count = 0; + ok (commit_iter_dirty_cache_entries (c, cache_count_dirty_cb, &count) == 0, + "commit_iter_dirty_cache_entries works for dirty cache entries"); + + /* 1 dirty entries, root simply has a new val in it */ + ok (count == 1, + "correct number of cache entries were dirty"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_FINISHED, + "commit_process returns COMMIT_PROCESS_FINISHED"); + + ok ((newroot = commit_get_newroot_ref (c)) != NULL, + "commit_get_newroot_ref returns != NULL when processing complete"); + + verify_value (cache, newroot, "newval", "foobar"); + + commit_mgr_remove_commit (cm, c); + + commit_mgr_destroy (cm); + cache_destroy (cache); +} + +void commit_process_append_errors (void) +{ + struct cache *cache; + commit_mgr_t *cm; + commit_t *c; + json_t *root; + href_t root_ref; + + ok ((cache = cache_create ()) != NULL, + "cache_create works"); + + /* This root is + * + * root_ref + * "dir" : empty directory + * "symlink" : symlink to "dir" + */ + + root = treeobj_create_dir (); + treeobj_insert_entry (root, "dir", treeobj_create_dir ()); + treeobj_insert_entry (root, "symlink", treeobj_create_symlink ("dir")); + + ok (kvs_util_json_hash ("sha1", root, root_ref) == 0, + "kvs_util_json_hash worked"); + + cache_insert (cache, root_ref, create_cache_entry_json (root)); + + ok ((cm = commit_mgr_create (cache, "sha1", NULL, &test_global)) != NULL, + "commit_mgr_create works"); + + /* + * append to a dir, should get EISDIR + */ + + create_ready_commit (cm, "fence1", "dir", "1", FLUX_KVS_APPEND, 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_ERROR, + "commit_process returns COMMIT_PROCESS_ERROR"); + + ok (commit_get_errnum (c) == EISDIR, + "commit_get_errnum return EISDIR"); + + commit_mgr_remove_commit (cm, c); + + /* + * append to a symlink, should get EOPNOTSUPP + */ + + create_ready_commit (cm, "fence2", "symlink", "2", FLUX_KVS_APPEND, 0); + + ok ((c = commit_mgr_get_ready_commit (cm)) != NULL, + "commit_mgr_get_ready_commit returns ready commit"); + + ok (commit_process (c, 1, root_ref) == COMMIT_PROCESS_ERROR, + "commit_process returns COMMIT_PROCESS_ERROR"); + + ok (commit_get_errnum (c) == EOPNOTSUPP, + "commit_get_errnum return EOPNOTSUPP"); + + commit_mgr_remove_commit (cm, c); + + commit_mgr_destroy (cm); + cache_destroy (cache); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); @@ -1782,6 +2001,8 @@ int main (int argc, char *argv[]) commit_process_bad_dirrefs (); commit_process_big_fileval (); commit_process_giant_dir (); + commit_process_append (); + commit_process_append_errors (); done_testing (); return (0);