From f80b1a513481c21f53d2b194e33393bebaaa3d9b Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 15 Feb 2022 09:50:55 -0800 Subject: [PATCH] kvs: add date to kvs-primary checkpoint Problem: It'd be convenient if we knew the date when the kvs primary checkpoint was checkpointed. Solution: When checkpointing the primary KVS, store a json object with version, rootref, and timestamp, instead of just the rootref string. On retrieval, parse appropriately and retrieve timestamp for output in logs. Support the original checkpointing format by checking if the checkpoint object is a raw blobref string first. Fixes #3580 --- src/modules/kvs/kvs.c | 136 +++++++++++++++++++++++++++++---- t/t2010-kvs-snapshot-restore.t | 31 ++++++++ 2 files changed, 153 insertions(+), 14 deletions(-) diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index e79536c6eae5..3e75772a83e4 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2707,14 +2707,50 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av) } } +static int checkpoint_get_version0 (flux_t *h, const char *value, + char *buf, size_t len) +{ + if (strlen (value) >= len) { + errno = EINVAL; + return -1; + } + strcpy (buf, value); + return 0; +} + +static int checkpoint_get_version1 (flux_t *h, json_t *o, + char *buf, size_t len, + double *timestamp) +{ + const char *rootref = NULL; + + if (json_unpack (o, "{s:s s:f}", + "rootref", &rootref, + "timestamp", timestamp) < 0) { + errno = EINVAL; + return -1; + } + if (strlen (rootref) >= len) { + errno = EINVAL; + return -1; + } + strcpy (buf, rootref); + return 0; +} + /* Synchronously get string value by key from checkpoint service. * Copy value to buf with '\0' termination. * Return 0 on success, -1 on failure, */ -static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) +static int checkpoint_get (flux_t *h, const char *key, + char *buf, size_t len, + double *timestamp) { - flux_future_t *f; - const char *value; + flux_future_t *f = NULL; + const char *value = NULL; + json_t *o = NULL; + int version; + int rv = -1; if (!(f = flux_rpc_pack (h, "kvs-checkpoint.get", @@ -2726,24 +2762,79 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) return -1; if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0) goto error; - if (strlen (value) >= len) { + + /* if value is a blobref, its verison 0 checkpoint */ + if (blobref_validate (value) == 0) + return checkpoint_get_version0 (h, value, buf, len); + + if (!(o = json_loads (value, 0, NULL))) { errno = EINVAL; goto error; } - strcpy (buf, value); - flux_future_destroy (f); - return 0; + if (json_is_object (o)) { + if (json_unpack (o, "{s:i}", "version", &version) < 0) { + errno = EINVAL; + goto error; + } + /* only can handle version 1 right now */ + if (version == 1) { + if (checkpoint_get_version1 (h, o, buf, len, timestamp) < 0) + goto error; + } + else { + errno = EINVAL; + goto error; + } + } + else { + errno = EINVAL; + goto error; + } + rv = 0; error: flux_future_destroy (f); - return -1; + json_decref (o); + return rv; +} + +static int get_timestamp_now (double *timestamp) +{ + struct timespec ts; + if (clock_gettime (CLOCK_REALTIME, &ts) < 0) + return -1; + *timestamp = (1E-9 * ts.tv_nsec) + ts.tv_sec; + return 0; } /* Synchronously store key-value pair to checkpoint service. * Returns 0 on success, -1 on failure. */ -static int checkpoint_put (flux_t *h, const char *key, const char *value) +static int checkpoint_put (flux_t *h, const char *key, const char *rootref) { - flux_future_t *f; + flux_future_t *f = NULL; + double timestamp; + json_t *o = NULL; + char *value = NULL; + int rv = -1; + + if (get_timestamp_now (×tamp) < 0) + return -1; + /* version 0 checkpoint + * - blobref string only + * version 1 checkpoint object + * - {"version":1 "rootref":s "timestamp":f} + */ + if (!(o = json_pack ("{s:i s:s s:f}", + "version", 1, + "rootref", rootref, + "timestamp", timestamp))) { + errno = ENOMEM; + goto error; + } + if (!(value = json_dumps (o, JSON_COMPACT))) { + errno = ENOMEM; + goto error; + } if (!(f = flux_rpc_pack (h, "kvs-checkpoint.put", @@ -2757,10 +2848,14 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value) return -1; if (flux_rpc_get (f, NULL) < 0) { flux_future_destroy (f); - return -1; + goto error; } + rv = 0; +error: flux_future_destroy (f); - return 0; + json_decref (o); + free (value); + return rv; } /* Store initial root in local cache, and flush to content cache @@ -2850,13 +2945,26 @@ int mod_main (flux_t *h, int argc, char **argv) if (ctx->rank == 0) { struct kvsroot *root; char rootref[BLOBREF_MAX_STRING_SIZE]; + double timestamp = 0.; uint32_t owner = getuid (); /* Look for a checkpoint and use it if found. * Otherwise start the primary root namespace with an empty directory. */ - if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0) - flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint"); + if (checkpoint_get (h, "kvs-primary", + rootref, sizeof (rootref), ×tamp) == 0) { + char datestr[128]; + time_t sec = timestamp; + struct tm tm; + if (timestamp > 0.) { + gmtime_r (&sec, &tm); + strftime (datestr, sizeof (datestr), "%FT%T", &tm); + } + else + snprintf (datestr, sizeof (datestr), "N/A"); + flux_log (h, LOG_INFO, + "restored kvs-primary from checkpoint on %s", datestr); + } else { if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) { flux_log_error (h, "storing initial root object"); diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t index 6f0959a32891..95bbd6c92ce4 100755 --- a/t/t2010-kvs-snapshot-restore.t +++ b/t/t2010-kvs-snapshot-restore.t @@ -59,4 +59,35 @@ test_expect_success 'content from previous instance survived' ' test_cmp get.exp get.out ' +test_expect_success 're-run instance, verify checkpoint date saved' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg >dmesg1.out +' + +# just check for todays date, not time for obvious reasons +test_expect_success 'verify date in flux logs' ' + today=`date --iso-8601` && + grep checkpoint dmesg1.out | grep ${today} +' + +test_expect_success 're-run instance, get rootref' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux kvs getroot | jq .data[0] > getroot.out +' + +test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' ' + rootref=$(cat getroot.out) + /usr/bin/sqlite3 $(pwd)/content.sqlite \ + "REPLACE INTO checkpt (key,value) values (\"kvs-primary\", ${rootref})" +' + +test_expect_success 're-run instance, verify checkpoint correctly loaded' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg >dmesg2.out +' + +test_expect_success 'verify checkpoint loaded with no date' ' + grep checkpoint dmesg2.out | grep "N\/A" +' + test_done