diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c
index e79536c6eae5..3e75772a83e4 100644
--- a/src/modules/kvs/kvs.c
+++ b/src/modules/kvs/kvs.c
@@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
     }
 }
 
-/* Synchronously get string value by key from checkpoint service.
- * Copy value to buf with '\0' termination.
- * Return 0 on success, -1 on failure,
- */
-static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
+/* Version 0 checkpoint: value is the bare blobref string.
+ * Copy it to buf with '\0' termination.
+ * Return 0 on success, -1 with errno set to EINVAL if value does not
+ * fit in len bytes.
+ */
+static int checkpoint_get_version0 (flux_t *h, const char *value,
+                                    char *buf, size_t len)
+{
+    if (strlen (value) >= len) {
+        errno = EINVAL;
+        return -1;
+    }
+    strcpy (buf, value);
+    return 0;
+}
+
+/* Version 1 checkpoint: value is a JSON object holding "rootref" and
+ * "timestamp".  Copy rootref to buf with '\0' termination and store
+ * the timestamp in *timestamp.
+ * Return 0 on success, -1 with errno set to EINVAL if a field cannot
+ * be unpacked or rootref does not fit in len bytes.
+ */
+static int checkpoint_get_version1 (flux_t *h, json_t *o,
+                                    char *buf, size_t len,
+                                    double *timestamp)
+{
+    const char *rootref = NULL;
+
+    if (json_unpack (o, "{s:s s:f}",
+                     "rootref", &rootref,
+                     "timestamp", timestamp) < 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    if (strlen (rootref) >= len) {
+        errno = EINVAL;
+        return -1;
+    }
+    strcpy (buf, rootref);
+    return 0;
+}
+
+/* Synchronously get the checkpointed rootref by key from checkpoint service.
+ * Copy rootref to buf with '\0' termination.  A version 1 checkpoint also
+ * sets *timestamp; a version 0 checkpoint leaves *timestamp unmodified.
+ * Return 0 on success, -1 on failure.
+ */
+static int checkpoint_get (flux_t *h, const char *key,
+                           char *buf, size_t len,
+                           double *timestamp)
 {
-    flux_future_t *f;
-    const char *value;
+    flux_future_t *f = NULL;
+    const char *value = NULL;
+    json_t *o = NULL;
+    int version;
+    int rv = -1;
 
     if (!(f = flux_rpc_pack (h,
                              "kvs-checkpoint.get",
@@ -2726,24 +2774,88 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
         return -1;
     if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0)
         goto error;
-    if (strlen (value) >= len) {
+
+    /* If value is a blobref, it's a version 0 checkpoint.  Leave through
+     * the common exit path so the future is destroyed (a direct return
+     * here would leak it).
+     */
+    if (blobref_validate (value) == 0) {
+        rv = checkpoint_get_version0 (h, value, buf, len);
+        goto error;
+    }
+
+    if (!(o = json_loads (value, 0, NULL))) {
         errno = EINVAL;
         goto error;
     }
-    strcpy (buf, value);
-    flux_future_destroy (f);
-    return 0;
+    if (json_is_object (o)) {
+        if (json_unpack (o, "{s:i}", "version", &version) < 0) {
+            errno = EINVAL;
+            goto error;
+        }
+        /* only can handle version 1 right now */
+        if (version == 1) {
+            if (checkpoint_get_version1 (h, o, buf, len, timestamp) < 0)
+                goto error;
+        }
+        else {
+            errno = EINVAL;
+            goto error;
+        }
+    }
+    else {
+        errno = EINVAL;
+        goto error;
+    }
+    rv = 0;
 error:
     flux_future_destroy (f);
-    return -1;
+    json_decref (o);
+    return rv;
+}
+
+/* Store the current wall clock time (CLOCK_REALTIME) in *timestamp as
+ * seconds since the epoch, with the nanoseconds as the fractional part.
+ * Return 0 on success, -1 if clock_gettime fails.
+ */
+static int get_timestamp_now (double *timestamp)
+{
+    struct timespec ts;
+    if (clock_gettime (CLOCK_REALTIME, &ts) < 0)
+        return -1;
+    *timestamp = (1E-9 * ts.tv_nsec) + ts.tv_sec;
+    return 0;
 }
 
 /* Synchronously store key-value pair to checkpoint service.
  * Returns 0 on success, -1 on failure.
  */
-static int checkpoint_put (flux_t *h, const char *key, const char *value)
+static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
 {
-    flux_future_t *f;
+    flux_future_t *f = NULL;
+    double timestamp;
+    json_t *o = NULL;
+    char *value = NULL;
+    int rv = -1;
+
+    if (get_timestamp_now (&timestamp) < 0)
+        return -1;
+    /* version 0 checkpoint
+     * - blobref string only
+     * version 1 checkpoint object
+     * - {"version":1 "rootref":s "timestamp":f}
+     */
+    if (!(o = json_pack ("{s:i s:s s:f}",
+                         "version", 1,
+                         "rootref", rootref,
+                         "timestamp", timestamp))) {
+        errno = ENOMEM;
+        goto error;
+    }
+    if (!(value = json_dumps (o, JSON_COMPACT))) {
+        errno = ENOMEM;
+        goto error;
+    }
 
     if (!(f = flux_rpc_pack (h,
                              "kvs-checkpoint.put",
@@ -2757,10 +2869,15 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value)
-        return -1;
-    if (flux_rpc_get (f, NULL) < 0) {
-        flux_future_destroy (f);
-        return -1;
-    }
+        goto error;
+    if (flux_rpc_get (f, NULL) < 0)
+        goto error;
+    rv = 0;
+    /* Common exit: f, o and value are each released exactly once here,
+     * whether they were created or are still NULL.
+     */
+error:
     flux_future_destroy (f);
-    return 0;
+    json_decref (o);
+    free (value);
+    return rv;
 }
 
 /* Store initial root in local cache, and flush to content cache
@@ -2850,13 +2967,27 @@ int mod_main (flux_t *h, int argc, char **argv)
     if (ctx->rank == 0) {
         struct kvsroot *root;
         char rootref[BLOBREF_MAX_STRING_SIZE];
+        double timestamp = 0.;
         uint32_t owner = getuid ();
 
         /* Look for a checkpoint and use it if found.
          * Otherwise start the primary root namespace with an empty directory.
          */
-        if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0)
-            flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint");
+        if (checkpoint_get (h, "kvs-primary",
+                            rootref, sizeof (rootref), &timestamp) == 0) {
+            char datestr[128];
+            time_t sec = timestamp;
+            struct tm tm;
+            /* A version 0 checkpoint has no date, so timestamp keeps its
+             * initial 0. and "N/A" is logged.  The date is reported in UTC.
+             */
+            if (timestamp <= 0.
+                || !gmtime_r (&sec, &tm)
+                || strftime (datestr, sizeof (datestr), "%FT%T", &tm) == 0)
+                snprintf (datestr, sizeof (datestr), "N/A");
+            flux_log (h, LOG_INFO,
+                      "restored kvs-primary from checkpoint on %s", datestr);
+        }
         else {
             if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) {
                 flux_log_error (h, "storing initial root object");
diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t
index 6f0959a32891..95bbd6c92ce4 100755
--- a/t/t2010-kvs-snapshot-restore.t
+++ b/t/t2010-kvs-snapshot-restore.t
@@ -59,4 +59,35 @@ test_expect_success 'content from previous instance survived' '
 	test_cmp get.exp get.out
 '
 
+test_expect_success 're-run instance, verify checkpoint date saved' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+		flux dmesg >dmesg1.out
+'
+
+# just check for today's date (in UTC, like the log message), not the time
+test_expect_success 'verify date in flux logs' '
+	today=$(date -u +%Y-%m-%d) &&
+	grep checkpoint dmesg1.out | grep ${today}
+'
+
+test_expect_success 're-run instance, get rootref' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+		flux kvs getroot | jq .data[0] > getroot.out
+'
+
+test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' '
+	rootref=$(cat getroot.out) &&
+	/usr/bin/sqlite3 $(pwd)/content.sqlite \
+		"REPLACE INTO checkpt (key,value) values (\"kvs-primary\", ${rootref})"
+'
+
+test_expect_success 're-run instance, verify checkpoint correctly loaded' '
+	flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
+		flux dmesg >dmesg2.out
+'
+
+test_expect_success 'verify checkpoint loaded with no date' '
+	grep checkpoint dmesg2.out | grep "N\/A"
+'
+
 test_done