Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVS: Support valref pointing to zero length blob objects #1237

Merged
merged 8 commits into from
Oct 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 81 additions & 34 deletions src/modules/kvs/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct cache_entry {
waitqueue_t *waitlist_valid;
void *data; /* value object/data */
int len;
bool data_valid; /* flag indicating if data set, don't use
* data == NULL as test */
cache_data_type_t type; /* what does data point to */
int lastuse_epoch; /* time of last use for cache expiry */
uint8_t dirty:1;
Expand All @@ -62,25 +64,38 @@ struct cache {
zhash_t *zh;
};

struct cache_entry *cache_entry_create (void)
struct cache_entry *cache_entry_create (cache_data_type_t t)
{
struct cache_entry *hp = calloc (1, sizeof (*hp));
if (!hp) {
struct cache_entry *hp;

if (t != CACHE_DATA_TYPE_NONE
&& t != CACHE_DATA_TYPE_JSON
&& t != CACHE_DATA_TYPE_RAW) {
errno = EINVAL;
return NULL;
}

if (!(hp = calloc (1, sizeof (*hp)))) {
errno = ENOMEM;
return NULL;
}
hp->type = CACHE_DATA_TYPE_NONE;
hp->type = t;
return hp;
}

struct cache_entry *cache_entry_create_json (json_t *o)
{
struct cache_entry *hp = cache_entry_create ();
if (!hp)
struct cache_entry *hp;

if (!o) {
errno = EINVAL;
return NULL;
if (o)
hp->data = o;
hp->type = CACHE_DATA_TYPE_JSON;
}

if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON)))
return NULL;
hp->data = o;
hp->data_valid = true;
return hp;
}

Expand All @@ -93,13 +108,15 @@ struct cache_entry *cache_entry_create_raw (void *data, int len)
return NULL;
}

if (!(hp = cache_entry_create ()))
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_RAW)))
return NULL;

if (data) {
hp->data = data;
hp->len = len;
}
hp->type = CACHE_DATA_TYPE_RAW;
/* true even if data == NULL */
hp->data_valid = true;
return hp;
}

Expand All @@ -125,17 +142,17 @@ bool cache_entry_is_type_raw (struct cache_entry *hp)

bool cache_entry_get_valid (struct cache_entry *hp)
{
return (hp && hp->data != NULL);
return (hp && hp->data_valid);
}

bool cache_entry_get_dirty (struct cache_entry *hp)
{
return (hp && hp->data && hp->dirty);
return (hp && hp->data_valid && hp->dirty);
}

int cache_entry_set_dirty (struct cache_entry *hp, bool val)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if ((val && hp->dirty) || (!val && !hp->dirty))
; /* no-op */
else if (val && !hp->dirty)
Expand All @@ -157,7 +174,7 @@ int cache_entry_set_dirty (struct cache_entry *hp, bool val)

int cache_entry_clear_dirty (struct cache_entry *hp)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if (hp->dirty
&& (!hp->waitlist_notdirty
|| !wait_queue_length (hp->waitlist_notdirty)))
Expand All @@ -169,7 +186,7 @@ int cache_entry_clear_dirty (struct cache_entry *hp)

int cache_entry_force_clear_dirty (struct cache_entry *hp)
{
if (hp && hp->data) {
if (hp && hp->data_valid) {
if (hp->dirty) {
if (hp->waitlist_notdirty) {
wait_queue_destroy (hp->waitlist_notdirty);
Expand All @@ -184,44 +201,50 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp)

json_t *cache_entry_get_json (struct cache_entry *hp)
{
if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_JSON)
if (!hp || !hp->data_valid || hp->type != CACHE_DATA_TYPE_JSON)
return NULL;
/* should be non-NULL for json */
assert (hp->data);
return hp->data;
}

int cache_entry_set_json (struct cache_entry *hp, json_t *o)
{
if (hp
&& o
&& (hp->type == CACHE_DATA_TYPE_NONE
|| hp->type == CACHE_DATA_TYPE_JSON)) {
if ((o && hp->data) || (!o && !hp->data)) {
if (hp->data_valid) {
assert (hp->data);
json_decref (o); /* no-op, 'o' is assumed identical to hp->data */
} else if (o && !hp->data) {
} else {
assert (!hp->data);
hp->data = o;
hp->data_valid = true;
if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->data_valid = false;
return -1;
}
}
} else if (!o && hp->data) {
json_decref (hp->data);
hp->data = NULL;
}
hp->type = CACHE_DATA_TYPE_JSON;
return 0;
}
return -1;
}

void *cache_entry_get_raw (struct cache_entry *hp, int *len)
int cache_entry_get_raw (struct cache_entry *hp, void **data, int *len)
{
if (!hp || !hp->data || hp->type != CACHE_DATA_TYPE_RAW)
return NULL;
if (!hp || !hp->data_valid || hp->type != CACHE_DATA_TYPE_RAW)
return -1;
if (data)
(*data) = hp->data;
if (len)
(*len) = hp->len;
return hp->data;
return 0;
}

int cache_entry_set_raw (struct cache_entry *hp, void *data, int len)
Expand All @@ -234,35 +257,59 @@ int cache_entry_set_raw (struct cache_entry *hp, void *data, int len)
if (hp
&& (hp->type == CACHE_DATA_TYPE_NONE
|| hp->type == CACHE_DATA_TYPE_RAW)) {
if ((data && hp->data) || (!data && !hp->data)) {
free (data); /* no-op, 'data' is assumed identical to hp->data */
} else if (data && !hp->data) {
if (hp->data_valid) {
if ((data && hp->data) || (!data && !hp->data))
free (data); /* no-op, 'data' is assumed identical to hp->data */
else {
/* attempt to change already valid cache entry,
* cannot, must call cache_entry_clear_data() */
errno = EBADE;
return -1;
}
}
else {
hp->data = data;
hp->len = len;
hp->data_valid = true;

if (hp->waitlist_valid) {
if (wait_runqueue (hp->waitlist_valid) < 0) {
/* set back to orig */
hp->data = NULL;
hp->len = 0;
hp->data_valid = false;
return -1;
}
}
} else if (!data && hp->data) {
free (hp->data);
hp->data = NULL;
hp->len = 0;
}
hp->type = CACHE_DATA_TYPE_RAW;
return 0;
}
return -1;
}

int cache_entry_clear_data (struct cache_entry *hp)
{
if (hp) {
if (hp->data) {
if (hp->type == CACHE_DATA_TYPE_JSON)
json_decref (hp->data);
else if (hp->type == CACHE_DATA_TYPE_RAW)
free (hp->data);
}
hp->data = NULL;
hp->len = 0;
hp->data_valid = false;
return 0;
}
return -1;
}

void cache_entry_destroy (void *arg)
{
struct cache_entry *hp = arg;
if (hp) {
if (hp->data) {
if (hp->data_valid) {
if (hp->type == CACHE_DATA_TYPE_JSON)
json_decref (hp->data);
else if (hp->type == CACHE_DATA_TYPE_RAW)
Expand Down
42 changes: 29 additions & 13 deletions src/modules/kvs/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@ struct cache;
/* Create/destroy cache entry.
*
* cache_entry_create() creates an entry, setting the cache entry type
* to CACHE_DATA_TYPE_NONE.
* to specified type. CACHE_DATA_TYPE_NONE indicates user is not yet
* sure of the type of data to be stored, and it will be determined
* later when cache_entry_set_X() function is called.
* cache_entry_get_valid() will return false after
* cache_entry_create() is initially called, regardless of the type
* passed in.
*
* cache_entry_create_json() creates an entry, setting the cache entry
* type to CACHE_DATA_TYPE_JSON. The create transfers ownership of
* 'o' to the cache entry. On destroy, json_decref() will be called
* on 'o'. If 'o' is NULL, no data is set, but the type is still set
* to CACHE_DATA_TYPE_JSON and only json can be used for the entry.
* on 'o'. 'o' cannot be NULL.
*
* cache_entry_create_raw() creates an entry, setting the cache entry
* type to CACHE_DATA_TYPE_RAW. The create transfers ownership of
* 'data' to the cache entry. On destroy, free() will be called on
* 'data'. If 'data' is NULL, no data is set, but the type is still
* set to CACHE_DATA_TYPE_RAW and only raw can be used for the entry.
* 'data'. If 'data' is NULL, 'len' must be zero. If 'data' is
* non-NULL, 'len' must be > 0.
*
* cache_entry_get_valid() will return true on entries when
* cache_entry_create_json() and cache_entry_get_raw() return success.
*/
struct cache_entry *cache_entry_create (void);
struct cache_entry *cache_entry_create (cache_data_type_t t);
struct cache_entry *cache_entry_create_json (json_t *o);
struct cache_entry *cache_entry_create_raw (void *data, int len);
void cache_entry_destroy (void *arg);
Expand Down Expand Up @@ -89,26 +96,35 @@ int cache_entry_force_clear_dirty (struct cache_entry *hp);
*
* json set accessor must have type of CACHE_DATA_TYPE_NONE or
* CACHE_DATA_TYPE_JSON to set json object. After setting, the type
* is converted to CACHE_DATA_TYPE_JSON. If non-NULL, set transfers
* ownership of 'o' to the cache entry.
* is converted to CACHE_DATA_TYPE_JSON. 'o' must be non-NULL. Set
* transfers ownership of 'o' to the cache entry.
*
* raw set accessor must have type of CACHE_DATA_TYPE_NONE or
* CACHE_DATA_TYPE_RAW to set raw data. After setting, the type is
* converted to CACHE_DATA_TYPE_RAW. If non-NULL, set transfers
* ownership of 'data' to the cache entry.
* converted to CACHE_DATA_TYPE_RAW. If 'data' is NULL, 'len' must be
* zero. If 'data' is non-NULL, 'len' must be > 0. If non-NULL, set
* transfers ownership of 'data' to the cache entry.
*
* cache_entry_clear_data () will clear any data in the entry.
*
* An invalid->valid transition runs the entry's wait queue, if any in
* both set accessors.
*
* cache_entry_set_json() & cache_entry_set_raw() returns -1 on error,
* 0 on success
* Generally speaking, a cache entry can only bet set once. If you
* wish to set it again, you must run cache_entry_clear_data() before
* doing so.
*
* cache_entry_set_json() & cache_entry_set_raw() &
* cache_entry_clear_data() returns -1 on error, 0 on success
*/
json_t *cache_entry_get_json (struct cache_entry *hp);
int cache_entry_set_json (struct cache_entry *hp, json_t *o);

void *cache_entry_get_raw (struct cache_entry *hp, int *len);
int cache_entry_get_raw (struct cache_entry *hp, void **data, int *len);
int cache_entry_set_raw (struct cache_entry *hp, void *data, int len);

int cache_entry_clear_data (struct cache_entry *hp);

/* Arrange for message handler represented by 'wait' to be restarted
* once cache entry becomes valid or not dirty at completion of a
* load or store RPC.
Expand Down
2 changes: 1 addition & 1 deletion src/modules/kvs/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ static int store_cache (commit_t *c, int current_epoch, json_t *o,
}
}
if (!(hp = cache_lookup (c->cm->cache, ref, current_epoch))) {
if (!(hp = cache_entry_create ())) {
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_NONE))) {
saved_errno = ENOMEM;
goto done;
}
Expand Down
28 changes: 16 additions & 12 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,15 @@ static void content_load_completion (flux_future_t *f, void *arg)
* this error scenario appropriately.
*/
if (cache_entry_is_type_raw (hp)) {
char *datacpy;
char *datacpy = NULL;

if (!(datacpy = malloc (size))) {
flux_log_error (ctx->h, "%s: malloc", __FUNCTION__);
goto done;
if (size) {
if (!(datacpy = malloc (size))) {
flux_log_error (ctx->h, "%s: malloc", __FUNCTION__);
goto done;
}
memcpy (datacpy, data, size);
}
memcpy (datacpy, data, size);

if (cache_entry_set_raw (hp, datacpy, size) < 0) {
flux_log_error (ctx->h, "%s: cache_entry_set_raw", __FUNCTION__);
Expand Down Expand Up @@ -276,15 +278,15 @@ static int load (kvs_ctx_t *ctx, const href_t ref, bool is_raw, wait_t *wait,
*/
if (!hp) {
if (is_raw) {
if (!(hp = cache_entry_create_raw (NULL, 0))) {
flux_log_error (ctx->h, "%s: cache_entry_create_raw",
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_RAW))) {
flux_log_error (ctx->h, "%s: cache_entry_create",
__FUNCTION__);
return -1;
}
}
else {
if (!(hp = cache_entry_create_json (NULL))) {
flux_log_error (ctx->h, "%s: cache_entry_create_json",
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON))) {
flux_log_error (ctx->h, "%s: cache_entry_create",
__FUNCTION__);
return -1;
}
Expand Down Expand Up @@ -481,7 +483,7 @@ static int commit_cache_cb (commit_t *c, struct cache_entry *hp, void *data)

is_raw = cache_entry_is_type_raw (hp);
if (is_raw)
storedata = cache_entry_get_raw (hp, &storedatalen);
cache_entry_get_raw (hp, &storedata, &storedatalen);
else
storedata = cache_entry_get_json (hp);

Expand Down Expand Up @@ -1628,14 +1630,16 @@ static int store_initial_rootdir (kvs_ctx_t *ctx, json_t *o, href_t ref)
goto decref_done;
}
if (!(hp = cache_lookup (ctx->cache, ref, ctx->epoch))) {
if (!(hp = cache_entry_create ())) {
if (!(hp = cache_entry_create (CACHE_DATA_TYPE_JSON))) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: cache_entry_create", __FUNCTION__);
flux_log_error (ctx->h, "%s: cache_entry_create_json_empty",
__FUNCTION__);
goto decref_done;
}
cache_insert (ctx->cache, ref, hp);
}
if (!cache_entry_get_valid (hp)) {
assert (o);
if (cache_entry_set_json (hp, o) < 0) {
saved_errno = errno;
flux_log_error (ctx->h, "%s: cache_entry_set_json",
Expand Down
Loading