Skip to content

Commit

Permalink
broker: do not drop dirty cache entries on error
Browse files Browse the repository at this point in the history
Problem: If an error occurs during a content store, a dirty
cache entry can be lost forever as it not on the flush list
or the lru list.  As a result, the "dirty count" of entries
will be inconsistent to the known dirty entries (i.e. entries
in the flush list or in the process of being stored).

Solution: If a content store fails, add the entry to a new
flush_errors list so it can be tried again at a later time.

Fixes flux-framework#4472
  • Loading branch information
chu11 committed Aug 26, 2022
1 parent b4cc5e0 commit 61cc216
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ struct content_cache {

struct list_head lru; // LRU is for valid, clean entries only
struct list_head flush; // dirties queued due to batch limit
struct list_head flush_errors; // flush entries that hit error
unsigned int flush_errors_len;

uint32_t blob_size_limit;
uint32_t flush_batch_limit;
Expand Down Expand Up @@ -484,12 +486,23 @@ void content_load_request (flux_t *h, flux_msg_handler_t *mh,
*/
static void cache_resume_flush (struct content_cache *cache)
{
if (cache->acct_dirty == 0 || (cache->rank == 0 && !cache->backing))
if (cache->acct_dirty == 0
|| (cache->acct_dirty - cache->flush_errors_len) == 0
|| (cache->rank == 0 && !cache->backing))
flush_respond (cache);
else
(void)cache_flush (cache); /* resume flushing, subject to limits */
}

/* Call list_del() just in case entry is already on the list. */
static void flush_errors_list_append (struct content_cache *cache,
struct cache_entry *e)
{
list_del (&e->list);
list_add_tail (&cache->flush_errors, &e->list);
cache->flush_errors_len++;
}

static void cache_store_continuation (flux_future_t *f, void *arg)
{
struct content_cache *cache = arg;
Expand Down Expand Up @@ -518,6 +531,10 @@ static void cache_store_continuation (flux_future_t *f, void *arg)
cache_resume_flush (cache);
return;
error:
/* We cannot ensure the dirty entry has been flushed, add it to
* the flush_errors list so it can be tried again at a later
* time. */
flush_errors_list_append (cache, e);
request_list_respond_error (&e->store_requests,
cache->h,
errno,
Expand Down Expand Up @@ -814,6 +831,16 @@ static void content_flush_request (flux_t *h, flux_msg_handler_t *mh,
goto error;
}
if (cache->acct_dirty > 0) {
/* If there were flushes with errors before, we'll try them
* again by moving them to the flush list */
struct cache_entry *e = NULL;
struct cache_entry *next;
list_for_each_safe (&cache->flush_errors, e, next, list) {
/* flush_list_append() will remove from flush_errors
* first */
flush_list_append (cache, e);
}
cache->flush_errors_len = 0;
if (cache_flush (cache) < 0)
goto error;
if (msgstack_push (&cache->flush_requests, msg) < 0)
Expand Down Expand Up @@ -1038,6 +1065,7 @@ struct content_cache *content_cache_create (flux_t *h, attr_t *attrs)
cache->reactor = flux_get_reactor (h);
list_head_init (&cache->lru);
list_head_init (&cache->flush);
list_head_init (&cache->flush_errors);

if (register_attrs (cache, attrs) < 0)
goto error;
Expand Down

0 comments on commit 61cc216

Please sign in to comment.