Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

content: require backing store for checkpoint #6255

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/modules/content-files/content-files.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ void checkpoint_get_cb (flux_t *h,
if (flux_respond_pack (h,
msg,
"{s:O}",
"value",
o) < 0)
"value", o) < 0)
flux_log_error (h, "error responding to checkpoint-get request");
free (data);
json_decref (o);
Expand Down Expand Up @@ -257,10 +256,8 @@ void checkpoint_put_cb (flux_t *h,
if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key",
&key,
"value",
&o) < 0)
"key", &key,
"value", &o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errstr = "failed to encode checkpoint value";
Expand Down
21 changes: 7 additions & 14 deletions src/modules/content-s3/content-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,10 @@ static struct s3_config *parse_config (const flux_conf_t *conf,
&error,
"{s:{s:s, s:s, s:s, s?b !} }",
"content-s3",
"credential-file",
&cred_file,
"bucket",
&bucket,
"uri",
&uri,
"virtual-host-style",
&is_virtual_host) < 0) {
"credential-file", &cred_file,
"bucket", &bucket,
"uri", &uri,
"virtual-host-style", &is_virtual_host) < 0) {
errprintf (errp, "%s", error.text);
goto error;
}
Expand Down Expand Up @@ -367,8 +363,7 @@ void checkpoint_get_cb (flux_t *h,
if (flux_respond_pack (h,
msg,
"{s:O}",
"value",
o) < 0) {
"value", o) < 0) {
errno = EIO;
flux_log_error (h, "error responding to checkpoint-get request (pack)");
}
Expand Down Expand Up @@ -400,10 +395,8 @@ void checkpoint_put_cb (flux_t *h,
if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key",
&key,
"value",
&o) < 0)
"key", &key,
"value", &o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errstr = "failed to encode checkpoint value";
Expand Down
9 changes: 3 additions & 6 deletions src/modules/content-sqlite/content-sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ void checkpoint_get_cb (flux_t *h,
if (flux_respond_pack (h,
msg,
"{s:O}",
"value",
o) < 0)
"value", o) < 0)
flux_log_error (h, "flux_respond_pack");
(void )sqlite3_reset (ctx->checkpt_get_stmt);
json_decref (o);
Expand All @@ -440,10 +439,8 @@ void checkpoint_put_cb (flux_t *h,
if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key",
&key,
"value",
&o) < 0)
"key", &key,
"value", &o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errstr = "failed to encode checkpoint value";
Expand Down
1 change: 0 additions & 1 deletion src/modules/content/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,6 @@ static void content_register_backing_request (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to register-backing request");
(void)cache_flush (cache);
(void)checkpoints_flush (cache->checkpoint);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
Expand Down
186 changes: 15 additions & 171 deletions src/modules/content/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,74 +28,8 @@ struct content_checkpoint {
flux_msg_handler_t **handlers;
uint32_t rank;
struct content_cache *cache;
zhashx_t *hash;
unsigned int hash_dirty;
};

/* One checkpoint value stored in the content_checkpoint hash.
 * Reference counted: created with a count of 1 and freed by
 * checkpoint_data_decref() when the count drops to zero.
 */
struct checkpoint_data {
/* Owning checkpoint context; used to reach its flux handle for logging
 * and to adjust its hash_dirty counter.
 */
struct content_checkpoint *checkpoint;
/* Checkpoint value; a reference is taken at creation and dropped when
 * this entry is freed.
 */
json_t *value;
/* Set when the entry is inserted/updated, cleared once a flush RPC
 * completes successfully.
 */
uint8_t dirty:1;
/* True while a flush RPC for this entry is outstanding, so that a
 * second flush is not started for the same entry.
 */
bool in_progress;
/* Number of outstanding references to this entry. */
int refcount;
};

static struct checkpoint_data *
checkpoint_data_incref (struct checkpoint_data *data)
{
if (data)
data->refcount++;
return data;
}

/* Drop one reference on 'data' (NULL is tolerated).  When the last
 * reference goes away, release the entry: if it was still dirty, the
 * owner's dirty count is reduced, then the JSON value reference and the
 * entry itself are freed.
 */
static void checkpoint_data_decref (struct checkpoint_data *data)
{
    if (!data)
        return;

    data->refcount -= 1;
    if (data->refcount != 0)
        return;

    if (data->dirty)
        data->checkpoint->hash_dirty--;
    json_decref (data->value);
    free (data);
}

/* zhashx_destructor_fn
 * Adapter with the destructor signature: drops one reference on the
 * entry that '*arg' points to.  A NULL 'arg' is ignored.
 */
static void checkpoint_data_decref_wrapper (void **arg)
{
    if (!arg)
        return;
    checkpoint_data_decref (*arg);
}

/* Allocate a new entry owned by 'checkpoint' that holds its own
 * reference on 'value'.  The entry starts with a reference count of 1
 * and all other fields zeroed.
 * Returns the entry, or NULL if allocation fails.
 */
static struct checkpoint_data *
checkpoint_data_create (struct content_checkpoint *checkpoint,
                        json_t *value)
{
    struct checkpoint_data *entry = calloc (1, sizeof (*entry));

    if (entry) {
        entry->checkpoint = checkpoint;
        entry->value = json_incref (value);
        entry->refcount = 1;
    }
    return entry;
}

/* Store 'value' under 'key' in the checkpoint hash as a new, dirty entry
 * and bump the owner's dirty count.
 * Returns 0 on success, -1 if the entry could not be allocated.
 */
static int checkpoint_data_update (struct content_checkpoint *checkpoint,
                                   const char *key,
                                   json_t *value)
{
    struct checkpoint_data *entry;

    entry = checkpoint_data_create (checkpoint, value);
    if (entry == NULL)
        return -1;

    /* Insert first, then mark dirty: the new entry is only counted as
     * dirty after the hash update has been carried out.
     */
    zhashx_update (checkpoint->hash, key, entry);
    entry->dirty = 1;
    checkpoint->hash_dirty += 1;
    return 0;
}

static void checkpoint_get_continuation (flux_future_t *f, void *arg)
{
struct content_checkpoint *checkpoint = arg;
Expand All @@ -111,9 +45,6 @@ static void checkpoint_get_continuation (flux_future_t *f, void *arg)
if (flux_rpc_get_unpack (f, "{s:o}", "value", &value) < 0)
goto error;

if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (flux_respond_pack (checkpoint->h, msg, "{s:O}", "value", value) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-get");

Expand Down Expand Up @@ -176,24 +107,16 @@ void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh,
const char *key;
const char *errstr = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
struct checkpoint_data *data = zhashx_lookup (checkpoint->hash, key);
if (!data) {
errstr = "checkpoint key unavailable";
errno = ENOENT;
goto error;
}
if (flux_respond_pack (h, msg,
"{s:O}",
"value", data->value) < 0)
flux_log_error (h, "error responding to checkpoint-get");
return;
errstr = "checkpoint get unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (checkpoint_get_forward (checkpoint,
msg,
key,
Expand Down Expand Up @@ -279,26 +202,20 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
json_t *value;
const char *errstr = NULL;

if (checkpoint->rank == 0
&& !content_cache_backing_loaded (checkpoint->cache)) {
errstr = "checkpoint put unavailable, no backing store";
errno = ENOSYS;
goto error;
}

if (flux_request_unpack (msg,
NULL,
"{s:s s:o}",
"key",
&key,
"value",
&value) < 0)
"key", &key,
"value", &value) < 0)
goto error;

if (checkpoint->rank == 0) {
if (checkpoint_data_update (checkpoint, key, value) < 0)
goto error;

if (!content_cache_backing_loaded (checkpoint->cache)) {
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (checkpoint->h, "error responding to checkpoint-put");
return;
}
}

if (checkpoint_put_forward (checkpoint,
msg,
key,
Expand All @@ -313,72 +230,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "error responding to checkpoint-put request");
}

/* Continuation for the flush RPC started by checkpoint_flush().
 * 'arg' is the entry being flushed; the reference taken when the
 * continuation was registered is dropped here.  On RPC success the entry
 * is marked clean and the owner's dirty count is reduced; on failure the
 * error is logged and the entry stays dirty.
 */
static void checkpoint_flush_continuation (flux_future_t *f, void *arg)
{
    struct checkpoint_data *entry = arg;
    int rc;

    assert (entry);

    rc = flux_rpc_get (f, NULL);
    if (rc < 0) {
        flux_log_error (entry->checkpoint->h, "checkpoint flush rpc");
    }
    else if (rc == 0) {
        entry->dirty = 0;
        entry->checkpoint->hash_dirty--;
    }

    entry->in_progress = false;
    checkpoint_data_decref (entry);
    flux_future_destroy (f);
}

/* Start an asynchronous "content-backing.checkpoint-put" RPC for 'data'
 * if it is dirty and no flush is already outstanding for it.
 *
 * The key is read from the hash cursor, so this must be called while
 * iterating checkpoint->hash with 'data' as the current item (as
 * checkpoints_flush() does).
 *
 * A reference on 'data' is held for the continuation.  If the
 * continuation cannot be registered, that reference is dropped again so
 * the entry is not leaked.
 *
 * Returns 0 on success (including when there was nothing to do), or -1
 * with errno set to the value reported by the failing call.
 */
static int checkpoint_flush (struct content_checkpoint *checkpoint,
                             struct checkpoint_data *data)
{
    const char *topic = "content-backing.checkpoint-put";
    const char *key;
    flux_future_t *f;
    bool ref_taken = false;
    int saved_errno;

    if (!data->dirty || data->in_progress)
        return 0;

    key = zhashx_cursor (checkpoint->hash);
    if (!(f = flux_rpc_pack (checkpoint->h, topic, 0, 0,
                             "{s:s s:O}",
                             "key", key,
                             "value", data->value)))
        goto error;

    /* Reference owned by the continuation; released there on completion. */
    checkpoint_data_incref (data);
    ref_taken = true;
    if (flux_future_then (f,
                          -1,
                          checkpoint_flush_continuation,
                          (void *)data) < 0)
        goto error;

    data->in_progress = true;
    return 0;

error:
    /* Preserve errno: the caller inspects it to decide whether to go on. */
    saved_errno = errno;
    flux_log_error (checkpoint->h, "%s: checkpoint flush", __FUNCTION__);
    flux_future_destroy (f);
    /* The continuation will never run, so give back its reference. */
    if (ref_taken)
        checkpoint_data_decref (data);
    errno = saved_errno;
    return -1;
}

int checkpoints_flush (struct content_checkpoint *checkpoint)
{
int last_errno = 0;
int rc = 0;

if (checkpoint->hash_dirty > 0) {
struct checkpoint_data *data = zhashx_first (checkpoint->hash);
while (data) {
if (checkpoint_flush (checkpoint, data) < 0) {
last_errno = errno;
rc = -1;
/* A few errors we will consider "unrecoverable", so
* break out */
if (errno == ENOSYS
|| errno == ENOMEM)
break;
}
data = zhashx_next (checkpoint->hash);
}
}
if (rc < 0)
errno = last_errno;
return rc;
}

static const struct flux_msg_handler_spec htab[] = {
{
FLUX_MSGTYPE_REQUEST,
Expand All @@ -400,7 +251,6 @@ void content_checkpoint_destroy (struct content_checkpoint *checkpoint)
if (checkpoint) {
int saved_errno = errno;
flux_msg_handler_delvec (checkpoint->handlers);
zhashx_destroy (&checkpoint->hash);
free (checkpoint);
errno = saved_errno;
}
Expand All @@ -419,16 +269,10 @@ struct content_checkpoint *content_checkpoint_create (
checkpoint->rank = rank;
checkpoint->cache = cache;

if (!(checkpoint->hash = zhashx_new ()))
goto nomem;
zhashx_set_destructor (checkpoint->hash, checkpoint_data_decref_wrapper);

if (flux_msg_handler_addvec (h, htab, checkpoint, &checkpoint->handlers) < 0)
goto error;
return checkpoint;

nomem:
errno = ENOMEM;
error:
content_checkpoint_destroy (checkpoint);
return NULL;
Expand Down
2 changes: 0 additions & 2 deletions src/modules/content/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ struct content_checkpoint *content_checkpoint_create (
struct content_cache *cache);
void content_checkpoint_destroy (struct content_checkpoint *checkpoint);

int checkpoints_flush (struct content_checkpoint *checkpoint);

#endif /* !_CONTENT_CHECKPOINT_H */

/*
Expand Down
Loading
Loading