Skip to content

Commit

Permalink
kvs: add date to kvs-primary checkpoint
Browse files Browse the repository at this point in the history
Problem: It'd be convenient if we knew the date when the kvs
primary checkpoint was checkpointed.

Solution: When checkpointing the primary KVS, store a json
object with version, rootref, and timestamp, instead of just
the rootref string.  On retrieval, parse appropriately and
retrieve timestamp for output in logs.  Support the original
checkpointing format by checking if the checkpoint object
is a raw blobref string first.

Fixes flux-framework#3580
  • Loading branch information
chu11 committed Feb 16, 2022
1 parent db2c75e commit f80b1a5
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 14 deletions.
136 changes: 122 additions & 14 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2707,14 +2707,50 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
}
}

static int checkpoint_get_version0 (flux_t *h, const char *value,
char *buf, size_t len)
{
if (strlen (value) >= len) {
errno = EINVAL;
return -1;
}
strcpy (buf, value);
return 0;
}

static int checkpoint_get_version1 (flux_t *h, json_t *o,
char *buf, size_t len,
double *timestamp)
{
const char *rootref = NULL;

if (json_unpack (o, "{s:s s:f}",
"rootref", &rootref,
"timestamp", timestamp) < 0) {
errno = EINVAL;
return -1;
}
if (strlen (rootref) >= len) {
errno = EINVAL;
return -1;
}
strcpy (buf, rootref);
return 0;
}

/* Synchronously get string value by key from checkpoint service.
* Copy value to buf with '\0' termination.
* Return 0 on success, -1 on failure,
*/
static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
static int checkpoint_get (flux_t *h, const char *key,
char *buf, size_t len,
double *timestamp)
{
flux_future_t *f;
const char *value;
flux_future_t *f = NULL;
const char *value = NULL;
json_t *o = NULL;
int version;
int rv = -1;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.get",
Expand All @@ -2726,24 +2762,79 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
return -1;
if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0)
goto error;
if (strlen (value) >= len) {

/* if value is a blobref, its verison 0 checkpoint */
if (blobref_validate (value) == 0)
return checkpoint_get_version0 (h, value, buf, len);

if (!(o = json_loads (value, 0, NULL))) {
errno = EINVAL;
goto error;
}
strcpy (buf, value);
flux_future_destroy (f);
return 0;
if (json_is_object (o)) {
if (json_unpack (o, "{s:i}", "version", &version) < 0) {
errno = EINVAL;
goto error;
}
/* only can handle version 1 right now */
if (version == 1) {
if (checkpoint_get_version1 (h, o, buf, len, timestamp) < 0)
goto error;
}
else {
errno = EINVAL;
goto error;
}
}
else {
errno = EINVAL;
goto error;
}
rv = 0;
error:
flux_future_destroy (f);
return -1;
json_decref (o);
return rv;
}

static int get_timestamp_now (double *timestamp)
{
struct timespec ts;
if (clock_gettime (CLOCK_REALTIME, &ts) < 0)
return -1;
*timestamp = (1E-9 * ts.tv_nsec) + ts.tv_sec;
return 0;
}

/* Synchronously store key-value pair to checkpoint service.
* Returns 0 on success, -1 on failure.
*/
static int checkpoint_put (flux_t *h, const char *key, const char *value)
static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
{
flux_future_t *f;
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
char *value = NULL;
int rv = -1;

if (get_timestamp_now (&timestamp) < 0)
return -1;
/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = ENOMEM;
goto error;
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
Expand All @@ -2757,10 +2848,14 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value)
return -1;
if (flux_rpc_get (f, NULL) < 0) {
flux_future_destroy (f);
return -1;
goto error;
}
rv = 0;
error:
flux_future_destroy (f);
return 0;
json_decref (o);
free (value);
return rv;
}

/* Store initial root in local cache, and flush to content cache
Expand Down Expand Up @@ -2850,13 +2945,26 @@ int mod_main (flux_t *h, int argc, char **argv)
if (ctx->rank == 0) {
struct kvsroot *root;
char rootref[BLOBREF_MAX_STRING_SIZE];
double timestamp = 0.;
uint32_t owner = getuid ();

/* Look for a checkpoint and use it if found.
* Otherwise start the primary root namespace with an empty directory.
*/
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0)
flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint");
if (checkpoint_get (h, "kvs-primary",
rootref, sizeof (rootref), &timestamp) == 0) {
char datestr[128];
time_t sec = timestamp;
struct tm tm;
if (timestamp > 0.) {
gmtime_r (&sec, &tm);
strftime (datestr, sizeof (datestr), "%FT%T", &tm);
}
else
snprintf (datestr, sizeof (datestr), "N/A");
flux_log (h, LOG_INFO,
"restored kvs-primary from checkpoint on %s", datestr);
}
else {
if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) {
flux_log_error (h, "storing initial root object");
Expand Down
31 changes: 31 additions & 0 deletions t/t2010-kvs-snapshot-restore.t
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,35 @@ test_expect_success 'content from previous instance survived' '
test_cmp get.exp get.out
'

test_expect_success 're-run instance, verify checkpoint date saved' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg1.out
'

# just check for todays date, not time for obvious reasons
test_expect_success 'verify date in flux logs' '
today=`date --iso-8601` &&
grep checkpoint dmesg1.out | grep ${today}
'

test_expect_success 're-run instance, get rootref' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux kvs getroot | jq .data[0] > getroot.out
'

test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' '
rootref=$(cat getroot.out)
/usr/bin/sqlite3 $(pwd)/content.sqlite \
"REPLACE INTO checkpt (key,value) values (\"kvs-primary\", ${rootref})"
'

test_expect_success 're-run instance, verify checkpoint correctly loaded' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg2.out
'

test_expect_success 'verify checkpoint loaded with no date' '
grep checkpoint dmesg2.out | grep "N\/A"
'

test_done

0 comments on commit f80b1a5

Please sign in to comment.