diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index e79536c6eae5..080527909d8c 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av) } } +static int checkpoint_get_version0 (const char *value, char *buf, size_t len) +{ + /* if value is a blobref, its verison 0 checkpoint */ + if (blobref_validate (value) == 0) { + if (strlen (value) >= len) + return -1; + strcpy (buf, value); + return 0; + } + return -1; +} + +static int checkpoint_get_version1 (const char *value, + char *buf, size_t len, + double *timestamp) +{ + const char *rootref = NULL; + json_t *o = NULL; + int version; + int rv = -1; + + if (!(o = json_loads (value, 0, NULL))) + goto error; + + if (json_unpack (o, "{s:i}", "version", &version) < 0) + goto error; + /* only can handle version 1 right now */ + if (version == 1) { + if (json_unpack (o, "{s:s s:f}", + "rootref", &rootref, + "timestamp", timestamp) < 0) + goto error; + if (strlen (rootref) >= len) + goto error; + strcpy (buf, rootref); + } + else + goto error; + + rv = 0; +error: + json_decref (o); + return rv; +} + /* Synchronously get string value by key from checkpoint service. * Copy value to buf with '\0' termination. * Return 0 on success, -1 on failure, */ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) { - flux_future_t *f; - const char *value; + flux_future_t *f = NULL; + const char *value = NULL; + double timestamp = 0.; + char datestr[128]; + int rv = -1; if (!(f = flux_rpc_pack (h, "kvs-checkpoint.get", @@ -2726,24 +2774,59 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) return -1; if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0) goto error; - if (strlen (value) >= len) { + + if (checkpoint_get_version1 (value, buf, len, ×tamp) < 0 + && checkpoint_get_version0 (value, buf, len) < 0) { errno = EINVAL; goto error; } - strcpy (buf, value); - flux_future_destroy (f); - return 0; + + if (timestamp > 0.) { + time_t sec = timestamp; + struct tm tm; + gmtime_r (&sec, &tm); + strftime (datestr, sizeof (datestr), "%FT%T", &tm); + } + else + snprintf (datestr, sizeof (datestr), "N/A"); + + flux_log (h, LOG_INFO, + "restored kvs-primary from checkpoint on %s", datestr); + rv = 0; error: flux_future_destroy (f); - return -1; + return rv; } /* Synchronously store key-value pair to checkpoint service. * Returns 0 on success, -1 on failure. */ -static int checkpoint_put (flux_t *h, const char *key, const char *value) +static int checkpoint_put (flux_t *h, const char *key, const char *rootref) { - flux_future_t *f; + flux_future_t *f = NULL; + double timestamp; + json_t *o = NULL; + char *value = NULL; + int save_errno, rv = -1; + + timestamp = flux_reactor_now (flux_get_reactor (h)); + + /* version 0 checkpoint + * - blobref string only + * version 1 checkpoint object + * - {"version":1 "rootref":s "timestamp":f} + */ + if (!(o = json_pack ("{s:i s:s s:f}", + "version", 1, + "rootref", rootref, + "timestamp", timestamp))) { + errno = ENOMEM; + goto error; + } + if (!(value = json_dumps (o, JSON_COMPACT))) { + errno = ENOMEM; + goto error; + } if (!(f = flux_rpc_pack (h, "kvs-checkpoint.put", @@ -2754,13 +2837,17 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value) key, "value", value))) - return -1; - if (flux_rpc_get (f, NULL) < 0) { - flux_future_destroy (f); - return -1; - } + goto error; + if (flux_rpc_get (f, NULL) < 0) + goto error; + rv = 0; +error: + save_errno = errno; flux_future_destroy (f); - return 0; + json_decref (o); + free (value); + errno = save_errno; + return rv; } /* Store initial root in local cache, and flush to content cache @@ -2855,9 +2942,7 @@ int mod_main (flux_t *h, int argc, char **argv) /* Look for a checkpoint and use it if found. * Otherwise start the primary root namespace with an empty directory. */ - if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0) - flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint"); - else { + if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) < 0) { if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) { flux_log_error (h, "storing initial root object"); goto done; diff --git a/t/Makefile.am b/t/Makefile.am index 3c5c1cf2b383..e0e85d03d45c 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -294,6 +294,7 @@ dist_check_SCRIPTS = \ valgrind/valgrind-workload.sh \ valgrind/workload.d/job \ kvs/kvs-helper.sh \ + kvs/change-checkpoint.py \ job-manager/exec-service.lua \ job-manager/drain-cancel.py \ job-manager/bulk-state.py \ diff --git a/t/kvs/change-checkpoint.py b/t/kvs/change-checkpoint.py new file mode 100755 index 000000000000..275b16397e3a --- /dev/null +++ b/t/kvs/change-checkpoint.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +import sys +import sqlite3 + +if len(sys.argv) < 4: + print("change-checkpoint.py ") + sys.exit(1) + +conn = sqlite3.connect(sys.argv[1]) +cursor = conn.cursor() +s = ( + 'REPLACE INTO checkpt (key,value) values ("' + + sys.argv[2] + + '", "' + + sys.argv[3] + + '")' +) +cursor.execute(s) +conn.commit() +sys.exit(0) diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t index 6f0959a32891..8ae1da220501 100755 --- a/t/t2010-kvs-snapshot-restore.t +++ b/t/t2010-kvs-snapshot-restore.t @@ -10,6 +10,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile test_under_flux 1 CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint +CHANGECHECKPOINT=${FLUX_SOURCE_DIR}/t/kvs/change-checkpoint.py test_expect_success 'store kvs-checkpoint key-val pairs' ' $CHECKPOINT put foo bar && @@ -59,4 +60,34 @@ test_expect_success 'content from previous instance survived' ' test_cmp get.exp get.out ' +test_expect_success 're-run instance, verify checkpoint date saved' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg >dmesg1.out +' + +# just check for todays date, not time for obvious reasons +test_expect_success 'verify date in flux logs' ' + today=`date --iso-8601` && + grep checkpoint dmesg1.out | grep ${today} +' + +test_expect_success 're-run instance, get rootref' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux kvs getroot -b > getroot.out +' + +test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' ' + rootref=$(cat getroot.out) && + ${CHANGECHECKPOINT} $(pwd)/content.sqlite "kvs-primary" ${rootref} +' + +test_expect_success 're-run instance, verify checkpoint correctly loaded' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg >dmesg2.out +' + +test_expect_success 'verify checkpoint loaded with no date' ' + grep checkpoint dmesg2.out | grep "N\/A" +' + test_done