Skip to content

Commit

Permalink
libkz: avoid setting up watch until blocked
Browse files Browse the repository at this point in the history
Since KVS watches are costly, avoid setting one up
until any existing data in the stream has been
read and consumed.  If EOF is reached, then the
KVS watch is avoided entirely.
  • Loading branch information
garlick committed Apr 4, 2018
1 parent 53f084b commit 7dcf93c
Showing 1 changed file with 94 additions and 23 deletions.
117 changes: 94 additions & 23 deletions src/common/libkz/kz.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,41 +341,69 @@ int kz_close (kz_t *kz)
return rc;
}

/* Handle response for lookup of next block (kz->seq).
* Notify user, who should call kz_get() or kz_get_json() to consume it.
*/
static void lookup_continuation (flux_future_t *f, void *arg)
{
kz_t *kz = arg;

assert (f == kz->lookup_f);

if (!kz->ready_cb)
return;

if (kz->ready_cb)
kz->ready_cb (kz, kz->ready_arg);

if (kz->lookup_f) {
flux_future_destroy (kz->lookup_f);
kz->lookup_f = NULL;
if (kz->lookup_f != NULL) {
flux_log (kz->h, LOG_ERR, "%s: %s unclaimed data - fatal error",
__FUNCTION__, flux_kvs_lookup_get_key (f));
errno = EINVAL;
flux_reactor_stop_error (flux_get_reactor (kz->h));
}
/* If last block of this stream has been handled,
* disable the KVS watcher and don't call lookup_next().
* Expect the user to call kz_close() soon.
* disable the KVS watcher (if any) as we're done.
* Otherwise, go get the next block.
*/
if (kz->eof) {
if (kz->watching) {
const char *key = clear_key (kz);
(void)flux_kvs_unwatch (kz->h, key);
kz->watching = false;
}
return;
}
(void)lookup_next (kz);
}

/* Notification of change in stream directory.
*/
static int kvswatch_cb (const char *dir_key, flux_kvsdir_t *dir,
void *arg, int errnum)
{
kz_t *kz = arg;

if (errnum == ENOENT)
kz->last_dir_size = 0;
else if (errnum == 0)
kz->last_dir_size = flux_kvsdir_get_size (dir);
else {
(void)lookup_next (kz);
flux_log (kz->h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errnum));
return -1;
}
if (lookup_next (kz) < 0)
return -1;
return 0;
}

/* Send request to lookup the next block (kz->seq).
* If kz->last_dir_size blocks have already been consumed,
* install a KVS watch to notify us when more blocks are available
* (unless already at EOF).
*/
static int lookup_next (kz_t *kz)
{
if (kz->lookup_f == NULL && kz->seq < kz->last_dir_size) {
if (kz->lookup_f != NULL)
return 0;

if (kz->seq < kz->last_dir_size) {
const char *key = format_key (kz, kz->seq);
if (!(kz->lookup_f = flux_kvs_lookup (kz->h, 0, key))) {
flux_log_error (kz->h, "%s: seq=%d flux_kvs_lookup",
Expand All @@ -390,24 +418,61 @@ static int lookup_next (kz_t *kz)
return -1;
}
}
/* EOF not yet reached, but all known blocks have been consumed.
* Time to KVS watch the stream directory for more entries.
*/
else if (!kz->eof) {
if (!kz->watching) {
kz->watching = true;
const char *key = clear_key (kz);
flux_log (kz->h, LOG_DEBUG, "%s: watch %s", __FUNCTION__, key);
if (flux_kvs_watch_dir (kz->h, kvswatch_cb, kz, "%s", key) < 0)
return -1;
}
}
return 0;
}

static int kvswatch_cb (const char *dir_key, flux_kvsdir_t *dir,
void *arg, int errnum)
/* Handle response containing kz->last_dir_size.
* Initiate next request (or install a KVS watcher) in lookup_next().
*/
static void lookup_dir_continuation (flux_future_t *f, void *arg)
{
kz_t *kz = arg;
const flux_kvsdir_t *dir;

if (errnum == ENOENT)
assert (f == kz->lookup_f);

if (flux_kvs_lookup_get_dir (f, &dir) < 0)
kz->last_dir_size = 0;
else if (errnum == 0)
else
kz->last_dir_size = flux_kvsdir_get_size (dir);
else {
flux_log (kz->h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errnum));

flux_future_destroy (kz->lookup_f);
kz->lookup_f = NULL;

(void)lookup_next (kz);
}

/* Send request to lookup kz->last_dir_size.
*/
static int lookup_dir (kz_t *kz)
{
if (kz->lookup_f != NULL)
return 0;

const char *key = clear_key (kz);
if (!(kz->lookup_f = flux_kvs_lookup (kz->h, FLUX_KVS_READDIR, key))) {
flux_log_error (kz->h, "%s: flux_kvs_lookup", __FUNCTION__);
return -1;
}
if (lookup_next (kz) < 0)
if (flux_future_then (kz->lookup_f, -1.,
lookup_dir_continuation, kz) < 0) {
flux_log_error (kz->h, "%s: flux_future_then", __FUNCTION__);
flux_future_destroy (kz->lookup_f);
kz->lookup_f = NULL;
return -1;
}
return 0;
}

Expand All @@ -419,13 +484,19 @@ int kz_set_ready_cb (kz_t *kz, kz_ready_f ready_cb, void *arg)
}
kz->ready_cb = ready_cb;
kz->ready_arg = arg;
const char *key = clear_key (kz);
if (kz->ready_cb != NULL && !kz->watching) {
if (flux_kvs_watch_dir (kz->h, kvswatch_cb, kz, "%s", key) < 0)

/* Callback registration.
* Begin looking up stream directory, continued in lookup_continuation().
*/
if (kz->ready_cb != NULL) {
if (lookup_dir (kz) < 0)
return -1;
kz->watching = true;
}
if (kz->ready_cb == NULL && kz->watching) {
/* Callback de-registration.
* Unwire KVS watcher, if any.
*/
else if (kz->ready_cb == NULL && kz->watching) {
const char *key = clear_key (kz);
if (flux_kvs_unwatch (kz->h, key) < 0)
return -1;
kz->watching = false;
Expand Down

0 comments on commit 7dcf93c

Please sign in to comment.