Skip to content

Commit

Permalink
content: require backing store for checkpoint
Browse files Browse the repository at this point in the history
Problem: A backing store is required for content.flush but it
is not required for content.checkpoint-put.  This is inconsistent
and can lead to checkpointing problems done the line.

Require content.checkpoint-put to only work if there is a backing
store available.  As a consequence, remove code that handled
"cached" checkpoints when a backing store is not available.

Fixes flux-framework#6251
  • Loading branch information
chu11 committed Dec 18, 2024
1 parent a154bdd commit f0f39f9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 170 deletions.
1 change: 0 additions & 1 deletion src/modules/content/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,6 @@ static void content_register_backing_request (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to register-backing request");
(void)cache_flush (cache);
(void)checkpoints_flush (cache->checkpoint);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
Expand Down
180 changes: 13 additions & 167 deletions src/modules/content/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,74 +28,8 @@ struct content_checkpoint {
flux_msg_handler_t **handlers;
uint32_t rank;
struct content_cache *cache;
zhashx_t *hash;
unsigned int hash_dirty;
};

struct checkpoint_data {
struct content_checkpoint *checkpoint;
json_t *value;
uint8_t dirty:1;
bool in_progress;
int refcount;
};

static struct checkpoint_data *
checkpoint_data_incref (struct checkpoint_data *data)
{
if (data)
data->refcount++;
return data;
}

static void checkpoint_data_decref (struct checkpoint_data *data)
{
if (data && --data->refcount == 0) {
if (data->dirty)
data->checkpoint->hash_dirty--;
json_decref (data->value);
free (data);
}
}

/* zhashx_destructor_fn */
static void checkpoint_data_decref_wrapper (void **arg)
{
if (arg) {
struct checkpoint_data *data = *arg;
checkpoint_data_decref (data);
}
}

static struct checkpoint_data *
checkpoint_data_create (struct content_checkpoint *checkpoint,
json_t *value)
{
struct checkpoint_data *data = NULL;

if (!(data = calloc (1, sizeof (*data))))
return NULL;
data->checkpoint = checkpoint;
data->value = json_incref (value);
data->refcount = 1;
return data;
}

static int checkpoint_data_update (struct content_checkpoint *checkpoint,
const char *key,
json_t *value)
{
struct checkpoint_data *data = NULL;

if (!(data = checkpoint_data_create (checkpoint, value)))
return -1;

zhashx_update (checkpoint->hash, key, data);
data->dirty = 1;
checkpoint->hash_dirty++;
return 0;
}

static void checkpoint_get_continuation (flux_future_t *f, void *arg)
{
struct content_checkpoint *checkpoint = arg;
Expand All @@ -111,9 +45,6 @@ static void checkpoint_get_continuation (flux_future_t *f, void *arg)
if (flux_rpc_get_unpack (f, "{s:o}", "value", &value) < 0)
goto error;

if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (flux_respond_pack (checkpoint->h, msg, "{s:O}", "value", value) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-get");

Expand Down Expand Up @@ -176,24 +107,16 @@ void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh,
const char *key;
const char *errstr = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
struct checkpoint_data *data = zhashx_lookup (checkpoint->hash, key);
if (!data) {
errstr = "checkpoint key unavailable";
errno = ENOENT;
goto error;
}
if (flux_respond_pack (h, msg,
"{s:O}",
"value", data->value) < 0)
flux_log_error (h, "error responding to checkpoint-get");
return;
errstr = "checkpoint get unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint_get_forward (checkpoint,
msg,
key,
Expand Down Expand Up @@ -279,24 +202,20 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
json_t *value;
const char *errstr = NULL;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
errstr = "checkpoint put unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key", &key,
"value", &value) < 0)
goto error;

if (checkpoint->rank == 0) {
if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (!content_cache_backing_loaded (checkpoint->cache)) {
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-put");
return;
}
}

if (checkpoint_put_forward (checkpoint,
msg,
key,
Expand All @@ -311,72 +230,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "error responding to checkpoint-put request");
}

static void checkpoint_flush_continuation (flux_future_t *f, void *arg)
{
struct checkpoint_data *data = arg;
int rv;

assert (data);
if ((rv = flux_rpc_get (f, NULL)) < 0)
flux_log_error (data->checkpoint->h, "checkpoint flush rpc");
if (!rv) {
data->dirty = 0;
data->checkpoint->hash_dirty--;
}
data->in_progress = false;
checkpoint_data_decref (data);
flux_future_destroy (f);
}

static int checkpoint_flush (struct content_checkpoint *checkpoint,
struct checkpoint_data *data)
{
if (data->dirty && !data->in_progress) {
const char *key = zhashx_cursor (checkpoint->hash);
const char *topic = "content-backing.checkpoint-put";
flux_future_t *f;
if (!(f = flux_rpc_pack (checkpoint->h, topic, 0, 0,
"{s:s s:O}",
"key", key,
"value", data->value))
|| flux_future_then (f,
-1,
checkpoint_flush_continuation,
(void *)checkpoint_data_incref (data)) < 0) {
flux_log_error (checkpoint->h, "%s: checkpoint flush", __FUNCTION__);
flux_future_destroy (f);
return -1;
}
data->in_progress = true;
}
return 0;
}

int checkpoints_flush (struct content_checkpoint *checkpoint)
{
int last_errno = 0;
int rc = 0;

if (checkpoint->hash_dirty > 0) {
struct checkpoint_data *data = zhashx_first (checkpoint->hash);
while (data) {
if (checkpoint_flush (checkpoint, data) < 0) {
last_errno = errno;
rc = -1;
/* A few errors we will consider "unrecoverable", so
* break out */
if (errno == ENOSYS
|| errno == ENOMEM)
break;
}
data = zhashx_next (checkpoint->hash);
}
}
if (rc < 0)
errno = last_errno;
return rc;
}

static const struct flux_msg_handler_spec htab[] = {
{
FLUX_MSGTYPE_REQUEST,
Expand All @@ -398,7 +251,6 @@ void content_checkpoint_destroy (struct content_checkpoint *checkpoint)
if (checkpoint) {
int saved_errno = errno;
flux_msg_handler_delvec (checkpoint->handlers);
zhashx_destroy (&checkpoint->hash);
free (checkpoint);
errno = saved_errno;
}
Expand All @@ -417,16 +269,10 @@ struct content_checkpoint *content_checkpoint_create (
checkpoint->rank = rank;
checkpoint->cache = cache;

if (!(checkpoint->hash = zhashx_new ()))
goto nomem;
zhashx_set_destructor (checkpoint->hash, checkpoint_data_decref_wrapper);

if (flux_msg_handler_addvec (h, htab, checkpoint, &checkpoint->handlers) < 0)
goto error;
return checkpoint;

nomem:
errno = ENOMEM;
error:
content_checkpoint_destroy (checkpoint);
return NULL;
Expand Down
2 changes: 0 additions & 2 deletions src/modules/content/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ struct content_checkpoint *content_checkpoint_create (
struct content_cache *cache);
void content_checkpoint_destroy (struct content_checkpoint *checkpoint);

int checkpoints_flush (struct content_checkpoint *checkpoint);

#endif /* !_CONTENT_CHECKPOINT_H */

/*
Expand Down

0 comments on commit f0f39f9

Please sign in to comment.