Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVS oom() and xzmalloc() cleanup #1124

Merged
merged 22 commits into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
becb797
modules/kvs: Remove unused function declaration
chu11 Jul 24, 2017
f8c4a0c
modules/kvs: Add/cleanup flux log error messages
chu11 Jul 24, 2017
3f9c499
modules/kvs: Minor code cleanup
chu11 Jul 24, 2017
222474b
modules/kvs: Remove basic oom() calls
chu11 Jul 21, 2017
7c997ea
modules/kvs: Replace basic alloc calls that exit
chu11 Jul 21, 2017
4585b47
modules/kvs: Remove oom() from kvs_util
chu11 Jul 21, 2017
f61c273
modules/kvs: Remove oom() in cache functions.
chu11 Jul 21, 2017
0947f76
modules/kvs: Add cache_get_stats() unit tests
chu11 Jul 21, 2017
ec09960
modules/kvs: Fix oom in cache entry list push
chu11 Jul 22, 2017
f3d03da
modules/kvs: Remove oom() in fileval_big()
chu11 Jul 22, 2017
af5cb80
modules/kvs: Refactor load() function
chu11 Jul 22, 2017
67a5b43
modules/kvs: Remove oom() from wait_queue_create()
chu11 Jul 21, 2017
18fc0ac
common/libkvs: Return ENOMEM from j_dirent_create
chu11 Jul 23, 2017
91044ce
modules/kvs: Cleanup oom responses in commit API
chu11 Jul 23, 2017
93fca8f
modules/kvs: Do not oom() in cache_entry_create()
chu11 Jul 23, 2017
695112b
modules/kvs: Remove kvs_util_json_copydir()
chu11 Jul 23, 2017
f09e588
modules/kvs: Remove oom() from wait_addqueue()
chu11 Jul 21, 2017
0f8e647
modules/kvs: Remove oom() from wait_destroy_msg()
chu11 Jul 23, 2017
297f310
modules/kvs: Add cache_entry_set_json unit test
chu11 Jul 25, 2017
b82fbfa
modules/kvs: Update comments for commit_unroll()
chu11 Jul 25, 2017
7b0f6f2
modules/kvs: Simplify kvs_util_json_dumps()
chu11 Jul 25, 2017
e464830
modules/kvs: Add kvs_util_json_encoded_size()
chu11 Jul 25, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions src/common/libkvs/jansson_dirent.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,32 @@
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/oom.h"

#include "jansson_dirent.h"

json_t *j_dirent_create (const char *type, void *arg)
{
json_t *dirent;
json_t *dirent = NULL;
bool valid_type = false;

if (!(dirent = json_object ()))
oom ();
if (!(dirent = json_object ())) {
errno = ENOMEM;
goto error;
}

if (!strcmp (type, "FILEREF") || !strcmp (type, "DIRREF")) {
char *ref = arg;
json_t *o;

if (!(o = json_string (ref)))
oom ();
if (json_object_set_new (dirent, type, o) < 0)
oom ();
if (!(o = json_string (ref))) {
errno = ENOMEM;
goto error;
}
if (json_object_set_new (dirent, type, o) < 0) {
json_decref (o);
errno = ENOMEM;
goto error;
}

valid_type = true;
} else if (!strcmp (type, "FILEVAL") || !strcmp (type, "DIRVAL")
Expand All @@ -60,16 +66,25 @@ json_t *j_dirent_create (const char *type, void *arg)
if (val)
json_incref (val);
else {
if (!(val = json_object ()))
oom ();
if (!(val = json_object ())) {
errno = ENOMEM;
goto error;
}
}
if (json_object_set_new (dirent, type, val) < 0) {
json_decref (val);
errno = ENOMEM;
goto error;
}
if (json_object_set_new (dirent, type, val) < 0)
oom ();
valid_type = true;
}
assert (valid_type == true);

return dirent;

error:
json_decref (dirent);
return NULL;
}

int j_dirent_validate (json_t *dirent)
Expand Down
79 changes: 53 additions & 26 deletions src/modules/kvs/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/iterators.h"
#include "src/common/libutil/oom.h"

#include "waitqueue.h"
#include "kvs_util.h"
#include "cache.h"

struct cache_entry {
Expand All @@ -65,7 +62,11 @@ struct cache {

struct cache_entry *cache_entry_create (json_t *o)
{
struct cache_entry *hp = xzmalloc (sizeof (*hp));
struct cache_entry *hp = calloc (1, sizeof (*hp));
if (!hp) {
errno = ENOMEM;
return NULL;
}
if (o)
hp->o = o;
return hp;
Expand Down Expand Up @@ -145,22 +146,30 @@ void cache_entry_destroy (void *arg)
}
}

void cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait)
int cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait)
{
if (wait) {
if (!hp->waitlist_notdirty)
hp->waitlist_notdirty = wait_queue_create ();
wait_addqueue (hp->waitlist_notdirty, wait);
if (!hp->waitlist_notdirty) {
if (!(hp->waitlist_notdirty = wait_queue_create ()))
return -1;
}
if (wait_addqueue (hp->waitlist_notdirty, wait) < 0)
return -1;
}
return 0;
}

void cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait)
int cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait)
{
if (wait) {
if (!hp->waitlist_valid)
hp->waitlist_valid = wait_queue_create ();
wait_addqueue (hp->waitlist_valid, wait);
if (!hp->waitlist_valid) {
if (!(hp->waitlist_valid = wait_queue_create ()))
return -1;
}
if (wait_addqueue (hp->waitlist_valid, wait) < 0)
return -1;
}
return 0;
}

struct cache_entry *cache_lookup (struct cache *cache, const char *ref,
Expand Down Expand Up @@ -224,8 +233,10 @@ int cache_expire_entries (struct cache *cache, int current_epoch, int thresh)
struct cache_entry *hp;
int count = 0;

if (!(keys = zhash_keys (cache->zh)))
oom ();
if (!(keys = zhash_keys (cache->zh))) {
errno = ENOMEM;
return -1;
}
while ((ref = zlist_pop (keys))) {
if ((hp = zhash_lookup (cache->zh, ref))
&& !cache_entry_get_dirty (hp)
Expand All @@ -240,26 +251,32 @@ int cache_expire_entries (struct cache *cache, int current_epoch, int thresh)
return count;
}

void cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
int *incompletep, int *dirtyp)
int cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
int *incompletep, int *dirtyp)
{
zlist_t *keys;
zlist_t *keys = NULL;
struct cache_entry *hp;
char *ref;
int size = 0;
int incomplete = 0;
int dirty = 0;
int rc = -1;

if (!(keys = zhash_keys (cache->zh)))
oom ();
if (!(keys = zhash_keys (cache->zh))) {
errno = ENOMEM;
goto cleanup;
}
while ((ref = zlist_pop (keys))) {
hp = zhash_lookup (cache->zh, ref);
if (cache_entry_get_valid (hp)) {
/* must pass JSON_ENCODE_ANY, object could be anything */
char *s = json_dumps (hp->o, JSON_ENCODE_ANY);
if (!s)
oom ();
int obj_size = strlen (s);
int obj_size;
if (!s) {
errno = ENOMEM;
goto cleanup;
}
obj_size = strlen (s);
free (s);
size += obj_size;
tstat_push (ts, obj_size);
Expand All @@ -269,13 +286,16 @@ void cache_get_stats (struct cache *cache, tstat_t *ts, int *sizep,
dirty++;
free (ref);
}
zlist_destroy (&keys);
if (sizep)
*sizep = size;
if (incompletep)
*incompletep = incomplete;
if (dirtyp)
*dirtyp = dirty;
rc = 0;
cleanup:
zlist_destroy (&keys);
return rc;
}

int cache_wait_destroy_msg (struct cache *cache, wait_test_msg_f cb, void *arg)
Expand Down Expand Up @@ -304,9 +324,16 @@ int cache_wait_destroy_msg (struct cache *cache, wait_test_msg_f cb, void *arg)

struct cache *cache_create (void)
{
struct cache *cache = xzmalloc (sizeof (*cache));
if (!(cache->zh = zhash_new ()))
oom ();
struct cache *cache = calloc (1, sizeof (*cache));
if (!cache) {
errno = ENOMEM;
return NULL;
}
if (!(cache->zh = zhash_new ())) {
free (cache);
errno = ENOMEM;
return NULL;
}
return cache;
}

Expand Down
12 changes: 7 additions & 5 deletions src/modules/kvs/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ int cache_entry_clear_dirty (struct cache_entry *hp);
*/
json_t *cache_entry_get_json (struct cache_entry *hp);
void cache_entry_set_json (struct cache_entry *hp, json_t *o);
int cache_entry_clear_json (struct cache_entry *hp);

/* Arrange for message handler represented by 'wait' to be restarted
* once cache entry becomes valid or not dirty at completion of a
* load or store RPC.
* Returns -1 on error, 0 on success
*/
void cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait);
void cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait);
int cache_entry_wait_notdirty (struct cache_entry *hp, wait_t *wait);
int cache_entry_wait_valid (struct cache_entry *hp, wait_t *wait);

/* Create/destroy the cache container and its contents.
*/
Expand Down Expand Up @@ -93,13 +93,15 @@ int cache_count_entries (struct cache *cache);

/* Expire cache entries that are not dirty, not incomplete, and last
* used more than 'thresh' epoch's ago.
* Returns -1 on error, expired count on success.
*/
int cache_expire_entries (struct cache *cache, int current_epoch, int thresh);

/* Obtain statistics on the cache.
* Returns -1 on error, 0 on success
*/
void cache_get_stats (struct cache *cache, tstat_t *ts, int *size,
int *incomplete, int *dirty);
int cache_get_stats (struct cache *cache, tstat_t *ts, int *size,
int *incomplete, int *dirty);

/* Destroy wait_t's on the waitqueue_t of any cache entry
* if they meet match criteria.
Expand Down
Loading