Skip to content

Commit

Permalink
Merge pull request #2786 from garlick/content_cleanup
Browse files Browse the repository at this point in the history
content-sqlite, content-cache: cleanup and refactoring
  • Loading branch information
mergify[bot] authored Mar 2, 2020
2 parents 7bac877 + efcd9e8 commit 7d4fc7c
Show file tree
Hide file tree
Showing 5 changed files with 437 additions and 278 deletions.
7 changes: 3 additions & 4 deletions doc/man1/flux-content.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ cache of its availability, which triggers the cache to begin
offloading entries. Once entries are offloaded, they are eligible
for expiration from the rank 0 cache.

If the module is unloaded, its contents are transferred back
to the cache. This operation may fail if more content has
been stored than can fit in memory, and so is only advisable early
in the life of an instance.
To avoid data loss, once a content backing module is loaded,
do not unload it unless the content cache on rank 0 has been flushed
and the system is shutting down.


CACHE EXPIRATION
Expand Down
125 changes: 86 additions & 39 deletions src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,15 +596,6 @@ static void content_store_request (flux_t *h, flux_msg_handler_t *mh,
* informs the content service of its availability, and entries are
* asynchronously duplicated on the backing store and made eligible for
* dropping from the rank 0 cache.
*
* At module unload time, Backing store is disabled and content.backing
* synchronously transfers content back to the cache. This allows the
* module providing the backing store to be replaced early at runtime,
* before the amount of content exceeds the cache's ability to hold it.
*
* If the broker is being shutdown, this transfer is skipped by
* to avoid unnecessary and possibly OOM-triggering data movement into
* the cache.
*/

static int cache_flush (content_cache_t *cache)
Expand Down Expand Up @@ -638,43 +629,60 @@ static int cache_flush (content_cache_t *cache)
return rc;
}

static void content_backing_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void content_register_backing_request (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
content_cache_t *cache = arg;
const char *name;
int backing;

if (flux_request_unpack (msg, NULL, "{ s:b s:s }",
"backing", &backing,
"name", &name) < 0)
if (flux_request_unpack (msg, NULL, "{s:s}", "name", &name) < 0)
goto error;
if (cache->rank != 0) {
if (cache->rank != 0 || cache->backing) {
errno = EINVAL;
goto error;
}
if (!cache->backing && backing) {
if (!(cache->backing_name = strdup (name)))
goto error;
cache->backing = 1;
flux_log (h, LOG_DEBUG,
"content backing store: enabled %s", name);
(void)cache_flush (cache);
} else if (cache->backing && !backing) {
cache->backing = 0;
if (cache->backing_name)
free (cache->backing_name);
cache->backing_name = NULL;
flux_log (h, LOG_DEBUG, "content backing store: disabled %s", name);
}
if (!(cache->backing_name = strdup (name)))
goto error;
cache->backing = 1;
flux_log (h, LOG_DEBUG, "content backing store: enabled %s", name);
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "content backing");
(void)cache_flush (cache);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "content backing");
};

static void content_unregister_backing_request (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
content_cache_t *cache = arg;

if (flux_request_decode (msg, NULL, NULL) < 0)
goto error;
if (!cache->backing) {
errno = EINVAL;
goto error;
}
cache->backing = 0;
free (cache->backing_name);
cache->backing_name = NULL;
flux_log (h, LOG_DEBUG, "content backing store: disabled");
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "flux_respond");
if (cache->acct_dirty > 0)
flux_log (h, LOG_ERR, "%d unflushables", cache->acct_dirty);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond_error");
}

/* Forcibly drop all entries from the cache that can be dropped
* without data loss.
* N.B. this walks the entire cache in one go.
Expand Down Expand Up @@ -858,15 +866,54 @@ static void heartbeat_event (flux_t *h, flux_msg_handler_t *mh,
*/

static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0 },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0 },
{
FLUX_MSGTYPE_REQUEST,
"content.load",
content_load_request,
FLUX_ROLE_USER
},
{
FLUX_MSGTYPE_REQUEST,
"content.store",
content_store_request,
FLUX_ROLE_USER
},
{
FLUX_MSGTYPE_REQUEST,
"content.unregister-backing",
content_unregister_backing_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.register-backing",
content_register_backing_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.dropcache",
content_dropcache_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.stats.get",
content_stats_request,
0
},
{
FLUX_MSGTYPE_REQUEST,
"content.flush",
content_flush_request,
0
},
{
FLUX_MSGTYPE_EVENT,
"hb",
heartbeat_event,
0
},
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down
Loading

0 comments on commit 7d4fc7c

Please sign in to comment.