Skip to content

Commit

Permalink
Merge pull request #1274 from garlick/cache_copy
Browse files Browse the repository at this point in the history
kvs: [cleanup] improve isolation in internal cache
  • Loading branch information
chu11 authored Nov 8, 2017
2 parents 4456339 + ba26ea2 commit 14943b8
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 501 deletions.
2 changes: 1 addition & 1 deletion src/common/libkvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ int flux_kvs_wait_version (flux_t *h, int version)
if (!(f = flux_rpc_pack (h, "kvs.sync", FLUX_NODEID_ANY, 0, "{ s:i }",
"rootseq", version)))
goto done;
/* N.B. response contains (rootseq, rootdir) but we don't need it.
/* N.B. response contains (rootseq, rootref) but we don't need it.
*/
if (flux_future_get (f, NULL) < 0)
goto done;
Expand Down
105 changes: 29 additions & 76 deletions src/modules/kvs/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,41 +149,42 @@ int cache_entry_get_raw (struct cache_entry *hp, const void **data,
return 0;
}

int cache_entry_set_raw (struct cache_entry *hp, void *data, int len)
int cache_entry_set_raw (struct cache_entry *hp, const void *data, int len)
{
if ((data && len <= 0) || (!data && len)) {
void *cpy = NULL;

if (!hp || (data && len <= 0) || (!data && len)) {
errno = EINVAL;
return -1;
}

if (hp) {
if (hp->valid) {
if ((data && hp->data) || (!data && !hp->data))
free (data); /* no-op, 'data' is assumed identical to hp->data */
else {
/* attempt to change already valid cache entry */
errno = EBADE;
return -1;
}
}
else {
hp->data = data;
hp->len = len;
hp->valid = true;

if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->len = 0;
hp->valid = false;
return -1;
}
}
/* It should be a no-op if the entry is already set.
* However, as a sanity check, make sure proposed and existing values match.
*/
if (hp->valid) {
if (len != hp->len || memcmp (data, hp->data, len) != 0) {
errno = EBADE;
return -1;
}
return 0;
}
errno = EINVAL;
if (len > 0) {
if (!(cpy = malloc (len)))
return -1;
memcpy (cpy, data, len);
}
hp->data = cpy;
hp->len = len;
hp->valid = true;
if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0)
goto reset_invalid;
}
return 0;
reset_invalid:
free (hp->data);
hp->data = NULL;
hp->len = 0;
hp->valid = false;
return -1;
}

Expand All @@ -198,54 +199,6 @@ const json_t *cache_entry_get_treeobj (struct cache_entry *hp)
return hp->o;
}

int cache_entry_set_treeobj (struct cache_entry *hp, json_t *o)
{
if (hp && o) {
if (hp->valid) {
if (!hp->data) {
/* attempt to change already valid cache entry */
errno = EBADE;
return -1;
}
else
/* no-op, 'o' is assumed identical to current data */
json_decref (o);

assert (hp->data);
assert (hp->len > 0);
} else {
assert (!hp->data);
assert (!hp->o);

if (treeobj_validate (o) < 0) {
errno = EINVAL;
return -1;
}

if (!(hp->data = treeobj_encode (o)))
return -1;

hp->len = strlen (hp->data);
hp->o = o;
hp->valid = true;

if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->len = 0;
hp->o = NULL;
hp->valid = false;
return -1;
}
}
}
return 0;
}
errno = EINVAL;
return -1;
}

void cache_entry_destroy (void *arg)
{
struct cache_entry *hp = arg;
Expand Down
21 changes: 5 additions & 16 deletions src/modules/kvs/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct cache;
/* Create/destroy cache entry.
*
* cache_entry_create() creates an empty cache entry. Data can be set
* in an entry via cache_entry_set_raw() or cache_entry_set_treeobj().
* in an entry via cache_entry_set_raw().
*/
struct cache_entry *cache_entry_create (void);
void cache_entry_destroy (void *arg);
Expand Down Expand Up @@ -55,12 +55,6 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp);
* if it is non-NULL. If 'data' is non-NULL, 'len' must be > 0. If
* 'data' is NULL, 'len' must be zero.
*
* treeobj set accessor is a convenience function that will take a treeobj
* object and extract the raw data string from it and store that in
* the cache entry. The treeobj object 'o' is also cached internally for
* later retrieval. The create transfers ownership of 'o' to the
* cache entry. 'o' must be non-NULL.
*
* treeobj get accessor is a convenience function that will return the
* treeobj object equivalent of the raw data stored internally. If the
* internal raw data is not a valid treeobj object (i.e. improperly
Expand All @@ -70,21 +64,16 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp);
* both set accessors.
*
* Generally speaking, a cache entry can only be set once. An attempt
* to set new data in a cache entry will silently succeed. A buffer
* passed to cache_entry_set_raw() will be freed for a cache entry
* that already has data stored. A treeobj object passed to
* cache_entry_set_treeobj() will be json_decref()'d for a cache entry
* that alrdady has data stored.
* to set new data in a cache entry will silently succeed.
*
* cache_entry_set_raw() & cache_entry_set_treeobj() &
* cache_entry_clear_data() returns -1 on error, 0 on success
* cache_entry_set_raw() & cache_entry_clear_data()
* return -1 on error, 0 on success
*/
int cache_entry_get_raw (struct cache_entry *hp, const void **data,
int *len);
int cache_entry_set_raw (struct cache_entry *hp, void *data, int len);
int cache_entry_set_raw (struct cache_entry *hp, const void *data, int len);

const json_t *cache_entry_get_treeobj (struct cache_entry *hp);
int cache_entry_set_treeobj (struct cache_entry *hp, json_t *o);

/* Arrange for message handler represented by 'wait' to be restarted
* once cache entry becomes valid or not dirty at completion of a
Expand Down
72 changes: 29 additions & 43 deletions src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ const char *commit_get_newroot_ref (commit_t *c)
* blobref in the cache, any waiters for a valid cache entry would
* have been satisfied when the dirty cache entry was put onto
* this dirty cache list (i.e. in store_cache() below when
* cache_entry_set_treeobj() was called).
* cache_entry_set_raw() was called).
*/
void commit_cleanup_dirty_cache_entry (commit_t *c, struct cache_entry *hp)
{
Expand Down Expand Up @@ -206,7 +206,7 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
bool is_raw, href_t ref, struct cache_entry **hpp)
{
struct cache_entry *hp;
int saved_errno, rc = -1;
int saved_errno, rc;
const char *xdata;
char *data = NULL;
int xlen, len;
Expand All @@ -216,15 +216,13 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
xlen = strlen (xdata);
len = base64_decode_length (xlen);
if (!(data = malloc (len))) {
saved_errno = errno;
flux_log_error (c->cm->h, "malloc");
goto done;
goto error;
}
if (base64_decode_block (data, &len, xdata, xlen) < 0) {
free (data);
saved_errno = errno;
flux_log_error (c->cm->h, "base64_decode_block");
goto done;
errno = EPROTO;
goto error;
}
/* len from base64_decode_length() always > 0 b/c of NUL byte,
* but len after base64_decode_block() can be zero. Adjust if
Expand All @@ -233,66 +231,54 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
free (data);
data = NULL;
}
blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t));
}
else {
if (treeobj_hash (c->cm->hash_name, o, ref, sizeof (href_t)) < 0) {
saved_errno = errno;
flux_log_error (c->cm->h, "treeobj_hash");
goto done;
if (treeobj_validate (o) < 0 || !(data = treeobj_encode (o))) {
flux_log_error (c->cm->h, "%s: treeobj_encode", __FUNCTION__);
goto error;
}
len = strlen (data);
}
if (blobref_hash (c->cm->hash_name, data, len, ref, sizeof (href_t)) < 0) {
flux_log_error (c->cm->h, "%s: blobref_hash", __FUNCTION__);
goto error;
}
if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) {
if (!(hp = cache_entry_create ())) {
saved_errno = ENOMEM;
goto done;
flux_log_error (c->cm->h, "%s: cache_entry_create", __FUNCTION__);
goto error;
}
cache_insert (c->cm->cache, ref, hp);
}
if (cache_entry_get_valid (hp)) {
c->cm->noop_stores++;
if (is_raw)
free (data);
rc = 0;
} else {
if (is_raw) {
if (cache_entry_set_raw (hp, data, len) < 0) {
int ret;
saved_errno = errno;
free (data);
ret = cache_remove_entry (c->cm->cache, ref);
assert (ret == 1);
goto done;
}
}
else {
json_incref (o);
if (cache_entry_set_treeobj (hp, o) < 0) {
int ret;
saved_errno = errno;
json_decref (o);
ret = cache_remove_entry (c->cm->cache, ref);
assert (ret == 1);
goto done;
}
}
else {
if (cache_entry_set_raw (hp, data, len) < 0) {
int ret;
ret = cache_remove_entry (c->cm->cache, ref);
assert (ret == 1);
goto error;
}
if (cache_entry_set_dirty (hp, true) < 0) {
/* cache entry now owns data, cache_remove_entry
* will decref/free object/data */
flux_log_error (c->cm->h, "%s: cache_entry_set_dirty",__FUNCTION__);
int ret;
saved_errno = errno;
ret = cache_remove_entry (c->cm->cache, ref);
assert (ret == 1);
goto done;
goto error;
}
rc = 1;
}
*hpp = hp;
free (data);
return rc;

done:
error:
saved_errno = errno;
free (data);
errno = saved_errno;
return rc;
return -1;
}

/* Store DIRVAL objects, converting them to DIRREFs.
Expand Down
Loading

0 comments on commit 14943b8

Please sign in to comment.