diff --git a/src/common/libkz/kz.c b/src/common/libkz/kz.c index 979813eebd33..9e0fbc4f7e02 100644 --- a/src/common/libkz/kz.c +++ b/src/common/libkz/kz.c @@ -341,25 +341,26 @@ int kz_close (kz_t *kz) return rc; } +/* Handle response for lookup of next block (kz->seq). + * Notify user, who should call kz_get() or kz_get_json() to consume it. + */ static void lookup_continuation (flux_future_t *f, void *arg) { kz_t *kz = arg; assert (f == kz->lookup_f); - if (!kz->ready_cb) - return; - if (kz->ready_cb) kz->ready_cb (kz, kz->ready_arg); - - if (kz->lookup_f) { - flux_future_destroy (kz->lookup_f); - kz->lookup_f = NULL; + if (kz->lookup_f != NULL) { + flux_log (kz->h, LOG_ERR, "%s: %s unclaimed data - fatal error", + __FUNCTION__, flux_kvs_lookup_get_key (f)); + errno = EINVAL; + flux_reactor_stop_error (flux_get_reactor (kz->h)); } /* If last block of this stream has been handled, - * disable the KVS watcher and don't call lookup_next(). - * Expect the user to call kz_close() soon. + * disable the KVS watcher (if any) as we're done. + * Otherwise, go get the next block. */ if (kz->eof) { if (kz->watching) { @@ -367,15 +368,42 @@ static void lookup_continuation (flux_future_t *f, void *arg) (void)flux_kvs_unwatch (kz->h, key); kz->watching = false; } + return; } + (void)lookup_next (kz); +} + +/* Notification of change in stream directory. + */ +static int kvswatch_cb (const char *dir_key, flux_kvsdir_t *dir, + void *arg, int errnum) +{ + kz_t *kz = arg; + + if (errnum == ENOENT) + kz->last_dir_size = 0; + else if (errnum == 0) + kz->last_dir_size = flux_kvsdir_get_size (dir); else { - (void)lookup_next (kz); + flux_log (kz->h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errnum)); + return -1; } + if (lookup_next (kz) < 0) + return -1; + return 0; } +/* Send request to lookup the next block (kz->seq). + * If kz->last_dir_size blocks have already been consumed, + * install a KVS watch to notify us when more blocks are available + * (unless already at EOF). + */ static int lookup_next (kz_t *kz) { - if (kz->lookup_f == NULL && kz->seq < kz->last_dir_size) { + if (kz->lookup_f != NULL) + return 0; + + if (kz->seq < kz->last_dir_size) { const char *key = format_key (kz, kz->seq); if (!(kz->lookup_f = flux_kvs_lookup (kz->h, 0, key))) { flux_log_error (kz->h, "%s: seq=%d flux_kvs_lookup", @@ -390,24 +418,61 @@ static int lookup_next (kz_t *kz) return -1; } } + /* EOF not yet reached, but all known blocks have been consumed. + * Time to KVS watch the stream directory for more entries. + */ + else if (!kz->eof) { + if (!kz->watching) { + kz->watching = true; + const char *key = clear_key (kz); + flux_log (kz->h, LOG_DEBUG, "%s: watch %s", __FUNCTION__, key); + if (flux_kvs_watch_dir (kz->h, kvswatch_cb, kz, "%s", key) < 0) + return -1; + } + } return 0; } -static int kvswatch_cb (const char *dir_key, flux_kvsdir_t *dir, - void *arg, int errnum) +/* Handle response containing kz->last_dir_size. + * Initiate next request (or install a KVS watcher) in lookup_next(). + */ +static void lookup_dir_continuation (flux_future_t *f, void *arg) { kz_t *kz = arg; + const flux_kvsdir_t *dir; - if (errnum == ENOENT) + assert (f == kz->lookup_f); + + if (flux_kvs_lookup_get_dir (f, &dir) < 0) kz->last_dir_size = 0; - else if (errnum == 0) + else kz->last_dir_size = flux_kvsdir_get_size (dir); - else { - flux_log (kz->h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errnum)); + + flux_future_destroy (kz->lookup_f); + kz->lookup_f = NULL; + + (void)lookup_next (kz); +} + +/* Send request to lookup kz->last_dir_size. + */ +static int lookup_dir (kz_t *kz) +{ + if (kz->lookup_f != NULL) + return 0; + + const char *key = clear_key (kz); + if (!(kz->lookup_f = flux_kvs_lookup (kz->h, FLUX_KVS_READDIR, key))) { + flux_log_error (kz->h, "%s: flux_kvs_lookup", __FUNCTION__); return -1; } - if (lookup_next (kz) < 0) + if (flux_future_then (kz->lookup_f, -1., + lookup_dir_continuation, kz) < 0) { + flux_log_error (kz->h, "%s: flux_future_then", __FUNCTION__); + flux_future_destroy (kz->lookup_f); + kz->lookup_f = NULL; return -1; + } return 0; } @@ -419,13 +484,19 @@ int kz_set_ready_cb (kz_t *kz, kz_ready_f ready_cb, void *arg) } kz->ready_cb = ready_cb; kz->ready_arg = arg; - const char *key = clear_key (kz); - if (kz->ready_cb != NULL && !kz->watching) { - if (flux_kvs_watch_dir (kz->h, kvswatch_cb, kz, "%s", key) < 0) + + /* Callback registration. + * Begin looking up stream directory, continued in lookup_continuation(). + */ + if (kz->ready_cb != NULL) { + if (lookup_dir (kz) < 0) return -1; - kz->watching = true; } - if (kz->ready_cb == NULL && kz->watching) { + /* Callback de-registration. + * Unwire KVS watcher, if any. + */ + else if (kz->ready_cb == NULL && kz->watching) { + const char *key = clear_key (kz); if (flux_kvs_unwatch (kz->h, key) < 0) return -1; kz->watching = false;