From e1b0450c44e8f89a56ff76bfc13e3ac38dc552d0 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 15 Feb 2022 09:50:55 -0800 Subject: [PATCH] kvs: add date to kvs-primary checkpoint Problem: It'd be convenient if we knew the date when the primary namespace was checkpointed. Solution: When checkpointing the primary namespace, store a json object with version, rootref, and timestamp, instead of just the rootref string. On retrieval, parse appropriately and retrieve timestamp for output in logs. Support the original checkpointing format by falling back to treating the checkpoint value as a raw blobref string when it does not parse as a version 1 object. Fixes #3580 --- src/modules/kvs/kvs.c | 121 ++++++++++++++++++++++++++++----- t/Makefile.am | 1 + t/kvs/change-checkpoint.py | 21 ++++++ t/t2010-kvs-snapshot-restore.t | 31 +++++++++ 4 files changed, 156 insertions(+), 18 deletions(-) create mode 100755 t/kvs/change-checkpoint.py diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index e79536c6eae5..080527909d8c 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av) } } +static int checkpoint_get_version0 (const char *value, char *buf, size_t len) +{ + /* if value is a blobref, it is a version 0 checkpoint */ + if (blobref_validate (value) == 0) { + if (strlen (value) >= len) + return -1; + strcpy (buf, value); + return 0; + } + return -1; +} + +static int checkpoint_get_version1 (const char *value, + char *buf, size_t len, + double *timestamp) +{ + const char *rootref = NULL; + json_t *o = NULL; + int version; + int rv = -1; + + if (!(o = json_loads (value, 0, NULL))) + goto error; + + if (json_unpack (o, "{s:i}", "version", &version) < 0) + goto error; + /* can only handle version 1 right now */ + if (version == 1) { + if (json_unpack (o, "{s:s s:f}", + "rootref", &rootref, + "timestamp", timestamp) < 0) + goto error; + if (strlen (rootref) >= len) + goto error; + strcpy (buf, rootref); + } + else + goto error; + + 
rv = 0; +error: + json_decref (o); + return rv; +} + /* Synchronously get string value by key from checkpoint service. * Copy value to buf with '\0' termination. * Return 0 on success, -1 on failure, */ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) { - flux_future_t *f; - const char *value; + flux_future_t *f = NULL; + const char *value = NULL; + double timestamp = 0.; + char datestr[128]; + int rv = -1; if (!(f = flux_rpc_pack (h, "kvs-checkpoint.get", @@ -2726,24 +2774,59 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len) return -1; if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0) goto error; - if (strlen (value) >= len) { + + if (checkpoint_get_version1 (value, buf, len, &timestamp) < 0 + && checkpoint_get_version0 (value, buf, len) < 0) { errno = EINVAL; goto error; } - strcpy (buf, value); - flux_future_destroy (f); - return 0; + + if (timestamp > 0.) { + time_t sec = timestamp; + struct tm tm; + gmtime_r (&sec, &tm); + strftime (datestr, sizeof (datestr), "%FT%T", &tm); + } + else + snprintf (datestr, sizeof (datestr), "N/A"); + + flux_log (h, LOG_INFO, + "restored kvs-primary from checkpoint on %s", datestr); + rv = 0; error: flux_future_destroy (f); - return -1; + return rv; } /* Synchronously store key-value pair to checkpoint service. * Returns 0 on success, -1 on failure. 
*/ -static int checkpoint_put (flux_t *h, const char *key, const char *value) +static int checkpoint_put (flux_t *h, const char *key, const char *rootref) { - flux_future_t *f; + flux_future_t *f = NULL; + double timestamp; + json_t *o = NULL; + char *value = NULL; + int save_errno, rv = -1; + + timestamp = flux_reactor_now (flux_get_reactor (h)); + + /* version 0 checkpoint + * - blobref string only + * version 1 checkpoint object + * - {"version":1 "rootref":s "timestamp":f} + */ + if (!(o = json_pack ("{s:i s:s s:f}", + "version", 1, + "rootref", rootref, + "timestamp", timestamp))) { + errno = ENOMEM; + goto error; + } + if (!(value = json_dumps (o, JSON_COMPACT))) { + errno = ENOMEM; + goto error; + } if (!(f = flux_rpc_pack (h, "kvs-checkpoint.put", @@ -2754,13 +2837,17 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value) key, "value", value))) - return -1; - if (flux_rpc_get (f, NULL) < 0) { - flux_future_destroy (f); - return -1; - } + goto error; + if (flux_rpc_get (f, NULL) < 0) + goto error; + rv = 0; +error: + save_errno = errno; flux_future_destroy (f); - return 0; + json_decref (o); + free (value); + errno = save_errno; + return rv; } /* Store initial root in local cache, and flush to content cache @@ -2855,9 +2942,7 @@ int mod_main (flux_t *h, int argc, char **argv) /* Look for a checkpoint and use it if found. * Otherwise start the primary root namespace with an empty directory. 
*/ - if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0) - flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint"); - else { + if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) < 0) { if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) { flux_log_error (h, "storing initial root object"); goto done; diff --git a/t/Makefile.am b/t/Makefile.am index 3c5c1cf2b383..e0e85d03d45c 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -294,6 +294,7 @@ dist_check_SCRIPTS = \ valgrind/valgrind-workload.sh \ valgrind/workload.d/job \ kvs/kvs-helper.sh \ + kvs/change-checkpoint.py \ job-manager/exec-service.lua \ job-manager/drain-cancel.py \ job-manager/bulk-state.py \ diff --git a/t/kvs/change-checkpoint.py b/t/kvs/change-checkpoint.py new file mode 100755 index 000000000000..275b16397e3a --- /dev/null +++ b/t/kvs/change-checkpoint.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 + +import sys +import sqlite3 + +if len(sys.argv) < 4: + print("change-checkpoint.py <file> <key> <value>") + sys.exit(1) + +conn = sqlite3.connect(sys.argv[1]) +cursor = conn.cursor() +s = ( + 'REPLACE INTO checkpt (key,value) values ("' + + sys.argv[2] + + '", "' + + sys.argv[3] + + '")' +) +cursor.execute(s) +conn.commit() +sys.exit(0) diff --git a/t/t2010-kvs-snapshot-restore.t b/t/t2010-kvs-snapshot-restore.t index 6f0959a32891..8ae1da220501 100755 --- a/t/t2010-kvs-snapshot-restore.t +++ b/t/t2010-kvs-snapshot-restore.t @@ -10,6 +10,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile test_under_flux 1 CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint +CHANGECHECKPOINT=${FLUX_SOURCE_DIR}/t/kvs/change-checkpoint.py test_expect_success 'store kvs-checkpoint key-val pairs' ' $CHECKPOINT put foo bar && @@ -59,4 +60,34 @@ test_expect_success 'content from previous instance survived' ' test_cmp get.exp get.out ' +test_expect_success 're-run instance, verify checkpoint date saved' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg 
>dmesg1.out +' + +# just check for today's date, not time for obvious reasons +test_expect_success 'verify date in flux logs' ' + today=`date --iso-8601` && + grep checkpoint dmesg1.out | grep ${today} +' + +test_expect_success 're-run instance, get rootref' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux kvs getroot -b > getroot.out +' + +test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' ' + rootref=$(cat getroot.out) && + ${CHANGECHECKPOINT} $(pwd)/content.sqlite "kvs-primary" ${rootref} +' + +test_expect_success 're-run instance, verify checkpoint correctly loaded' ' + flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \ + flux dmesg >dmesg2.out +' + +test_expect_success 'verify checkpoint loaded with no date' ' + grep checkpoint dmesg2.out | grep "N\/A" +' + test_done