diff --git a/src/modules/content/cache.c b/src/modules/content/cache.c index b543f397fa7a..fa540cca3b37 100644 --- a/src/modules/content/cache.c +++ b/src/modules/content/cache.c @@ -782,7 +782,6 @@ static void content_register_backing_request (flux_t *h, if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "error responding to register-backing request"); (void)cache_flush (cache); - (void)checkpoints_flush (cache->checkpoint); return; error: if (flux_respond_error (h, msg, errno, errstr) < 0) diff --git a/src/modules/content/checkpoint.c b/src/modules/content/checkpoint.c index d2cba8cebdd9..b0d0b0c878a3 100644 --- a/src/modules/content/checkpoint.c +++ b/src/modules/content/checkpoint.c @@ -28,74 +28,8 @@ struct content_checkpoint { flux_msg_handler_t **handlers; uint32_t rank; struct content_cache *cache; - zhashx_t *hash; - unsigned int hash_dirty; }; -struct checkpoint_data { - struct content_checkpoint *checkpoint; - json_t *value; - uint8_t dirty:1; - bool in_progress; - int refcount; -}; - -static struct checkpoint_data * -checkpoint_data_incref (struct checkpoint_data *data) -{ - if (data) - data->refcount++; - return data; -} - -static void checkpoint_data_decref (struct checkpoint_data *data) -{ - if (data && --data->refcount == 0) { - if (data->dirty) - data->checkpoint->hash_dirty--; - json_decref (data->value); - free (data); - } -} - -/* zhashx_destructor_fn */ -static void checkpoint_data_decref_wrapper (void **arg) -{ - if (arg) { - struct checkpoint_data *data = *arg; - checkpoint_data_decref (data); - } -} - -static struct checkpoint_data * -checkpoint_data_create (struct content_checkpoint *checkpoint, - json_t *value) -{ - struct checkpoint_data *data = NULL; - - if (!(data = calloc (1, sizeof (*data)))) - return NULL; - data->checkpoint = checkpoint; - data->value = json_incref (value); - data->refcount = 1; - return data; -} - -static int checkpoint_data_update (struct content_checkpoint *checkpoint, - const char *key, - json_t *value) -{ - struct checkpoint_data *data = NULL; - - if (!(data = checkpoint_data_create (checkpoint, value))) - return -1; - - zhashx_update (checkpoint->hash, key, data); - data->dirty = 1; - checkpoint->hash_dirty++; - return 0; -} - static void checkpoint_get_continuation (flux_future_t *f, void *arg) { struct content_checkpoint *checkpoint = arg; @@ -111,9 +45,6 @@ static void checkpoint_get_continuation (flux_future_t *f, void *arg) if (flux_rpc_get_unpack (f, "{s:o}", "value", &value) < 0) goto error; - if (checkpoint_data_update (checkpoint, key, value) < 0) - goto error; - if (flux_respond_pack (checkpoint->h, msg, "{s:O}", "value", value) < 0) flux_log_error (checkpoint->h, "error responding to checkpoint-get"); @@ -176,24 +107,16 @@ void content_checkpoint_get_request (flux_t *h, flux_msg_handler_t *mh, const char *key; const char *errstr = NULL; - if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0) - goto error; - if (checkpoint->rank == 0 && !content_cache_backing_loaded (checkpoint->cache)) { - struct checkpoint_data *data = zhashx_lookup (checkpoint->hash, key); - if (!data) { - errstr = "checkpoint key unavailable"; - errno = ENOENT; - goto error; - } - if (flux_respond_pack (h, msg, - "{s:O}", - "value", data->value) < 0) - flux_log_error (h, "error responding to checkpoint-get"); - return; + errstr = "checkpoint get unavailable, no backing store"; + errno = ENOSYS; + goto error; } + if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0) + goto error; + if (checkpoint_get_forward (checkpoint, msg, key, @@ -279,6 +202,13 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, json_t *value; const char *errstr = NULL; + if (checkpoint->rank == 0 + && !content_cache_backing_loaded (checkpoint->cache)) { + errstr = "checkpoint put unavailable, no backing store"; + errno = ENOSYS; + goto error; + } + if (flux_request_unpack (msg, NULL, "{s:s s:o}", @@ -286,17 +216,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, "value", &value) < 0) goto error; - if (checkpoint->rank == 0) { - if (checkpoint_data_update (checkpoint, key, value) < 0) - goto error; - - if (!content_cache_backing_loaded (checkpoint->cache)) { - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (checkpoint->h, "error responding to checkpoint-put"); - return; - } - } - if (checkpoint_put_forward (checkpoint, msg, key, @@ -311,72 +230,6 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "error responding to checkpoint-put request"); } -static void checkpoint_flush_continuation (flux_future_t *f, void *arg) -{ - struct checkpoint_data *data = arg; - int rv; - - assert (data); - if ((rv = flux_rpc_get (f, NULL)) < 0) - flux_log_error (data->checkpoint->h, "checkpoint flush rpc"); - if (!rv) { - data->dirty = 0; - data->checkpoint->hash_dirty--; - } - data->in_progress = false; - checkpoint_data_decref (data); - flux_future_destroy (f); -} - -static int checkpoint_flush (struct content_checkpoint *checkpoint, - struct checkpoint_data *data) -{ - if (data->dirty && !data->in_progress) { - const char *key = zhashx_cursor (checkpoint->hash); - const char *topic = "content-backing.checkpoint-put"; - flux_future_t *f; - if (!(f = flux_rpc_pack (checkpoint->h, topic, 0, 0, - "{s:s s:O}", - "key", key, - "value", data->value)) - || flux_future_then (f, - -1, - checkpoint_flush_continuation, - (void *)checkpoint_data_incref (data)) < 0) { - flux_log_error (checkpoint->h, "%s: checkpoint flush", __FUNCTION__); - flux_future_destroy (f); - return -1; - } - data->in_progress = true; - } - return 0; -} - -int checkpoints_flush (struct content_checkpoint *checkpoint) -{ - int last_errno = 0; - int rc = 0; - - if (checkpoint->hash_dirty > 0) { - struct checkpoint_data *data = zhashx_first (checkpoint->hash); - while (data) { - if (checkpoint_flush (checkpoint, data) < 0) { - last_errno = errno; - rc = -1; - /* A few errors we will consider "unrecoverable", so - * break out */ - if (errno == ENOSYS - || errno == ENOMEM) - break; - } - data = zhashx_next (checkpoint->hash); - } - } - if (rc < 0) - errno = last_errno; - return rc; -} - static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, @@ -398,7 +251,6 @@ void content_checkpoint_destroy (struct content_checkpoint *checkpoint) if (checkpoint) { int saved_errno = errno; flux_msg_handler_delvec (checkpoint->handlers); - zhashx_destroy (&checkpoint->hash); free (checkpoint); errno = saved_errno; } @@ -417,16 +269,10 @@ struct content_checkpoint *content_checkpoint_create ( checkpoint->rank = rank; checkpoint->cache = cache; - if (!(checkpoint->hash = zhashx_new ())) - goto nomem; - zhashx_set_destructor (checkpoint->hash, checkpoint_data_decref_wrapper); - if (flux_msg_handler_addvec (h, htab, checkpoint, &checkpoint->handlers) < 0) goto error; return checkpoint; -nomem: - errno = ENOMEM; error: content_checkpoint_destroy (checkpoint); return NULL; diff --git a/src/modules/content/checkpoint.h b/src/modules/content/checkpoint.h index 7bd0c5ad6622..d0322ce6b40e 100644 --- a/src/modules/content/checkpoint.h +++ b/src/modules/content/checkpoint.h @@ -19,8 +19,6 @@ struct content_checkpoint *content_checkpoint_create ( struct content_cache *cache); void content_checkpoint_destroy (struct content_checkpoint *checkpoint); -int checkpoints_flush (struct content_checkpoint *checkpoint); - #endif /* !_CONTENT_CHECKPOINT_H */ /*