Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] broker: do not drop dirty cache entries on error #4524

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ struct content_cache {

struct list_head lru; // LRU is for valid, clean entries only
struct list_head flush; // dirties queued due to batch limit
struct list_head flush_errors; // flush entries that hit error
unsigned int flush_errors_len;

uint32_t blob_size_limit;
uint32_t flush_batch_limit;
Expand Down Expand Up @@ -484,12 +486,23 @@ void content_load_request (flux_t *h, flux_msg_handler_t *mh,
*/
static void cache_resume_flush (struct content_cache *cache)
{
    /* Call flush_respond() when there is nothing left that can be flushed
     * right now:
     *  - no entries are accounted as dirty, or
     *  - the dirty count equals flush_errors_len, i.e. every remaining
     *    dirty entry is counted on the flush_errors list, or
     *  - this is rank 0 and cache->backing is not set.
     * Otherwise keep flushing.
     *
     * Only one 'if' may guard this if/else pair: stacking the older,
     * narrower condition in front of this one would turn the 'else' into
     * a dangling else and skip both branches whenever that older
     * condition is false.
     */
    if (cache->acct_dirty == 0
        || (cache->acct_dirty - cache->flush_errors_len) == 0
        || (cache->rank == 0 && !cache->backing))
        flush_respond (cache);
    else
        (void)cache_flush (cache); /* resume flushing, subject to limits */
}

/* Queue entry 'e' at the tail of cache->flush_errors and account for it
 * in cache->flush_errors_len.
 *
 * The entry is first unlinked with list_del() in case it is currently
 * linked on a list.
 *
 * NOTE(review): the counter is bumped unconditionally, so appending an
 * entry that is already on flush_errors would count it twice - confirm
 * that callers cannot do that.
 */
static void flush_errors_list_append (struct content_cache *cache,
                                      struct cache_entry *e)
{
    /* Unlink before relinking. */
    list_del (&e->list);

    cache->flush_errors_len += 1;
    list_add_tail (&cache->flush_errors, &e->list);
}

static void cache_store_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
Expand Down Expand Up @@ -518,6 +531,10 @@ static void cache_store_continuation (flux_future_t *f, void *arg)
cache_resume_flush (cache);
return;
error:
/* We cannot be sure the dirty entry was flushed, so add it to
 * the flush_errors list, where it can be retried at a later
 * time. */
flush_errors_list_append (cache, e);
request_list_respond_error (&e->store_requests,
cache->h,
errno,
Expand Down Expand Up @@ -814,6 +831,16 @@ static void content_flush_request (flux_t *h, flux_msg_handler_t *mh,
goto error;
}
if (cache->acct_dirty > 0) {
/* If there were flushes with errors before, we'll try them
* again by moving them to the flush list */
struct cache_entry *e = NULL;
struct cache_entry *next;
list_for_each_safe (&cache->flush_errors, e, next, list) {
/* flush_list_append() will remove from flush_errors
* first */
flush_list_append (cache, e);
}
cache->flush_errors_len = 0;
if (cache_flush (cache) < 0)
goto error;
if (msgstack_push (&cache->flush_requests, msg) < 0)
Expand Down Expand Up @@ -1038,6 +1065,7 @@ struct content_cache *content_cache_create (flux_t *h, attr_t *attrs)
cache->reactor = flux_get_reactor (h);
list_head_init (&cache->lru);
list_head_init (&cache->flush);
list_head_init (&cache->flush_errors);

if (register_attrs (cache, attrs) < 0)
goto error;
Expand Down