Skip to content

Commit

Permalink
modules/kvs/commit: Support commit append
Browse files Browse the repository at this point in the history
Support ability to append to already existing KVS values.

Add new append unit tests.
  • Loading branch information
chu11 committed Oct 31, 2017
1 parent 571f753 commit fa41127
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 4 deletions.
152 changes: 148 additions & 4 deletions src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
flux_log_error (c->cm->h, "base64_decode_block");
goto done;
}
/* len from base64_decode_length() always > 0 b/c of NUL byte,
* but len after base64_decode_block() can be zero. Adjust if
* necessary. */
if (!len) {
free (data);
data = NULL;
}
blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t));
}
else {
Expand Down Expand Up @@ -369,6 +376,133 @@ static int commit_unroll (commit_t *c, int current_epoch, json_t *dir)
return 0;
}

static int commit_val_data_to_cache (commit_t *c, int current_epoch,
json_t *val, href_t ref)
{
struct cache_entry *hp;
json_t *val_data;
int ret;

if (!(val_data = treeobj_get_data (val)))
return -1;

if ((ret = store_cache (c, current_epoch, val_data,
true, ref, &hp)) < 0)
return -1;

if (ret) {
if (zlist_push (c->dirty_cache_entries_list, hp) < 0) {
commit_cleanup_dirty_cache_entry (c, hp);
errno = ENOMEM;
return -1;
}
}

return 0;
}

static int commit_append (commit_t *c, int current_epoch, json_t *dirent,
json_t *dir, const char *final_name)
{
json_t *entry;

if (!treeobj_is_val (dirent)) {
errno = EPROTO;
return -1;
}

entry = treeobj_get_entry (dir, final_name);

if (!entry) {
/* entry not found, treat like normal insertion */
if (treeobj_insert_entry (dir, final_name, dirent) < 0)
return -1;
}
else if (treeobj_is_valref (entry)) {
href_t ref;
json_t *cpy;

/* treeobj is valref, so we need to append the new data's
* blobref to this tree object. Before doing so, we must save
* off the new data to the cache and mark it dirty for
* flushing later (if necessary)
*
* Note that we make a copy of the original entry and
* re-insert it into the directory. We do not want to
* accidentally alter any json object pointers that may be
* sitting in the KVS cache.
*/

if (commit_val_data_to_cache (c, current_epoch,
dirent, ref) < 0)
return -1;

if (!(cpy = treeobj_deep_copy (entry)))
return -1;

if (treeobj_append_blobref (cpy, ref) < 0) {
json_decref (cpy);
return -1;
}

if (treeobj_insert_entry (dir, final_name, cpy) < 0) {
json_decref (cpy);
return -1;
}
}
else if (treeobj_is_val (entry)) {
json_t *tmp;
href_t ref1, ref2;

/* treeobj entry is val, so we need to convert the treeobj
* into a valref first. Then the procedure is basically the
* same as the treeobj valref case above.
*/

if (commit_val_data_to_cache (c, current_epoch,
entry, ref1) < 0)
return -1;

if (commit_val_data_to_cache (c, current_epoch,
dirent, ref2) < 0)
return -1;

if (!(tmp = treeobj_create_valref (ref1)))
return -1;

if (treeobj_append_blobref (tmp, ref2) < 0) {
json_decref (tmp);
return -1;
}

if (treeobj_insert_entry (dir, final_name, tmp) < 0) {
json_decref (tmp);
return -1;
}
}
else if (treeobj_is_symlink (entry)) {
/* Could use EPERM - operation not permitted, but want to
* avoid confusion with "common" errnos, we'll use this one
* instead. */
errno = EOPNOTSUPP;
return -1;
}
else if (treeobj_is_dir (entry)
|| treeobj_is_dirref (entry)) {
errno = EISDIR;
return -1;
}
else {
char *s = json_dumps (entry, 0);
flux_log (c->cm->h, LOG_ERR, "%s: corrupt treeobj: %s",
__FUNCTION__, s);
free (s);
errno = ENOTRECOVERABLE;
return -1;
}
return 0;
}

/* link (key, dirent) into directory 'dir'.
*/
static int commit_link_dirent (commit_t *c, int current_epoch,
Expand Down Expand Up @@ -509,12 +643,22 @@ static int commit_link_dirent (commit_t *c, int current_epoch,
name = next;
dir = subdir;
}
/* This is the final path component of the key. Add it to the directory.
/* This is the final path component of the key. Add/modify/delete
* it in the directory.
*/
if (!json_is_null (dirent)) {
if (treeobj_insert_entry (dir, name, dirent) < 0) {
saved_errno = errno;
goto done;
if (flags & FLUX_KVS_APPEND) {
if (commit_append (c, current_epoch, dirent, dir, name) < 0) {
saved_errno = errno;
goto done;
}
}
else {
/* if not append, it's a normal insertion */
if (treeobj_insert_entry (dir, name, dirent) < 0) {
saved_errno = errno;
goto done;
}
}
}
else {
Expand Down
Loading

0 comments on commit fa41127

Please sign in to comment.