diff --git a/doc/Makefile.am b/doc/Makefile.am index f14869dc09cc..5554a1df025b 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -292,7 +292,8 @@ MAN5_FILES_PRIMARY = \ man5/flux-config-resource.5 \ man5/flux-config-archive.5 \ man5/flux-config-job-manager.5 \ - man5/flux-config-ingest.5 + man5/flux-config-ingest.5 \ + man5/flux-config-kvs.5 MAN7_FILES = $(MAN7_FILES_PRIMARY) diff --git a/doc/man5/flux-config-kvs.rst b/doc/man5/flux-config-kvs.rst new file mode 100644 index 000000000000..19f75ac99343 --- /dev/null +++ b/doc/man5/flux-config-kvs.rst @@ -0,0 +1,46 @@ +================== +flux-config-kvs(5) +================== + + +DESCRIPTION +=========== + +The Flux system instance **kvs** service provides the primary key-value +store (i.e. "the KVS") for a large number of Flux services. For +example, job eventlogs are stored in the KVS. + +The ``kvs`` table may contain the following keys: + + +KEYS +==== + +checkpoint-period + (optional) Sets the interval (in RFC 23 Flux Standard Duration + format) at which the KVS checkpoints a reference to its + primary namespace. The checkpoint is used to protect against data + loss in the event of a Flux broker crash. 
+ + +EXAMPLE +======= + +:: + + [kvs] + checkpoint-period = "30m" + + +RESOURCES +========= + +Flux: http://flux-framework.org + +RFC 23: Flux Standard Duration: https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_23.html + + +SEE ALSO +======== + +:man5:`flux-config` diff --git a/doc/man5/flux-config.rst b/doc/man5/flux-config.rst index 10923fab882b..041b3bc09e41 100644 --- a/doc/man5/flux-config.rst +++ b/doc/man5/flux-config.rst @@ -77,4 +77,4 @@ SEE ALSO :man1:`flux-broker`, :man5:`flux-config-access`, :man5:`flux-config-bootstrap`, :man5:`flux-config-tbon`, :man5:`flux-config-exec`, :man5:`flux-config-ingest`, :man5:`flux-config-resource`, :man5:`flux-config-archive`, -:man5:`flux-config-job-manager` +:man5:`flux-config-job-manager`, :man5:`flux-config-kvs` diff --git a/doc/manpages.py b/doc/manpages.py index 914ee386204c..f29ebd817744 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -282,6 +282,7 @@ ('man5/flux-config-resource', 'flux-config-resource', 'configure Flux resource service', [author], 5), ('man5/flux-config-archive', 'flux-config-archive', 'configure Flux job archival service', [author], 5), ('man5/flux-config-job-manager', 'flux-config-job-manager', 'configure Flux job manager service', [author], 5), + ('man5/flux-config-kvs', 'flux-config-kvs', 'configure Flux kvs service', [author], 5), ('man7/flux-broker-attributes', 'flux-broker-attributes', 'overview Flux broker attributes', [author], 7), ('man7/flux-jobtap-plugins', 'flux-jobtap-plugins', 'overview Flux jobtap plugin API', [author], 7), ] diff --git a/doc/test/spell.en.pws b/doc/test/spell.en.pws index 2e337e0b6941..9627c335242a 100644 --- a/doc/test/spell.en.pws +++ b/doc/test/spell.en.pws @@ -629,3 +629,4 @@ tmpfiles EDEADLOCK setpgrp nosetpgrp +checkpointed diff --git a/src/modules/kvs/Makefile.am b/src/modules/kvs/Makefile.am index d6a596e171f5..fad7838f2aa5 100644 --- a/src/modules/kvs/Makefile.am +++ b/src/modules/kvs/Makefile.am @@ -29,7 +29,9 @@ 
kvs_la_SOURCES = \ kvsroot.h \ kvsroot.c \ kvs_wait_version.h \ - kvs_wait_version.c + kvs_wait_version.c \ + kvs_checkpoint.h \ + kvs_checkpoint.c kvs_la_LDFLAGS = $(fluxmod_ldflags) -module kvs_la_LIBADD = $(top_builddir)/src/common/libkvs/libkvs.la \ diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 19df3768ae00..eb78bf270fb4 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -29,11 +29,13 @@ #include "src/common/libutil/monotime.h" #include "src/common/libutil/tstat.h" #include "src/common/libutil/timestamp.h" +#include "src/common/libutil/errprintf.h" #include "src/common/libkvs/treeobj.h" #include "src/common/libkvs/kvs_checkpoint.h" #include "src/common/libkvs/kvs_txn_private.h" #include "src/common/libkvs/kvs_util_private.h" #include "src/common/libcontent/content.h" +#include "src/common/libutil/fsd.h" #include "waitqueue.h" #include "cache.h" @@ -43,6 +45,7 @@ #include "kvstxn.h" #include "kvsroot.h" #include "kvs_wait_version.h" +#include "kvs_checkpoint.h" /* heartbeat_sync_cb() is called periodically to manage cached content * and namespaces. 
Synchronize with the system heartbeat if possible, @@ -73,6 +76,7 @@ struct kvs_ctx { bool events_init; /* flag */ const char *hash_name; unsigned int seq; /* for commit transactions */ + kvs_checkpoint_t *kcp; struct list_head work_queue; }; @@ -89,6 +93,8 @@ static void transaction_prep_cb (flux_reactor_t *r, flux_watcher_t *w, static void transaction_check_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); static void start_root_remove (struct kvs_ctx *ctx, const char *ns); +static void work_queue_check_append (struct kvs_ctx *ctx, + struct kvsroot *root); static void kvstxn_apply (kvstxn_t *kt); /* @@ -103,11 +109,19 @@ static void kvs_ctx_destroy (struct kvs_ctx *ctx) flux_watcher_destroy (ctx->prep_w); flux_watcher_destroy (ctx->check_w); flux_watcher_destroy (ctx->idle_w); + kvs_checkpoint_destroy (ctx->kcp); free (ctx); errno = saved_errno; } } +static void work_queue_check_append_wrapper (struct kvsroot *root, + void *arg) +{ + struct kvs_ctx *ctx = arg; + work_queue_check_append (ctx, root); +} + static struct kvs_ctx *kvs_ctx_create (flux_t *h) { flux_reactor_t *r = flux_get_reactor (h); @@ -138,6 +152,13 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h) goto error; flux_watcher_start (ctx->prep_w); flux_watcher_start (ctx->check_w); + ctx->kcp = kvs_checkpoint_create (h, + NULL, /* set later */ + 0.0, /* default 0.0, set later */ + work_queue_check_append_wrapper, + ctx); + if (!ctx->kcp) + goto error; } ctx->transaction_merge = 1; list_head_init (&ctx->work_queue); @@ -1044,6 +1065,7 @@ static void kvstxn_apply (kvstxn_t *kt) done: if (errnum == 0) { json_t *names = kvstxn_get_names (kt); + int internal_flags = kvstxn_get_internal_flags (kt); int count; if ((count = json_array_size (names)) > 1) { int opcount = 0; @@ -1051,8 +1073,10 @@ static void kvstxn_apply (kvstxn_t *kt) flux_log (ctx->h, LOG_DEBUG, "aggregated %d transactions (%d ops)", count, opcount); } - setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1); - 
setroot_event_send (ctx, root, names, kvstxn_get_keys (kt)); + if (!(internal_flags & KVSTXN_INTERNAL_FLAG_NO_PUBLISH)) { + setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1); + setroot_event_send (ctx, root, names, kvstxn_get_keys (kt)); + } } else { fallback = kvstxn_fallback_mergeable (kt); @@ -1570,7 +1594,7 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } - if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags) < 0) { + if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags, 0) < 0) { flux_log_error (h, "%s: kvstxn_mgr_add_transaction", __FUNCTION__); goto error; @@ -1651,7 +1675,8 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh, if (kvstxn_mgr_add_transaction (root->ktm, treq_get_name (tr), ops, - flags) < 0) { + flags, + 0) < 0) { flux_log_error (h, "%s: kvstxn_mgr_add_transaction", __FUNCTION__); goto error; @@ -1749,7 +1774,8 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh, if (kvstxn_mgr_add_transaction (root->ktm, treq_get_name (tr), treq_get_ops (tr), - treq_get_flags (tr)) < 0) { + treq_get_flags (tr), + 0) < 0) { flux_log_error (h, "%s: kvstxn_mgr_add_transaction", __FUNCTION__); goto error; @@ -1856,7 +1882,8 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh, if (kvstxn_mgr_add_transaction (root->ktm, treq_get_name (tr), treq_get_ops (tr), - treq_get_flags (tr)) < 0) { + treq_get_flags (tr), + 0) < 0) { flux_log_error (h, "%s: kvstxn_mgr_add_transaction", __FUNCTION__); goto error; @@ -2698,6 +2725,30 @@ static void setroot_unpause_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); } +static void config_reload_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct kvs_ctx *ctx = arg; + const flux_conf_t *conf; + const char *errstr = NULL; + flux_error_t error; + + if (flux_conf_reload_decode (msg, &conf) < 0) + goto error; + if 
(kvs_checkpoint_reload (ctx->kcp, conf, &error) < 0) { + errstr = error.text; + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to config-reload request"); + return; + error: + if (flux_respond_error (h, msg, errno, errstr) < 0) + flux_log_error (h, "error responding to config-reload request"); +} + /* see comments above in event_subscribe() regarding event * subscriptions to kvs.namespace */ static const struct flux_msg_handler_spec htab[] = { @@ -2735,9 +2786,22 @@ static const struct flux_msg_handler_spec htab[] = { setroot_pause_request_cb, FLUX_ROLE_USER }, { FLUX_MSGTYPE_REQUEST, "kvs.setroot-unpause", setroot_unpause_request_cb, FLUX_ROLE_USER }, + { FLUX_MSGTYPE_REQUEST, "kvs.config-reload", config_reload_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; +static int process_config (struct kvs_ctx *ctx) +{ + flux_error_t error; + if (kvs_checkpoint_config_parse (ctx->kcp, + flux_get_conf (ctx->h), + &error) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + return -1; + } + return 0; +} + static void process_args (struct kvs_ctx *ctx, int ac, char **av) { int i; @@ -2891,6 +2955,8 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating KVS context"); goto done; } + if (process_config (ctx) < 0) + goto done; process_args (ctx, argc, argv); if (ctx->rank == 0) { struct kvsroot *root; @@ -2934,6 +3000,8 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "event_subscribe"); goto done; } + + kvs_checkpoint_update_root_primary (ctx->kcp, root); } if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); @@ -2947,6 +3015,7 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error starting heartbeat synchronization"); goto done; } + kvs_checkpoint_start (ctx->kcp); if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); goto done; diff --git 
a/src/modules/kvs/kvs_checkpoint.c b/src/modules/kvs/kvs_checkpoint.c new file mode 100644 index 000000000000..6e85e88a5e40 --- /dev/null +++ b/src/modules/kvs/kvs_checkpoint.c @@ -0,0 +1,233 @@ +/************************************************************\ + * Copyright 2019 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include +#include + +#include "src/common/libutil/errprintf.h" +#include "src/common/libutil/fsd.h" + +#include "kvs_checkpoint.h" +#include "kvsroot.h" + +struct kvs_checkpoint { + flux_t *h; + struct kvsroot *root_primary; + double checkpoint_period; /* in seconds */ + flux_watcher_t *checkpoint_w; + kvs_checkpoint_txn_cb txn_cb; + void *txn_cb_arg; + int last_checkpoint_seq; +}; + +static int checkpoint_period_parse (const flux_conf_t *conf, + flux_error_t *errp, + double *checkpoint_period) +{ + flux_error_t error; + const char *str = NULL; + + if (flux_conf_unpack (conf, + &error, + "{s?{s?s}}", + "kvs", + "checkpoint-period", &str) < 0) { + errprintf (errp, + "error reading config for kvs: %s", + error.text); + return -1; + } + + if (str) { + if (fsd_parse_duration (str, checkpoint_period) < 0) { + errprintf (errp, + "invalid checkpoint-period config: %s", + str); + return -1; + } + } + + return 0; +} + +int kvs_checkpoint_config_parse (kvs_checkpoint_t *kcp, + const flux_conf_t *conf, + flux_error_t *errp) +{ + if (kcp) { + double checkpoint_period = kcp->checkpoint_period; + if (checkpoint_period_parse (conf, errp, &checkpoint_period) < 0) + return -1; + kcp->checkpoint_period = checkpoint_period; + } + return 0; +} + +int kvs_checkpoint_reload (kvs_checkpoint_t *kcp, + const 
flux_conf_t *conf, + flux_error_t *errp) +{ + if (kcp) { + double checkpoint_period = kcp->checkpoint_period; + if (checkpoint_period_parse (conf, + errp, + &checkpoint_period) < 0) + return -1; + + if (checkpoint_period != kcp->checkpoint_period) { + kcp->checkpoint_period = checkpoint_period; + flux_watcher_stop (kcp->checkpoint_w); + + if (kcp->root_primary + && kcp->checkpoint_period > 0.0) { + flux_timer_watcher_reset (kcp->checkpoint_w, + kcp->checkpoint_period, + kcp->checkpoint_period); + flux_watcher_start (kcp->checkpoint_w); + } + } + } + return 0; +} + + +static void checkpoint_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + kvs_checkpoint_t *kcp = arg; + char name[64]; + json_t *ops = NULL; + + /* if no changes to root since last checkpoint-period, do + * nothing */ + if (kcp->last_checkpoint_seq == kcp->root_primary->seq) + return; + + snprintf (name, + sizeof (name), + "checkpoint-period.%u", + kcp->root_primary->seq); + + if (!(ops = json_array ())) { + errno = ENOMEM; + flux_log_error (kcp->h, "checkpoint-period setup failure"); + goto done; + } + + /* Set FLUX_KVS_SYNC, to perform the checkpoint. + * + * Set KVSTXN_INTERNAL_FLAG_NO_PUBLISH, this is an internal KVS + * module transaction to checkpoint. It has no operations so the + * KVS data will not change. Therefore no setroot() needs to be + * called after this is done. + */ + if (kvstxn_mgr_add_transaction (kcp->root_primary->ktm, + name, + ops, + FLUX_KVS_SYNC, + KVSTXN_INTERNAL_FLAG_NO_PUBLISH) < 0) { + flux_log_error (kcp->h, "%s: kvstxn_mgr_add_transaction", + __FUNCTION__); + goto done; + } + + if (kcp->txn_cb) + kcp->txn_cb (kcp->root_primary, kcp->txn_cb_arg); + + /* N.B. "last_checkpoint_seq" protects against unnecessary + * checkpointing when there is no activity in the primary KVS. 
+ */ + kcp->last_checkpoint_seq = kcp->root_primary->seq; + +done: + json_decref (ops); +} + +kvs_checkpoint_t *kvs_checkpoint_create (flux_t *h, + struct kvsroot *root_primary, + double checkpoint_period, + kvs_checkpoint_txn_cb txn_cb, + void *txn_cb_arg) +{ + kvs_checkpoint_t *kcp = NULL; + + if (!(kcp = calloc (1, sizeof (*kcp)))) + goto error; + + kcp->h = h; + kcp->root_primary = root_primary; /* can be NULL initially */ + kcp->checkpoint_period = checkpoint_period; + kcp->txn_cb = txn_cb; + kcp->txn_cb_arg = txn_cb_arg; + + /* create regardless of checkpoint-period value, in case user + * reconfigures later. + */ + if (!(kcp->checkpoint_w = + flux_timer_watcher_create (flux_get_reactor (h), + kcp->checkpoint_period, + kcp->checkpoint_period, + checkpoint_cb, + kcp))) { + flux_log_error (kcp->h, "flux_timer_watcher_create"); + goto error; + + } + + return kcp; + + error: + kvs_checkpoint_destroy (kcp); + return NULL; +} + +void kvs_checkpoint_update_root_primary (kvs_checkpoint_t *kcp, + struct kvsroot *root_primary) +{ + if (kcp && root_primary) + kcp->root_primary = root_primary; +} + +void kvs_checkpoint_start (kvs_checkpoint_t *kcp) +{ + if (kcp + && kcp->root_primary + && kcp->checkpoint_period > 0.0) { + flux_watcher_stop (kcp->checkpoint_w); + flux_timer_watcher_reset (kcp->checkpoint_w, + kcp->checkpoint_period, + kcp->checkpoint_period); + flux_watcher_start (kcp->checkpoint_w); + } +} + +void kvs_checkpoint_destroy (kvs_checkpoint_t *kcp) +{ + if (kcp) { + int save_errno = errno; + flux_watcher_destroy (kcp->checkpoint_w); + free (kcp); + errno = save_errno; + } +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/kvs/kvs_checkpoint.h b/src/modules/kvs/kvs_checkpoint.h new file mode 100644 index 000000000000..ad16ef695471 --- /dev/null +++ b/src/modules/kvs/kvs_checkpoint.h @@ -0,0 +1,74 @@ +/************************************************************\ + * Copyright 2019 Lawrence Livermore National Security, LLC + * (c.f. 
AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_KVS_CHECKPOINT_H +#define _FLUX_KVS_CHECKPOINT_H + +#include <flux/core.h> + +#include "kvsroot.h" + +/* kvs_checkpoint handles the periodic checkpointing configured by the + * checkpoint-period key in the [kvs] table. Internally it manages the + * checkpoint-period value and a timer. + * + * To avoid excess comparisons for `rank == 0` throughout KVS code, + * most functions below are no-ops if the `kvs_checkpoint_t` argument + * is NULL. + */ + +typedef struct kvs_checkpoint kvs_checkpoint_t; + +/* callback after sync/checkpoint transaction submitted */ +typedef void (*kvs_checkpoint_txn_cb)(struct kvsroot *root, void *arg); + +/* root_primary - root of primary namespace, will be passed to txn_cb + * - can be NULL if not available at creation time, use + * kvs_checkpoint_update_root_primary() to set later. + * checkpoint_period - timer will trigger a checkpoint every X seconds, + * - the timer is not started if <= 0.0. 
+ * txn_cb - callback after each checkpoint transaction submitted + * txn_cb_arg - passed to txn_cb + */ +kvs_checkpoint_t *kvs_checkpoint_create (flux_t *h, + struct kvsroot *root_primary, + double checkpoint_period, + kvs_checkpoint_txn_cb txn_cb, + void *txn_cb_arg); + +/* update internal checkpoint_period setting as needed */ +int kvs_checkpoint_config_parse (kvs_checkpoint_t *kcp, + const flux_conf_t *conf, + flux_error_t *errp); + +/* update internal checkpoint_period setting as needed and restart + * internal timers if needed + */ +int kvs_checkpoint_reload (kvs_checkpoint_t *kcp, + const flux_conf_t *conf, + flux_error_t *errp); + +/* update kvsroot used internally */ +void kvs_checkpoint_update_root_primary (kvs_checkpoint_t *kcp, + struct kvsroot *root_primary); + +/* start / restart checkpoint timer. If root_primary not yet set or + * checkpoint_period <= 0.0, will do nothing. + */ +void kvs_checkpoint_start (kvs_checkpoint_t *kcp); + +void kvs_checkpoint_destroy (kvs_checkpoint_t *kcp); + + +#endif /* !_FLUX_KVS_CHECKPOINT_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/kvs/kvsroot.h b/src/modules/kvs/kvsroot.h index 5d43bf76278d..46313800651a 100644 --- a/src/modules/kvs/kvsroot.h +++ b/src/modules/kvs/kvsroot.h @@ -20,6 +20,7 @@ #include "waitqueue.h" #include "src/common/libutil/blobref.h" #include "src/common/libccan/ccan/list/list.h" +#include "src/common/libczmqcontainers/czmq_containers.h" typedef struct kvsroot_mgr kvsroot_mgr_t; diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 9de62250bbc8..55e3dcab28ad 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -34,10 +34,6 @@ #include "kvstxn.h" -#define KVSTXN_PROCESSING 0x01 -#define KVSTXN_MERGED 0x02 /* kvstxn is a merger of transactions */ -#define KVSTXN_MERGE_COMPONENT 0x04 /* kvstxn is member of a merger */ - struct kvstxn_mgr { struct cache *cache; const char *ns_name; @@ -55,7 +51,8 @@ struct kvstxn { json_t *ops; 
json_t *keys; json_t *names; - int flags; + int flags; /* kvs flags from request caller */ + int internal_flags; /* special kvstxn api internal flags */ json_t *rootcpy; /* working copy of root dir */ const json_t *rootdir; /* source of rootcpy above */ struct cache_entry *entry; /* for reference counting rootdir above */ @@ -65,7 +62,9 @@ struct kvstxn { zlist_t *dirty_cache_entries_list; flux_future_t *f_sync_content_flush; flux_future_t *f_sync_checkpoint; - int internal_flags; + bool processing; /* kvstxn is being processed */ + bool merged; /* kvstxn is a merger of transactions */ + bool merge_component; /* kvstxn is member of a merger */ kvstxn_mgr_t *ktm; /* State transitions * @@ -125,7 +124,8 @@ static void kvstxn_destroy (kvstxn_t *kt) static kvstxn_t *kvstxn_create (kvstxn_mgr_t *ktm, const char *name, json_t *ops, - int flags) + int flags, + int internal_flags) { kvstxn_t *kt; @@ -151,6 +151,7 @@ static kvstxn_t *kvstxn_create (kvstxn_mgr_t *ktm, } } kt->flags = flags; + kt->internal_flags = internal_flags; if (!(kt->missing_refs_list = zlist_new ())) goto error_enomem; zlist_autofree (kt->missing_refs_list); @@ -183,7 +184,7 @@ int kvstxn_set_aux_errnum (kvstxn_t *kt, int errnum) bool kvstxn_fallback_mergeable (kvstxn_t *kt) { - if (kt->internal_flags & KVSTXN_MERGED) + if (kt->merged) return true; return false; } @@ -203,6 +204,11 @@ int kvstxn_get_flags (kvstxn_t *kt) return kt->flags; } +int kvstxn_get_internal_flags (kvstxn_t *kt) +{ + return kt->internal_flags; +} + const char *kvstxn_get_namespace (kvstxn_t *kt) { return kt->ktm->ns_name; @@ -842,7 +848,7 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt, const char *rootdir_ref) if (kt->errnum) return KVSTXN_PROCESS_ERROR; - if (!(kt->internal_flags & KVSTXN_PROCESSING)) { + if (!kt->processing) { kt->errnum = EINVAL; return KVSTXN_PROCESS_ERROR; } @@ -1243,11 +1249,15 @@ void kvstxn_mgr_destroy (kvstxn_mgr_t *ktm) int kvstxn_mgr_add_transaction (kvstxn_mgr_t *ktm, const char *name, json_t *ops, 
- int flags) + int flags, + int internal_flags) { kvstxn_t *kt; + int valid_internal_flags = KVSTXN_INTERNAL_FLAG_NO_PUBLISH; - if (!name || !ops) { + if (!name + || !ops + || (internal_flags & ~valid_internal_flags)) { errno = EINVAL; return -1; } @@ -1255,7 +1265,8 @@ int kvstxn_mgr_add_transaction (kvstxn_mgr_t *ktm, if (!(kt = kvstxn_create (ktm, name, ops, - flags))) + flags, + internal_flags))) return -1; if (zlist_append (ktm->ready, kt) < 0) { @@ -1281,7 +1292,7 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm) { if (kvstxn_mgr_transaction_ready (ktm)) { kvstxn_t *kt = zlist_first (ktm->ready); - kt->internal_flags |= KVSTXN_PROCESSING; + kt->processing = true; return kt; } return NULL; @@ -1290,19 +1301,19 @@ kvstxn_t *kvstxn_mgr_get_ready_transaction (kvstxn_mgr_t *ktm) void kvstxn_mgr_remove_transaction (kvstxn_mgr_t *ktm, kvstxn_t *kt, bool fallback) { - if (kt->internal_flags & KVSTXN_PROCESSING) { + if (kt->processing) { bool kvstxn_is_merged = false; - if (kt->internal_flags & KVSTXN_MERGED) + if (kt->merged) kvstxn_is_merged = true; zlist_remove (ktm->ready, kt); if (kvstxn_is_merged) { kvstxn_t *kt_tmp = zlist_first (ktm->ready); - while (kt_tmp && (kt_tmp->internal_flags & KVSTXN_MERGE_COMPONENT)) { + while (kt_tmp && kt_tmp->merge_component) { if (fallback) { - kt_tmp->internal_flags &= ~KVSTXN_MERGE_COMPONENT; + kt_tmp->merge_component = false; kt_tmp->flags |= FLUX_KVS_NO_MERGE; } else @@ -1408,18 +1419,23 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) || first->aux_errnum != 0 || first->state > KVSTXN_STATE_APPLY_OPS || kvstxn_no_merge (first) - || first->internal_flags & KVSTXN_MERGED) + || first->merged) return 0; second = zlist_next (ktm->ready); if (!second || kvstxn_no_merge (second) - || (first->flags != second->flags)) + || (first->flags != second->flags) + || (first->internal_flags != second->internal_flags)) return 0; - if (!(new = kvstxn_create (ktm, NULL, NULL, first->flags))) + if (!(new = 
kvstxn_create (ktm, + NULL, + NULL, + first->flags, + first->internal_flags))) return -1; - new->internal_flags |= KVSTXN_MERGED; + new->merged = true; nextkt = zlist_first (ktm->ready); do { @@ -1451,11 +1467,10 @@ int kvstxn_mgr_merge_ready_transactions (kvstxn_mgr_t *ktm) nextkt = zlist_first (ktm->ready); nextkt = zlist_next (ktm->ready); do { - /* Wipe out KVSTXN_PROCESSING flag if user previously got - * the kvstxn_t + /* reset processing flag if user previously got the kvstxn_t */ - nextkt->internal_flags &= ~KVSTXN_PROCESSING; - nextkt->internal_flags |= KVSTXN_MERGE_COMPONENT; + nextkt->processing = false; + nextkt->merge_component = true; } while (--count && (nextkt = zlist_next (ktm->ready))); return 0; diff --git a/src/modules/kvs/kvstxn.h b/src/modules/kvs/kvstxn.h index f2b54bf09777..5eae88520de0 100644 --- a/src/modules/kvs/kvstxn.h +++ b/src/modules/kvs/kvstxn.h @@ -27,6 +27,16 @@ typedef enum { KVSTXN_PROCESS_FINISHED = 6, } kvstxn_process_t; +/* api flags, to be used with kvstxn_mgr_add_transaction() + * + * KVSTXN_INTERNAL_FLAG_NO_PUBLISH - Indicates that this transaction + * should not publish its change after the transaction completes. + * Note that kvstxn does not use this flag internally; users can check + * that it has been set via kvstxn_get_internal_flags(). 
+ */ + +#define KVSTXN_INTERNAL_FLAG_NO_PUBLISH 0x01 + /* * kvstxn_t API */ @@ -60,6 +70,7 @@ bool kvstxn_fallback_mergeable (kvstxn_t *kt); json_t *kvstxn_get_ops (kvstxn_t *kt); json_t *kvstxn_get_names (kvstxn_t *kt); int kvstxn_get_flags (kvstxn_t *kt); +int kvstxn_get_internal_flags (kvstxn_t *kt); /* returns namespace passed into kvstxn_mgr_create() */ const char *kvstxn_get_namespace (kvstxn_t *kt); @@ -155,7 +166,8 @@ void kvstxn_mgr_destroy (kvstxn_mgr_t *ktm); int kvstxn_mgr_add_transaction (kvstxn_mgr_t *ktm, const char *name, json_t *ops, - int flags); + int flags, + int internal_flags); /* returns true if there is a transaction ready for processing and is * not blocked, false if not. diff --git a/src/modules/kvs/test/kvsroot.c b/src/modules/kvs/test/kvsroot.c index d9e033a0d9af..031ec3aafd6d 100644 --- a/src/modules/kvs/test/kvsroot.c +++ b/src/modules/kvs/test/kvsroot.c @@ -275,6 +275,7 @@ void basic_kvstxn_mgr_tests (void) ok (kvstxn_mgr_add_transaction (root->ktm, "foo", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); diff --git a/src/modules/kvs/test/kvstxn.c b/src/modules/kvs/test/kvstxn.c index b385399ba8ed..1095e8545a58 100644 --- a/src/modules/kvs/test/kvstxn.c +++ b/src/modules/kvs/test/kvstxn.c @@ -240,7 +240,7 @@ void kvstxn_mgr_basic_tests (void) ok (kvstxn_mgr_get_ready_transaction (ktm) == NULL, "kvstxn_mgr_get_ready_transaction initially returns NULL for no ready transactions"); - ok (kvstxn_mgr_add_transaction (ktm, NULL, NULL, 0) < 0 + ok (kvstxn_mgr_add_transaction (ktm, NULL, NULL, 0, 0) < 0 && errno == EINVAL, "kvstxn_mgr_add_transaction fails with EINVAL on bad input"); @@ -250,6 +250,7 @@ void kvstxn_mgr_basic_tests (void) ok (kvstxn_mgr_add_transaction (ktm, "transaction1", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -276,12 +277,13 @@ void kvstxn_mgr_basic_tests (void) cache_destroy (cache); } -void create_ready_kvstxn (kvstxn_mgr_t *ktm, - const char *name, - const char *key, - const char *val, - int 
op_flags, - int transaction_flags) +static void create_ready_kvstxn_wrapper (kvstxn_mgr_t *ktm, + const char *name, + const char *key, + const char *val, + int op_flags, + int transaction_flags, + int internal_flags) { json_t *ops = NULL; @@ -293,7 +295,8 @@ void create_ready_kvstxn (kvstxn_mgr_t *ktm, ok (kvstxn_mgr_add_transaction (ktm, name, ops, - transaction_flags) == 0, + transaction_flags, + internal_flags) == 0, "kvstxn_mgr_add_transaction works"); json_decref (ops); @@ -302,6 +305,39 @@ void create_ready_kvstxn (kvstxn_mgr_t *ktm, "kvstxn_mgr_transaction_ready says a kvstxn is ready"); } +void create_ready_kvstxn (kvstxn_mgr_t *ktm, + const char *name, + const char *key, + const char *val, + int op_flags, + int transaction_flags) +{ + create_ready_kvstxn_wrapper (ktm, + name, + key, + val, + op_flags, + transaction_flags, + 0); +} + +void create_ready_kvstxn_internal_flags (kvstxn_mgr_t *ktm, + const char *name, + const char *key, + const char *val, + int op_flags, + int transaction_flags, + int internal_flags) +{ + create_ready_kvstxn_wrapper (ktm, + name, + key, + val, + op_flags, + transaction_flags, + internal_flags); +} + /* Return true if 'key' is referenced an 'ops' array entry. 
*/ bool is_op_key (json_t *ops, const char *key) @@ -379,6 +415,7 @@ void verify_ready_kvstxn (kvstxn_mgr_t *ktm, json_t *names, json_t *ops, int flags, + int internal_flags, const char *extramsg) { json_t *o; @@ -402,6 +439,9 @@ void verify_ready_kvstxn (kvstxn_mgr_t *ktm, ok (kvstxn_get_flags (kt) == flags, "flags do not match"); + ok (kvstxn_get_internal_flags (kt) == internal_flags, + "internal_flags do not match"); + ok (kvstxn_get_newroot_ref (kt) == NULL, "kvstxn_get_newroot returns NULL on non-processed transaction"); @@ -449,7 +489,7 @@ void kvstxn_mgr_merge_tests (void) ops_append (ops, "key1", "1", 0); ops_append (ops, "key2", "2", 0); - verify_ready_kvstxn (ktm, names, ops, 0, "merged transaction"); + verify_ready_kvstxn (ktm, names, ops, 0, 0, "merged transaction"); json_decref (names); json_decref (ops); @@ -471,7 +511,12 @@ void kvstxn_mgr_merge_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, FLUX_KVS_NO_MERGE, "unmerged transaction"); + verify_ready_kvstxn (ktm, + names, + ops, + FLUX_KVS_NO_MERGE, + 0, + "unmerged transaction"); json_decref (names); json_decref (ops); @@ -493,7 +538,7 @@ void kvstxn_mgr_merge_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, 0, "unmerged transaction"); + verify_ready_kvstxn (ktm, names, ops, 0, 0, "unmerged transaction"); json_decref (names); json_decref (ops); @@ -515,7 +560,12 @@ void kvstxn_mgr_merge_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, FLUX_KVS_SYNC, "unmerged transaction"); + verify_ready_kvstxn (ktm, + names, + ops, + FLUX_KVS_SYNC, + 0, + "unmerged transaction"); json_decref (names); json_decref (ops); @@ -537,7 +587,7 @@ void kvstxn_mgr_merge_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, 0, "unmerged transaction"); + verify_ready_kvstxn (ktm, names, ops, 0, 0, 
"unmerged transaction"); json_decref (names); json_decref (ops); @@ -559,7 +609,7 @@ void kvstxn_mgr_merge_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, 0, "unmerged fence"); + verify_ready_kvstxn (ktm, names, ops, 0, 0, "unmerged fence"); json_decref (names); json_decref (ops); @@ -607,7 +657,7 @@ void kvstxn_basic_tests (void) ops = json_array (); ops_append (ops, "key1", "1", 0); - verify_ready_kvstxn (ktm, names, ops, 0x44, "basic test"); + verify_ready_kvstxn (ktm, names, ops, 0x44, 0, "basic test"); json_decref (names); json_decref (ops); @@ -867,6 +917,66 @@ void kvstxn_basic_kvstxn_process_test_empty_ops (void) cache_destroy (cache); } +void kvstxn_basic_kvstxn_process_test_internal_flags (void) +{ + struct cache *cache; + kvsroot_mgr_t *krm; + kvstxn_mgr_t *ktm; + kvstxn_t *kt; + char rootref[BLOBREF_MAX_STRING_SIZE]; + const char *newroot; + int flags; + + cache = create_cache_with_empty_rootdir (rootref, sizeof (rootref)); + + ok ((krm = kvsroot_mgr_create (NULL, NULL)) != NULL, + "kvsroot_mgr_create works"); + + setup_kvsroot (krm, KVS_PRIMARY_NAMESPACE, cache, ref_dummy); + + ok ((ktm = kvstxn_mgr_create (cache, + KVS_PRIMARY_NAMESPACE, + "sha1", + NULL, + &test_global)) != NULL, + "kvstxn_mgr_create works"); + + create_ready_kvstxn_internal_flags (ktm, + "transaction1", + NULL, + NULL, + 0, + 0, + KVSTXN_INTERNAL_FLAG_NO_PUBLISH); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) != NULL, + "kvstxn_mgr_get_ready_transaction returns ready kvstxn"); + + flags = kvstxn_get_internal_flags (kt); + ok (flags == KVSTXN_INTERNAL_FLAG_NO_PUBLISH, + "kvstxn_get_internal_flags returns correct flags"); + + ok (kvstxn_process (kt, rootref) == KVSTXN_PROCESS_FINISHED, + "kvstxn_process returns KVSTXN_PROCESS_FINISHED"); + + ok ((newroot = kvstxn_get_newroot_ref (kt)) != NULL, + "kvstxn_get_newroot_ref returns != NULL when processing complete"); + + ok (strcmp (newroot, rootref) == 0, + "root stays 
identical when no ops in transaction"); + + verify_keys_and_ops_standard (kt); + + kvstxn_mgr_remove_transaction (ktm, kt, false); + + ok ((kt = kvstxn_mgr_get_ready_transaction (ktm)) == NULL, + "kvstxn_mgr_get_ready_transaction returns NULL, no more kvstxns"); + + kvstxn_mgr_destroy (ktm); + kvsroot_mgr_destroy (krm); + cache_destroy (cache); +} + void kvstxn_basic_kvstxn_process_test_normalization (void) { struct cache *cache; @@ -1504,6 +1614,7 @@ void kvstxn_process_multiple_missing_ref (void) ok (kvstxn_mgr_add_transaction (ktm, "transaction1", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -1627,6 +1738,7 @@ void kvstxn_process_multiple_identical_missing_ref (void) ok (kvstxn_mgr_add_transaction (ktm, "transaction1", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -1739,6 +1851,7 @@ void kvstxn_process_missing_ref_removed (void) ok (kvstxn_mgr_add_transaction (ktm, "transaction1", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -2050,6 +2163,7 @@ void kvstxn_process_malformed_operation (void) ok (kvstxn_mgr_add_transaction (ktm, "malformed", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -3199,6 +3313,7 @@ void kvstxn_process_append_no_duplicate (void) ok (kvstxn_mgr_add_transaction (ktm, "transaction1", ops, + 0, 0) == 0, "kvstxn_mgr_add_transaction works"); @@ -3423,6 +3538,7 @@ int main (int argc, char *argv[]) kvstxn_corner_case_tests (); kvstxn_basic_kvstxn_process_test (); kvstxn_basic_kvstxn_process_test_empty_ops (); + kvstxn_basic_kvstxn_process_test_internal_flags (); kvstxn_basic_kvstxn_process_test_normalization (); kvstxn_basic_kvstxn_process_test_multiple_transactions (); kvstxn_basic_kvstxn_process_test_multiple_transactions_merge (); diff --git a/t/Makefile.am b/t/Makefile.am index c76d26f831ec..3ea09027c8da 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -90,6 +90,7 @@ TESTSCRIPTS = \ t1008-kvs-eventlog.t \ t1009-kvs-copy.t \ t1010-kvs-commit-sync.t \ + t1011-kvs-checkpoint-period.t \ 
t1101-barrier-basic.t \ t1102-cmddriver.t \ t1103-apidisconnect.t \ diff --git a/t/t1011-kvs-checkpoint-period.t b/t/t1011-kvs-checkpoint-period.t new file mode 100755 index 000000000000..b1845ce563fa --- /dev/null +++ b/t/t1011-kvs-checkpoint-period.t @@ -0,0 +1,165 @@ +#!/bin/sh +# + +test_description='Test kvs module checkpoint-period config.' + +. `dirname $0`/kvs/kvs-helper.sh + +. `dirname $0`/sharness.sh + +RPC=${FLUX_BUILD_DIR}/t/request/rpc + +skip_all_unless_have jq + +export FLUX_CONF_DIR=$(pwd) +SIZE=4 +test_under_flux ${SIZE} minimal + +kvs_checkpoint_get() { + jq -j -c -n "{key:\"$1\"}" \ + | $RPC kvs-checkpoint.get \ + | jq -r .value.rootref +} + +# arg1 - old ref +# arg2 - timeout (seconds) +kvs_checkpoint_changed() { + old_ref=$1 + local i=0 + local iters=$(($2 * 10)) + while [ $i -lt ${iters} ] + do + ref=$(kvs_checkpoint_get kvs-primary) + if [ "${old_ref}" != "${ref}" ] + then + return 0 + fi + sleep 0.1 + i=$((i + 1)) + done + return 1 +} + +test_expect_success 'configure bad checkpoint-period timer in kvs' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "1Z" + EOF + flux config reload && + test_must_fail flux module load kvs +' + +test_expect_success 'configure checkpoint-period, place initial value' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "200ms" + EOF + flux config reload && + flux module load content-sqlite && + flux module load kvs && + flux kvs put --blobref --sync a=1 > blob1.out +' + +test_expect_success 'kvs: put some more data to kvs (1)' ' + flux kvs put --blobref b=1 > blob2.out +' + +test_expect_success 'kvs: checkpoint of kvs-primary should change in time (1)' ' + kvs_checkpoint_changed $(cat blob1.out) 5 && + kvs_checkpoint_get kvs-primary > checkpoint2.out && + test_cmp checkpoint2.out blob2.out +' + +test_expect_success 'kvs: put some more data to kvs (2)' ' + flux kvs put --blobref c=1 > blob3.out +' + +test_expect_success 'kvs: checkpoint of kvs-primary should change in time (2)' ' + 
kvs_checkpoint_changed $(cat blob2.out) 5 && + kvs_checkpoint_get kvs-primary > checkpoint3.out && + test_cmp checkpoint3.out blob3.out +' + +test_expect_success 'kvs: put some data to non-primary namespace' ' + flux kvs namespace create "test-ns" && + flux kvs put --namespace=test-ns d=1 +' + +test_expect_success 'kvs: checkpoint of kvs-primary should not change (1)' ' + test_must_fail kvs_checkpoint_changed $(cat blob3.out) 2 +' + +test_expect_success 'configure bad checkpoint-period timer in kvs on reload' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "1Z" + EOF + test_must_fail flux config reload +' + +test_expect_success 're-config checkpoint-period timer, set large period' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "60m" + EOF + flux config reload +' + +test_expect_success 'kvs: put some more data to kvs (3)' ' + flux kvs put --blobref d=1 > blob4.out +' + +test_expect_success 'kvs: checkpoint of kvs-primary should not change (2)' ' + test_must_fail kvs_checkpoint_changed $(cat blob3.out) 2 +' + +test_expect_success 're-config checkpoint-period timer, set small period' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "200ms" + EOF + flux config reload +' + +test_expect_success 'kvs: checkpoint of kvs-primary should change in time (3)' ' + kvs_checkpoint_changed $(cat blob3.out) 5 && + kvs_checkpoint_get kvs-primary > checkpoint4.out && + test_cmp checkpoint4.out blob4.out +' + +test_expect_success 're-config checkpoint-period timer, disable it' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "0s" + EOF + flux config reload +' + +test_expect_success 'kvs: put some more data to kvs (4)' ' + flux kvs put --blobref e=1 > blob5.out +' + +test_expect_success 'kvs: checkpoint of kvs-primary should not change (3)' ' + test_must_fail kvs_checkpoint_changed $(cat blob4.out) 2 +' + +test_expect_success 're-config checkpoint-period timer in kvs, re-enable it' ' + cat >kvs.toml <<-EOF && + [kvs] + checkpoint-period = "200ms" + 
EOF + flux config reload +' + +test_expect_success 'kvs: checkpoint of kvs-primary should change in time (4)' ' + kvs_checkpoint_changed $(cat blob4.out) 5 && + kvs_checkpoint_get kvs-primary > checkpoint5.out && + test_cmp checkpoint5.out blob5.out +' + +test_expect_success 'kvs: remove modules' ' + flux module remove kvs && + flux module remove content-sqlite +' + +test_done