From 1ef979b04d36b0ce8fc1e3a16924d7256b894652 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 4 Mar 2020 07:11:24 -0800 Subject: [PATCH 1/8] content-sqlite: drop unused dump prepared stmt Problem: The 'sql_dump' prepared statement is unused, left over from code removed in #2786. Remove unused prepared statement. --- src/modules/content-sqlite/content-sqlite.c | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index e91263ec579c..79c06fb15401 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -35,7 +35,6 @@ const char *sql_load = "SELECT object,size FROM objects" " WHERE hash = ?1 LIMIT 1"; const char *sql_store = "INSERT INTO objects (hash,size,object) " " values (?1, ?2, ?3)"; -const char *sql_dump = "SELECT object,size FROM objects"; struct content_sqlite { flux_msg_handler_t **handlers; @@ -44,7 +43,6 @@ struct content_sqlite { sqlite3 *db; sqlite3_stmt *load_stmt; sqlite3_stmt *store_stmt; - sqlite3_stmt *dump_stmt; flux_t *h; const char *hashfun; uint32_t blob_size_limit; @@ -380,13 +378,9 @@ static void content_sqlite_closedb (struct content_sqlite *ctx) if (sqlite3_finalize (ctx->load_stmt) != SQLITE_OK) log_sqlite_error (ctx, "sqlite_finalize load_stmt"); } - if (ctx->dump_stmt) { - if (sqlite3_finalize (ctx->dump_stmt) != SQLITE_OK) - log_sqlite_error (ctx, "sqlite_finalize dump_stmt"); - } if (ctx->db) { if (sqlite3_close (ctx->db) != SQLITE_OK) - log_sqlite_error (ctx, "sqlite_finalize dump_stmt"); + log_sqlite_error (ctx, "sqlite3_close"); } errno = saved_errno; } @@ -446,14 +440,6 @@ static int content_sqlite_opendb (struct content_sqlite *ctx) log_sqlite_error (ctx, "preparing store stmt"); goto error; } - if (sqlite3_prepare_v2 (ctx->db, - sql_dump, - -1, - &ctx->dump_stmt, - NULL) != SQLITE_OK) { - log_sqlite_error (ctx, "preparing dump stmt"); - goto error; - } return 0; error: set_errno_from_sqlite_error (ctx); From 21ffb63d918403f2d8409552e0264b45e2dfb45b Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 27 Feb 2020 10:42:26 -0800 Subject: [PATCH 2/8] broker: drop persist-directory, persist-filesystem Drop code that validates a 'persist-directory' attribute, if set, or that sets 'persist-directory' to a unique directory in 'persist-filesystem', if set. Drop code that sets the broker 'log-filename' attribute to a location in 'persist-directory', if the former is unset and the latter is set. Drop code from rc3 that attempts to write out a KVS snapshot if 'persist-directory', if set. Drop relevant sharness tests from t0001-basic.t. Drop all tests from t2010-kvs-snapshot-restore.t for now. Drop these attributes from flux-broker-attributes(7) man page. --- doc/man7/flux-broker-attributes.adoc | 14 +---- etc/rc3 | 10 +--- src/broker/broker.c | 85 ---------------------------- src/broker/log.c | 14 ----- t/t0001-basic.t | 26 --------- t/t2010-kvs-snapshot-restore.t | 33 +---------- 6 files changed, 3 insertions(+), 179 deletions(-) diff --git a/doc/man7/flux-broker-attributes.adoc b/doc/man7/flux-broker-attributes.adoc index f93e97efb68e..a32c02b7c06d 100644 --- a/doc/man7/flux-broker-attributes.adoc +++ b/doc/man7/flux-broker-attributes.adoc @@ -40,17 +40,6 @@ the session. By default broker.rundir is set to "${rundir}/${rank}", which guarantees a unique directory per rank. It is not advisable to override this attribute on the command line. Use rundir instead. -persist-directory:: -A persistent directory available for storage on rank 0 only. -If persist-directory is not defined, persistence is unavailable -and users should fall back to broker.rundir, with cleanup. - -persist-filesystem:: -If defined, and persist-directory is not defined, the rank -0 broker chooses a unique name for persist-directory within -persist-filesystem and creates it automatically. - - TOPOLOGY ATTRIBUTES ------------------- tbon.arity:: @@ -115,8 +104,7 @@ to stderr on the logging rank, for capture by the enclosing instance. log-filename:: (rank zero only) If set, session log entries, as filtered by log-forward-level, -are directed to this file. If unset, but persist-directory is set, log -entries are directed to persist-directory/log. +are directed to this file. log-stderr-level:: (rank zero only) Session log entries at syslog(3) level at or below this diff --git a/etc/rc3 b/etc/rc3 index da925bd5cc63..45d9e7a02fa1 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -21,10 +21,6 @@ flux module remove -f job-exec flux module remove -f job-manager flux exec -r all flux module remove -f job-ingest -if PERSISTDIR=$(flux getattr persist-directory 2>/dev/null); then - /bin/true; # XXX: nothing to persist? -fi - flux module remove -f userdb flux module remove -f cron @@ -34,10 +30,6 @@ flux exec -r all flux module remove -f barrier flux exec -r all flux module remove -f job-info flux exec -r all flux module remove -f kvs-watch flux exec -r all -x 0 flux module remove -f kvs -if test -n "$PERSISTDIR"; then - flux kvs getroot >${PERSISTDIR}/kvsroot.final - flux content flush -fi + flux module remove -f kvs flux module remove -f content-sqlite - diff --git a/src/broker/broker.c b/src/broker/broker.c index 4c1df66cb806..af3cbd694923 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -149,7 +149,6 @@ static void runlevel_cb (runlevel_t *r, int level, int rc, double elapsed, static void runlevel_io_cb (runlevel_t *r, const char *name, const char *msg, void *arg); -static int create_persistdir (attr_t *attrs, uint32_t rank); static int create_rundir (attr_t *attrs); static int create_broker_rundir (overlay_t *ov, void *arg); static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size); @@ -499,14 +498,6 @@ int main (int argc, char *argv[]) // Setup profiling setup_profiling (argv[0], rank); - /* If persist-filesystem or persist-directory are set, initialize those, - * but only on rank 0. - */ - if (create_persistdir (ctx.attrs, rank) < 0) { - log_err ("create_persistdir"); - goto cleanup; - } - /* Initialize logging. * OK to call flux_log*() after this. */ @@ -1107,82 +1098,6 @@ static int create_broker_rundir (overlay_t *ov, void *arg) return rv; } -/* If 'persist-directory' set, validate it, make it immutable, done. - * If 'persist-filesystem' set, validate it, make it immutable, then: - * Avoid name collisions with other flux tmpdirs used in testing - * e.g. "flux--XXXXXX" - */ -static int create_persistdir (attr_t *attrs, uint32_t rank) -{ - struct stat sb; - const char *attr = "persist-directory"; - const char *persist_dir, *persist_fs; - char *dir, *tmpl = NULL; - int rc = -1; - - if (rank > 0) { - (void) attr_delete (attrs, "persist-filesystem", true); - (void) attr_delete (attrs, "persist-directory", true); - goto done_success; - } - if (attr_get (attrs, attr, &persist_dir, NULL) == 0) { - if (stat (persist_dir, &sb) < 0) - goto done; - if (!S_ISDIR (sb.st_mode)) { - errno = ENOTDIR; - goto done; - } - if ((sb.st_mode & S_IRWXU) != S_IRWXU) { - errno = EPERM; - goto done; - } - if (attr_set_flags (attrs, attr, FLUX_ATTRFLAG_IMMUTABLE) < 0) - goto done; - } else { - if (attr_get (attrs, "persist-filesystem", &persist_fs, NULL)< 0) { - goto done_success; - } - if (stat (persist_fs, &sb) < 0) - goto done; - if (!S_ISDIR (sb.st_mode)) { - errno = ENOTDIR; - goto done; - } - if ((sb.st_mode & S_IRWXU) != S_IRWXU) { - errno = EPERM; - goto done; - } - if (attr_set_flags (attrs, "persist-filesystem", - FLUX_ATTRFLAG_IMMUTABLE) < 0) - goto done; - if (asprintf (&tmpl, - "%s/fluxP-%d-XXXXXX", - persist_fs, - (int)getpid()) < 0) - goto done; - if (!(dir = mkdtemp (tmpl))) - goto done; - if (attr_add (attrs, attr, dir, FLUX_ATTRFLAG_IMMUTABLE) < 0) - goto done; - } -done_success: - if (attr_get (attrs, "persist-filesystem", NULL, NULL) < 0) { - if (attr_add (attrs, "persist-filesystem", NULL, - FLUX_ATTRFLAG_IMMUTABLE) < 0) - goto done; - } - if (attr_get (attrs, "persist-directory", NULL, NULL) < 0) { - if (attr_add (attrs, "persist-directory", NULL, - FLUX_ATTRFLAG_IMMUTABLE) < 0) - goto done; - } - rc = 0; -done: - if (tmpl) - free (tmpl); - return rc; -} - static bool nodeset_member (const char *s, uint32_t rank) { struct idset *ns = NULL; diff --git a/src/broker/log.c b/src/broker/log.c index d0a1093256ba..93d5efbe2fd1 100644 --- a/src/broker/log.c +++ b/src/broker/log.c @@ -411,26 +411,12 @@ static int attr_set_log (const char *name, const char *val, void *arg) static int logbuf_register_attrs (logbuf_t *logbuf, attr_t *attrs) { - char s[PATH_MAX]; - const char *val; int rc = -1; /* log-filename * Only allowed to be set on rank 0 (ignore initial value on rank > 0). - * If unset, and persist-directory is set, make it ${persist-directory}/log */ if (logbuf->rank == 0) { - if (attr_get (attrs, "log-filename", NULL, NULL) < 0 - && attr_get (attrs, "persist-directory", &val, NULL) == 0 && val) { - if (snprintf (s, sizeof (s), "%s/log", val) >= sizeof (s)) { - log_err ("log-filename truncated"); - goto done; - } - if (attr_add (attrs, "log-filename", s, 0) < 0) { - log_err ("could not initialize log-filename"); - goto done; - } - } if (attr_add_active (attrs, "log-filename", 0, attr_get_log, attr_set_log, logbuf) < 0) goto done; diff --git a/t/t0001-basic.t b/t/t0001-basic.t index 13cfbe9ab1bc..08da39c3e0e2 100755 --- a/t/t0001-basic.t +++ b/t/t0001-basic.t @@ -207,32 +207,6 @@ test_expect_success 'rundir override creates nonexistent dirs' ' flux start ${ARGS} -o,--setattr=rundir=$RUNDIR sh -c "test -d $RUNDIR" && test_expect_code 1 test -d $RUNDIR ' -test_expect_success 'broker persist-directory works' ' - PERSISTDIR=`mktemp -d` && - flux start ${ARGS} -o,--setattr=persist-directory=$PERSISTDIR /bin/true && - test -d $PERSISTDIR && - test `ls -1 $PERSISTDIR|wc -l` -gt 0 && - rm -rf $PERSISTDIR -' -test_expect_success 'broker persist-filesystem works' ' - PERSISTFS=`mktemp -d` && - PERSISTDIR=`flux start ${ARGS} -o,--setattr=persist-filesystem=$PERSISTFS flux getattr persist-directory` && - test -d $PERSISTDIR && - test `ls -1 $PERSISTDIR|wc -l` -gt 0 && - rm -rf $PERSISTDIR && - test -d $PERSISTFS && - rmdir $PERSISTFS -' -test_expect_success 'broker persist-filesystem is ignored if persist-directory set' ' - PERSISTFS=`mktemp -d` && - PERSISTDIR=`mktemp -d` && - DIR=`flux start ${ARGS} -o,--setattr=persist-filesystem=$PERSISTFS,--setattr=persist-directory=$PERSISTDIR \ - flux getattr persist-directory` && - test "$DIR" = "$PERSISTDIR" && - test `ls -1 $PERSISTDIR|wc -l` -gt 0 && - rmdir $PERSISTFS && - rm -rf $PERSISTDIR -' # Use -eq hack to test that BROKERPID is a number test_expect_success 'broker broker.pid attribute is readable' ' BROKERPID=`flux start ${ARGS} flux getattr broker.pid` && diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t index ae7f89dde15b..c97fffa7efb7 100755 --- a/t/t2010-kvs-snapshot-restore.t +++ b/t/t2010-kvs-snapshot-restore.t @@ -1,41 +1,10 @@ #!/bin/sh # -test_description='Test loading KVS snapshot from earlier instance - -Test recovery of KVS snapshot from persistdir.' +test_description='Test KVS snapshot/restore' # Append --logfile option if FLUX_TESTS_LOGFILE is set in environment: test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile . `dirname $0`/sharness.sh - -test_expect_success 'created persist-directory' ' - PERSISTDIR=$(mktemp -d --tmpdir=$(pwd)) -' - -test_expect_success 'run instance with persist-directory set' ' - rm -f $PERSISTDIR/kvsroot.final && - flux start -o,--setattr=persist-directory=$PERSISTDIR \ - flux kvs put testkey=42 -' - -test_expect_success 'sqlite file exists in persist-directory' ' - test -f $PERSISTDIR/content/sqlite && - echo Size in bytes: $(stat --format "%s" $PERSISTDIR/content/sqlite) -' - -test_expect_success 'kvsroot.final file exists in persist-directory' ' - test -f $PERSISTDIR/kvsroot.final -' - -test_expect_success 'recover KVS snapshot from persist-directory in new instance' ' - run_timeout 60 \ - flux start -o,--setattr=persist-directory=$PERSISTDIR \ - "flux kvs put --treeobj snap=- <$PERSISTDIR/kvsroot.final && \ - flux kvs get snap.testkey >testkey.out" && - echo 42 >testkey.exp && - test_cmp testkey.exp testkey.out -' - test_done From a274ceb5ba3dfed8b1d8ef1877f2dd5d77693a08 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 27 Feb 2020 10:42:51 -0800 Subject: [PATCH 3/8] content-sqlite: handle content.backing-path attr Drop the code that manipulates the location and persistence of the sqlite db file based on the 'persist-directory' attribute, and replace it with the following: If 'content.backing-path' attribute is set on entry, use its value as the sqlite db file. sqlite creates this file if it doesn't exist. On exit, it is preserved. If 'content.backing-path' is not set on entry, use '${rundir}/content.sqlite' as the sqlite db file. sqlite creates this file if it doesn't exist(*). On exit it is removed. Set the 'content.backing-path' attribte so the location of this file. Add a 'flux content flush' command to rc3 just before content-sqlite unload, to ensure that any dirty cache entries are flushed to the backing store before content-sqlite is unloaded. _______ (*) This file is not expected to exist initially given that rundir is a temporary directory, but it _could_ exit if the content-sqlite module is reloaded. Reusing its content is appropriate in that case. --- etc/rc3 | 1 + src/modules/content-sqlite/content-sqlite.c | 66 +++++++++++++-------- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/etc/rc3 b/etc/rc3 index 45d9e7a02fa1..fb53eec7d412 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -32,4 +32,5 @@ flux exec -r all flux module remove -f kvs-watch flux exec -r all -x 0 flux module remove -f kvs flux module remove -f kvs +flux content flush flux module remove -f content-sqlite diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 79c06fb15401..0cdb109311eb 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -38,7 +38,6 @@ const char *sql_store = "INSERT INTO objects (hash,size,object) " struct content_sqlite { flux_msg_handler_t **handlers; - char *dbdir; char *dbfile; sqlite3 *db; sqlite3_stmt *load_stmt; @@ -386,11 +385,15 @@ static void content_sqlite_closedb (struct content_sqlite *ctx) } } +/* Open the database file ctx->dbfile and set up the database. + */ static int content_sqlite_opendb (struct content_sqlite *ctx) { - if (sqlite3_open (ctx->dbfile, &ctx->db) != SQLITE_OK) { - flux_log_error (ctx->h, "sqlite3_open %s", ctx->dbfile); - return -1; + int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; + + if (sqlite3_open_v2 (ctx->dbfile, &ctx->db, flags, NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "opening %s", ctx->dbfile); + goto error; } if (sqlite3_exec (ctx->db, "PRAGMA journal_mode=OFF", @@ -452,7 +455,6 @@ static void content_sqlite_destroy (struct content_sqlite *ctx) int saved_errno = errno; flux_msg_handler_delvec (ctx->handlers); free (ctx->dbfile); - free (ctx->dbdir); free (ctx->lzo_buf); free (ctx); errno = saved_errno; @@ -468,9 +470,8 @@ static const struct flux_msg_handler_spec htab[] = { static struct content_sqlite *content_sqlite_create (flux_t *h) { struct content_sqlite *ctx; - const char *dir; + const char *backing_path; const char *tmp; - bool cleanup = false; if (!(ctx = calloc (1, sizeof (*ctx)))) return NULL; @@ -478,6 +479,12 @@ static struct content_sqlite *content_sqlite_create (flux_t *h) goto error; ctx->lzo_bufsize = lzo_buf_chunksize; ctx->h = h; + + /* Some tunables: + * - the hash function, e.g. sha1, sha256 + * - the maximum blob size + * - path to sqlite file + */ if (!(ctx->hashfun = flux_attr_get (h, "content.hash"))) { flux_log_error (h, "content.hash"); goto error; @@ -488,27 +495,34 @@ static struct content_sqlite *content_sqlite_create (flux_t *h) } ctx->blob_size_limit = strtoul (tmp, NULL, 10); - if (!(dir = flux_attr_get (h, "persist-directory"))) { - if (!(dir = flux_attr_get (h, "broker.rundir"))) { - flux_log_error (h, "broker.rundir"); + /* If 'content.backing-path' attribute is already set, then: + * - value is the sqlite backing file + * - if it exists, preserve existing content; else create empty + * - ensure that file perists when the instance exits + * Otherwise: + * - ${rundir}/content.sqlite is the backing file + * - ensure that file is cleaned up when the instance exits + * - set 'content.backing-path' to this name + */ + backing_path = flux_attr_get (h, "content.backing-path"); + if (backing_path) { + if (!(ctx->dbfile = strdup (backing_path))) goto error; - } - cleanup = true; } - if (asprintf (&ctx->dbdir, "%s/content", dir) < 0) - goto error; - if (mkdir (ctx->dbdir, 0755) < 0 && errno != EEXIST) { - flux_log_error (h, "mkdir %s", ctx->dbdir); - goto error; + else { + const char *rundir = flux_attr_get (h, "rundir"); + if (!rundir) { + flux_log_error (h, "rundir"); + goto error; + } + if (asprintf (&ctx->dbfile, "%s/content.sqlite", rundir) < 0) + goto error; + if (flux_attr_set (h, "content.backing-path", ctx->dbfile) < 0) + goto error; + cleanup_push_string (cleanup_file, ctx->dbfile); } - if (cleanup) - cleanup_push_string (cleanup_directory_recursive, ctx->dbdir); - if (asprintf (&ctx->dbfile, "%s/sqlite", ctx->dbdir) < 0) + if (flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0) goto error; - if (flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0) { - flux_log_error (h, "flux_msg_handler_addvec"); - goto error; - } return ctx; error: content_sqlite_destroy (ctx); @@ -519,8 +533,10 @@ int mod_main (flux_t *h, int argc, char **argv) { struct content_sqlite *ctx; - if (!(ctx = content_sqlite_create (h))) + if (!(ctx = content_sqlite_create (h))) { + flux_log_error (h, "content_sqlite_create failed"); return -1; + } if (content_sqlite_opendb(ctx) < 0) goto done; if (register_backing_store (h, "content-sqlite") < 0) { From 4a68e3d905a7d3edbebf89f6afc68af4bc1c09e5 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 27 Feb 2020 15:25:45 -0800 Subject: [PATCH 4/8] modules/content-sqlite: add kvs-checkpoint side service Add a new sqlite table to store KVS checkpoint data, and offer a new service, 'kvs-checkpoint', with get/put methods. The table has the following properties: - keys and values are strings of any length - keys are unique - empty string key is not allowed - put to existing key replaces value Note: piggybacking this service/table onto content-sqlite is a temporary short cut to get something working quickly. It should probably be split out to its own module eventually, as implementing it here will make it cumbersome to replace the content backing module. --- src/modules/content-sqlite/content-sqlite.c | 150 +++++++++++++++++++- 1 file changed, 146 insertions(+), 4 deletions(-) diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 0cdb109311eb..919c80e5978c 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -36,12 +36,23 @@ const char *sql_load = "SELECT object,size FROM objects" const char *sql_store = "INSERT INTO objects (hash,size,object) " " values (?1, ?2, ?3)"; +const char *sql_create_table_checkpt = "CREATE TABLE if not exists checkpt(" + " key TEXT UNIQUE," + " value TEXT" + ");"; +const char *sql_checkpt_get = "SELECT value FROM checkpt" + " WHERE key = ?1"; +const char *sql_checkpt_put = "REPLACE INTO checkpt (key,value) " + " values (?1, ?2)"; + struct content_sqlite { flux_msg_handler_t **handlers; char *dbfile; sqlite3 *db; sqlite3_stmt *load_stmt; sqlite3_stmt *store_stmt; + sqlite3_stmt *checkpt_get_stmt; + sqlite3_stmt *checkpt_put_stmt; flux_t *h; const char *hashfun; uint32_t blob_size_limit; @@ -324,6 +335,99 @@ void store_cb (flux_t *h, flux_log_error (h, "store: flux_respond_error"); } +void checkpoint_get_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct content_sqlite *ctx = arg; + const char *key; + + if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0) + goto error; + if (sqlite3_bind_text (ctx->checkpt_get_stmt, + 1, + (char *)key, + strlen (key), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "checkpt_get: binding key"); + set_errno_from_sqlite_error (ctx); + goto error; + } + if (sqlite3_step (ctx->checkpt_get_stmt) != SQLITE_ROW) { + errno = ENOENT; + goto error; + } + if (flux_respond_pack (h, + msg, + "{s:s}", + "value", + sqlite3_column_text (ctx->checkpt_get_stmt, 0)) < 0) + flux_log_error (h, "flux_respond_pack"); + (void )sqlite3_reset (ctx->checkpt_get_stmt); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "flux_respond_error"); + (void )sqlite3_reset (ctx->checkpt_get_stmt); +} + +void checkpoint_put_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct content_sqlite *ctx = arg; + const char *key; + const char *value; + + if (flux_request_unpack (msg, + NULL, + "{s:s s:s}", + "key", + &key, + "value", + &value) < 0) + goto error; + if (strlen (key) == 0) { + errno = EINVAL; + goto error; + } + if (sqlite3_bind_text (ctx->checkpt_put_stmt, + 1, + (char *)key, + strlen (key), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "checkpt_put: binding key"); + set_errno_from_sqlite_error (ctx); + goto error; + } + if (sqlite3_bind_text (ctx->checkpt_put_stmt, + 2, + (char *)value, + strlen (value), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "checkpt_put: binding value"); + set_errno_from_sqlite_error (ctx); + goto error; + } + if (sqlite3_step (ctx->checkpt_put_stmt) != SQLITE_DONE + && sqlite3_errcode (ctx->db) != SQLITE_CONSTRAINT) { + log_sqlite_error (ctx, "checkpt_put: executing stmt"); + set_errno_from_sqlite_error (ctx); + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "flux_respond"); + (void )sqlite3_reset (ctx->checkpt_put_stmt); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "flux_respond_error"); + (void )sqlite3_reset (ctx->checkpt_put_stmt); +} + + int register_backing_store (flux_t *h, const char *name) { flux_future_t *f; @@ -354,11 +458,11 @@ int unregister_backing_store (flux_t *h) return rc; } -int register_content_backing_service (flux_t *h) +static int register_service (flux_t *h, const char *name) { int rc; flux_future_t *f; - if (!(f = flux_service_register (h, "content-backing"))) + if (!(f = flux_service_register (h, name))) return -1; rc = flux_future_get (f, NULL); flux_future_destroy (f); @@ -377,6 +481,14 @@ static void content_sqlite_closedb (struct content_sqlite *ctx) if (sqlite3_finalize (ctx->load_stmt) != SQLITE_OK) log_sqlite_error (ctx, "sqlite_finalize load_stmt"); } + if (ctx->checkpt_get_stmt) { + if (sqlite3_finalize (ctx->checkpt_get_stmt) != SQLITE_OK) + log_sqlite_error (ctx, "sqlite_finalize checkpt_get_stmt"); + } + if (ctx->checkpt_put_stmt) { + if (sqlite3_finalize (ctx->checkpt_put_stmt) != SQLITE_OK) + log_sqlite_error (ctx, "sqlite_finalize checkpt_put_stmt"); + } if (ctx->db) { if (sqlite3_close (ctx->db) != SQLITE_OK) log_sqlite_error (ctx, "sqlite3_close"); @@ -424,7 +536,15 @@ static int content_sqlite_opendb (struct content_sqlite *ctx) NULL, NULL, NULL) != SQLITE_OK) { - log_sqlite_error (ctx, "creating table"); + log_sqlite_error (ctx, "creating object table"); + goto error; + } + if (sqlite3_exec (ctx->db, + sql_create_table_checkpt, + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "creating checkpt table"); goto error; } if (sqlite3_prepare_v2 (ctx->db, @@ -443,6 +563,22 @@ static int content_sqlite_opendb (struct content_sqlite *ctx) log_sqlite_error (ctx, "preparing store stmt"); goto error; } + if (sqlite3_prepare_v2 (ctx->db, + sql_checkpt_get, + -1, + &ctx->checkpt_get_stmt, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "preparing checkpt_get stmt"); + goto error; + } + if (sqlite3_prepare_v2 (ctx->db, + sql_checkpt_put, + -1, + &ctx->checkpt_put_stmt, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "preparing checkpt_put stmt"); + goto error; + } return 0; error: set_errno_from_sqlite_error (ctx); @@ -464,6 +600,8 @@ static void content_sqlite_destroy (struct content_sqlite *ctx) static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, "content-backing.load", load_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.get", checkpoint_get_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs-checkpoint.put", checkpoint_put_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -543,10 +681,14 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "registering backing store"); goto done; } - if (register_content_backing_service (h) < 0) { + if (register_service (h, "content-backing") < 0) { flux_log_error (h, "service.add: content-backing"); goto done; } + if (register_service (h, "kvs-checkpoint") < 0) { + flux_log_error (h, "service.add: kvs-checkpiont"); + goto done; + } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); goto done; From 6694c96a449d6ac1a071cfb9f5ed1e93d95126e0 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 27 Feb 2020 17:25:39 -0800 Subject: [PATCH 5/8] modules/kvs: save/restore checkpoint for primary ns On rank 0 startup, try to get a 'kvs-primary' key from the kvs-checkpoint service. If the service is available and the key is found, use it as the blobref for the initial KVS root. If unavailable or not found, start with an empty directory as before. On rank 0 unload, try to put 'kvs-primary' to the kvs-checkpoint service. If the service is unavailable, don't treat this as an error, since the instance may be purposefully running without one. --- src/modules/kvs/kvs.c | 87 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index cb381663b030..72cbf0d0b689 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2831,6 +2831,62 @@ static void process_args (kvs_ctx_t *ctx, int ac, char **av) } } +/* Synchronously get string value by key from checkpoint service. + * Copy value to buf with '\0' termination. + * Return 0 on success, -1 on failure, + */ +static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) +{ + flux_future_t *f; + const char *value; + + if (!(f = flux_rpc_pack (h, + "kvs-checkpoint.get", + 0, + 0, + "{s:s}", + "key", + key))) + return -1; + if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0) + goto error; + if (strlen (value) >= len) { + errno = EINVAL; + goto error; + } + strcpy (buf, value); + flux_future_destroy (f); + return 0; +error: + flux_future_destroy (f); + return -1; +} + +/* Synchronously store key-value pair to checkpoint service. + * Returns 0 on success, -1 on failure. + */ +static int checkpoint_put (flux_t *h, const char *key, const char *value) +{ + flux_future_t *f; + + if (!(f = flux_rpc_pack (h, + "kvs-checkpoint.put", + 0, + 0, + "{s:s s:s}", + "key", + key, + "value", + value))) + return -1; + if (flux_rpc_get (f, NULL) < 0) { + flux_future_destroy (f); + return -1; + } + flux_future_destroy (f); + return 0; +} + /* Store initial root in local cache, and flush to content cache * synchronously. The corresponding blobref is written into 'ref'. */ @@ -2919,9 +2975,16 @@ int mod_main (flux_t *h, int argc, char **argv) char rootref[BLOBREF_MAX_STRING_SIZE]; uint32_t owner = geteuid (); - if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) { - flux_log_error (h, "storing initial root object"); - goto done; + /* Look for a checkpoint and use it if found. + * Otherwise start the primary root namespace with an empty directory. + */ + if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0) + flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint"); + else { + if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) { + flux_log_error (h, "storing initial root object"); + goto done; + } } /* primary namespace must always be there and not marked @@ -2956,6 +3019,24 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "flux_reactor_run"); goto done; } + /* Checkpoint the KVS root to the content backing store. + * If backing store is not loaded, silently proceed without checkpoint. + */ + if (ctx->rank == 0) { + struct kvsroot *root; + + if (!(root = kvsroot_mgr_lookup_root_safe (ctx->krm, + KVS_PRIMARY_NAMESPACE))) { + flux_log_error (h, "error looking up primary root"); + goto done; + } + if (checkpoint_put (ctx->h, "kvs-primary", root->ref) < 0) { + if (errno != ENOSYS) { // service not loaded is not an error + flux_log_error (h, "error saving primary KVS checkpoint"); + goto done; + } + } + } rc = 0; done: flux_msg_handler_delvec (handlers); From d28bde78e75d895be8ffb61a4e32875a128393f9 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 4 Mar 2020 07:57:38 -0800 Subject: [PATCH 6/8] testsuite: add kvs/checkpoint test program Add a test program for exercising kvs-snapshot.get and kvs-snapshot.put service. --- t/Makefile.am | 6 ++++ t/kvs/checkpoint.c | 80 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 t/kvs/checkpoint.c diff --git a/t/Makefile.am b/t/Makefile.am index 93f9ba5ddef0..e0cfe7d3d591 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -228,6 +228,7 @@ check_PROGRAMS = \ kvs/issue1876 \ kvs/waitcreate_cancel \ kvs/setrootevents \ + kvs/checkpoint \ request/treq \ request/rpc \ barrier/tbarrier \ @@ -393,6 +394,11 @@ kvs_setrootevents_CPPFLAGS = $(test_cppflags) kvs_setrootevents_LDADD = \ $(test_ldadd) $(LIBDL) $(LIBUTIL) +kvs_checkpoint_SOURCES = kvs/checkpoint.c +kvs_checkpoint_CPPFLAGS = $(test_cppflags) +kvs_checkpoint_LDADD = \ + $(test_ldadd) $(LIBDL) $(LIBUTIL) + request_treq_SOURCES = request/treq.c request_treq_CPPFLAGS = $(test_cppflags) request_treq_LDADD = \ diff --git a/t/kvs/checkpoint.c b/t/kvs/checkpoint.c new file mode 100644 index 000000000000..c868b771e4f7 --- /dev/null +++ b/t/kvs/checkpoint.c @@ -0,0 +1,80 @@ +/************************************************************\ + * Copyright 2020 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include "src/common/libutil/log.h" + +void usage (void) +{ + fprintf (stderr, "Usage: checkpoint get key\n" + " or: checkpoint put key value\n"); + exit (1); +} + +int main (int argc, char **argv) +{ + flux_t *h; + const char *cmd; + const char *key; + const char *value; + flux_future_t *f; + + if (argc < 2 || argc > 4) + usage (); + cmd = argv[1]; + key = argv[2]; + if (argc == 4) + value = argv[3]; + if (strcmp (cmd, "get") != 0 && strcmp (cmd, "put") != 0) + usage (); + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + + if (!strcmp (cmd, "put")) { + if (!(f = flux_rpc_pack (h, + "kvs-checkpoint.put", + 0, + 0, + "{s:s s:s}", + "key", + key, + "value", + value))) + log_err_exit("flux_rpc"); + if (flux_rpc_get (f, NULL) < 0) + log_err_exit ("%s", key); + } + else { + if (!(f = flux_rpc_pack (h, + "kvs-checkpoint.get", + 0, + 0, + "{s:s}", + "key", + key))) + log_err_exit("flux_rpc"); + if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0) + log_err_exit ("%s", key); + printf ("%s\n", value); + } + + flux_future_destroy (f); + flux_close (h); + return 0; +} + +/* + * vi: ts=4 sw=4 expandtab + */ From d95eb5ebc1e85d9fd581a50cc74aa91bc083dcda Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 4 Mar 2020 06:49:34 -0800 Subject: [PATCH 7/8] testsuite: cover KVS checkpoint/restart --- t/t2010-kvs-snapshot-restore.t | 52 ++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t index c97fffa7efb7..6f0959a32891 100755 --- a/t/t2010-kvs-snapshot-restore.t +++ b/t/t2010-kvs-snapshot-restore.t @@ -7,4 +7,56 @@ test_description='Test KVS snapshot/restore' test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile . `dirname $0`/sharness.sh +test_under_flux 1 + +CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint + +test_expect_success 'store kvs-checkpoint key-val pairs' ' + $CHECKPOINT put foo bar && + $CHECKPOINT put foo2 42 && + $CHECKPOINT put foo3 "x x x" +' + +test_expect_success 'verify kvs-checkpoint key-val pairs' ' + test "$($CHECKPOINT get foo)" = "bar" && + test "$($CHECKPOINT get foo2)" = "42" && + test "$($CHECKPOINT get foo3)" = "x x x" +' + +test_expect_success 'get unknown kvs-checkpoint key fails' ' + test_must_fail $CHECKPOINT get noexist +' + +test_expect_success 'put existing kvs-checkpoint key is allowed' ' + $CHECKPOINT put foo zzz +' + +test_expect_success 'kvs-checkpoint value was updated' ' + test $($CHECKPOINT get foo) = "zzz" +' + +test_expect_success 'empty kvs-checkpoint key is not allowed' ' + test_must_fail $CHECKPOINT put "" xyz +' + +test_expect_success 'run instance with content.backing-path set' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux kvs put testkey=42 +' + +test_expect_success 'content.sqlite file exists after instance exited' ' + test -f content.sqlite && + echo Size in bytes: $(stat --format "%s" content.sqlite) +' + +test_expect_success 're-run instance with content.backing-path set' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux kvs get testkey >get.out +' + +test_expect_success 'content from previous instance survived' ' + echo 42 >get.exp && + test_cmp get.exp get.out +' + test_done From 558d46f7bb8624b7655b540b10cfc220f5b2222d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 4 Mar 2020 06:56:29 -0800 Subject: [PATCH 8/8] doc/flux-broker-attributes(7): add backing-path Describe the new content.backing-path attribute. --- doc/man7/flux-broker-attributes.adoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/doc/man7/flux-broker-attributes.adoc b/doc/man7/flux-broker-attributes.adoc index a32c02b7c06d..c0fc0b5476ef 100644 --- a/doc/man7/flux-broker-attributes.adoc +++ b/doc/man7/flux-broker-attributes.adoc @@ -40,6 +40,13 @@ the session. By default broker.rundir is set to "${rundir}/${rank}", which guarantees a unique directory per rank. It is not advisable to override this attribute on the command line. Use rundir instead. +content.backing-path:: +The path to the content backing store file(s). If this is set on the +broker command line, the backing store uses this path instead of +a temporary one, and content is preserved on instance exit. +If file exists, its content is imported into the instance. +If it doesn't exist, it is created. + TOPOLOGY ATTRIBUTES ------------------- tbon.arity::