Skip to content

Commit

Permalink
kvs-watch: do not load all KVS blobs at once
Browse files Browse the repository at this point in the history
Problem: With the FLUX_KVS_WATCH_APPEND and FLUX_KVS_STREAM
flags, all content blobs in a valref treeobj are retrieved
from the content store at once.  If the valref array is gigantic,
this can be a very costly initial transaction.  In addition, sending
an extremely large number (e.g. millions) of content requests all at
once is undesirable.

Solution: Send content requests in more reasonable chunks of roughly 32K requests (MAX_LOADS = 32768).

Fixes flux-framework#6456
  • Loading branch information
chu11 committed Dec 21, 2024
1 parent 9278fa8 commit 8604a3d
Showing 1 changed file with 80 additions and 50 deletions.
130 changes: 80 additions & 50 deletions src/modules/kvs-watch/kvs-watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
#include "src/common/libcontent/content.h"
#include "src/common/libutil/errprintf.h"

/* If the valref is very large, do not load all of its blobs immediately;
* load at most about MAX_LOADS at a time.
*/
#define MAX_LOADS 32768

/* State for one watcher */
struct watcher {
const flux_msg_t *request; // request message
Expand All @@ -43,9 +48,11 @@ struct watcher {

struct ns_monitor *nsm; // back pointer for removal
json_t *prev; // previous watch value for KVS_WATCH_FULL/UNIQ
bool index_valid; // flag if prev_start_index/prev_end_index set
int prev_start_index; // previous start index loaded
int prev_end_index; // previous end index loaded
bool index_valid; // flag if start_index/end_index set
int start_index; // start index of blobrefs to load
int end_index; // end index of blobrefs to load
int last_index_loaded; // last index sent
json_t *valref_treeobj; // valref for loads
int loaded_blob_count; // number of indices loaded (for FLUX_KVS_STREAM)
void *handle; // zlistx_t handle
};
Expand Down Expand Up @@ -84,6 +91,8 @@ struct watch_ctx {
zhash_t *namespaces; // hash of monitored namespaces
};

static int load_data (flux_t *h, struct watcher *w);

static void watcher_destroy (struct watcher *w)
{
if (w) {
Expand All @@ -103,6 +112,7 @@ static void watcher_destroy (struct watcher *w)
zlist_destroy (&w->loads);
}
json_decref (w->prev);
json_decref (w->valref_treeobj);
free (w);
errno = saved_errno;
}
Expand All @@ -127,6 +137,7 @@ static struct watcher *watcher_create (const flux_msg_t *msg,
goto error_nomem;
w->flags = flags;
w->rootseq = -1;
w->last_index_loaded = -1;
return w;
error_nomem:
errno = ENOMEM;
Expand Down Expand Up @@ -311,11 +322,27 @@ static void load_continuation (flux_future_t *f, void *arg)
&& !(w->flags & FLUX_KVS_STREAM)))
w->finished = true;
}
if (load_data (w->nsm->ctx->h, w) < 0) {
if (!w->mute) {
flux_error_t err;
errprintf (&err,
"error sending request for content blobs [%d:%d]",
w->start_index,
w->end_index);
if (flux_respond_error (w->nsm->ctx->h,
w->request,
errno,
err.text) < 0)
flux_log_error (w->nsm->ctx->h,
"%s: flux_respond_error",
__FUNCTION__);
}
return;
}
if ((w->flags & FLUX_KVS_STREAM)
&& w->responded
&& w->index_valid
&& (w->loaded_blob_count
== (w->prev_end_index - w->prev_start_index + 1))) {
&& (w->loaded_blob_count == (w->end_index - w->start_index + 1))) {
if (!w->mute) {
if (flux_respond_error (w->nsm->ctx->h,
w->request,
Expand Down Expand Up @@ -350,21 +377,31 @@ static flux_future_t *load_ref (flux_t *h, struct watcher *w, const char *ref)
return NULL;
}

static int load_range (flux_t *h,
struct watcher *w,
int start_index,
int end_index,
json_t *val)
static int load_data (flux_t *h, struct watcher *w)
{
int i;
int start_index, i;

assert (w->index_valid);
assert (w->valref_treeobj);

for (i = start_index; i <= end_index; i++) {
if (w->last_index_loaded >= w->end_index)
return 0;

if (w->start_index > w->last_index_loaded)
start_index = w->start_index;
else
start_index = w->last_index_loaded + 1;

for (i = start_index; i <= w->end_index; i++) {
flux_future_t *f;
const char *ref = treeobj_get_blobref (val, i);
const char *ref = treeobj_get_blobref (w->valref_treeobj, i);
if (!ref)
return -1;
if (!(f = load_ref (h, w, ref)))
return -1;
w->last_index_loaded = i;
if (zlist_size (w->loads) > MAX_LOADS)
break;
}
return 0;
}
Expand All @@ -391,16 +428,18 @@ static int handle_initial_response (flux_t *h,
*/
if (treeobj_is_val (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = 0;
w->start_index = 0;
w->end_index = 0;
/* since this is a val object, we can just return it */
w->last_index_loaded = 0;
w->loaded_blob_count++;
goto out;
}
else if (treeobj_is_valref (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = treeobj_get_count (val) - 1;
w->start_index = 0;
w->end_index = treeobj_get_count (val) - 1;
w->valref_treeobj = json_incref (val);
}
else {
if (w->flags & FLUX_KVS_WATCH_APPEND)
Expand All @@ -419,15 +458,11 @@ static int handle_initial_response (flux_t *h,
goto error_respond;
}

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err,
"error sending request for content blobs [%d:%d]",
w->prev_start_index,
w->prev_end_index);
w->start_index,
w->end_index);
goto error_respond;
}

Expand Down Expand Up @@ -513,15 +548,16 @@ static int handle_append_response (flux_t *h,
*/
if (treeobj_is_val (val)) {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = 0;
w->start_index = 0;
w->end_index = 0;
/* since this is a val object, we can just return it */
if (flux_respond_pack (h, w->request, "{ s:O }", "val", val) < 0) {
flux_log_error (h,
"%s: failed to respond to kvs-watch.lookup",
__FUNCTION__);
goto error_out;
}
w->last_index_loaded = 0;
w->loaded_blob_count++;
w->responded = true;
}
Expand All @@ -537,11 +573,12 @@ static int handle_append_response (flux_t *h,
if (w->flags & FLUX_KVS_STREAM)
goto out;
new_end_index = treeobj_get_count (val) - 1;
if (new_end_index > w->prev_end_index) {
w->prev_start_index = w->prev_end_index + 1;
w->prev_end_index = new_end_index;
if (new_end_index > w->end_index) {
w->end_index = new_end_index;
json_decref (w->valref_treeobj);
w->valref_treeobj = json_incref (val);
}
else if (new_end_index < w->prev_end_index) {
else if (new_end_index < w->end_index) {
errprintf (&err, "key watched with WATCH_APPEND truncated");
errno = EINVAL;
goto error_respond;
Expand All @@ -551,19 +588,16 @@ static int handle_append_response (flux_t *h,
}
else {
w->index_valid = true;
w->prev_start_index = 0;
w->prev_end_index = treeobj_get_count (val) - 1;
w->start_index = 0;
w->end_index = treeobj_get_count (val) - 1;
w->valref_treeobj = json_incref (val);
}

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err,
"error sending request for content blobs [%d:%d]",
w->prev_start_index,
w->prev_end_index);
w->start_index,
w->end_index);
goto error_respond;
}
}
Expand Down Expand Up @@ -594,23 +628,20 @@ static int handle_append_response (flux_t *h,
if (w->flags & FLUX_KVS_STREAM)
goto out;
new_end_index = treeobj_get_count (val) - 1;
if (new_end_index > w->prev_end_index) {
w->prev_start_index = w->prev_end_index + 1;
w->prev_end_index = new_end_index;
if (new_end_index > w->end_index) {
w->end_index = new_end_index;
json_decref (w->valref_treeobj);
w->valref_treeobj = json_incref (val);
}
else if (new_end_index < w->prev_end_index) {
else if (new_end_index < w->end_index) {
errprintf (&err, "key watched with WATCH_APPEND shortened");
errno = EINVAL;
goto error_respond;
}
else
goto out;

if (load_range (h,
w,
w->prev_start_index,
w->prev_end_index,
val) < 0) {
if (load_data (h, w) < 0) {
errprintf (&err, "error loading reference");
goto error_respond;
}
Expand Down Expand Up @@ -768,8 +799,7 @@ static void handle_lookup_response (flux_future_t *f,
if ((w->flags & FLUX_KVS_STREAM)
&& w->responded
&& w->index_valid
&& (w->loaded_blob_count
== (w->prev_end_index - w->prev_start_index + 1))) {
&& (w->loaded_blob_count == (w->end_index - w->start_index + 1))) {
errno = ENODATA;
goto error;
}
Expand Down

0 comments on commit 8604a3d

Please sign in to comment.