Skip to content

Commit

Permalink
kvs: add date to kvs-primary checkpoint
Browse files Browse the repository at this point in the history
Problem: It'd be convenient if we knew the date when the
primary namespace was checkpointed.

Solution: When checkpointing the primary namespace, store a json
object with version, rootref, and timestamp, instead of just
the rootref string.  On retrieval, parse appropriately and
retrieve timestamp for output in logs.  Support the original
checkpointing format by checking if the checkpoint object
is a raw blobref string first.

Fixes flux-framework#3580
  • Loading branch information
chu11 committed Feb 17, 2022
1 parent db2c75e commit 48d89cd
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 18 deletions.
119 changes: 101 additions & 18 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
}
}

static int checkpoint_get_version0 (const char *value, char *buf, size_t len)
{
/* if value is a blobref, its verison 0 checkpoint */
if (blobref_validate (value) == 0) {
if (strlen (value) >= len)
return -1;
strcpy (buf, value);
return 0;
}
return -1;
}

static int checkpoint_get_version1 (const char *value,
char *buf, size_t len,
double *timestamp)
{
const char *rootref = NULL;
json_t *o = NULL;
int version;
int rv = -1;

if (!(o = json_loads (value, 0, NULL)))
goto error;

if (json_unpack (o, "{s:i}", "version", &version) < 0)
goto error;
/* only can handle version 1 right now */
if (version == 1) {
if (json_unpack (o, "{s:s s:f}",
"rootref", &rootref,
"timestamp", timestamp) < 0)
goto error;
if (strlen (rootref) >= len)
goto error;
strcpy (buf, rootref);
}
else
goto error;

rv = 0;
error:
json_decref (o);
return rv;
}

/* Synchronously get string value by key from checkpoint service.
* Copy value to buf with '\0' termination.
* Return 0 on success, -1 on failure,
*/
static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
{
flux_future_t *f;
const char *value;
flux_future_t *f = NULL;
const char *value = NULL;
double timestamp = 0.;
char datestr[128];
int rv = -1;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.get",
Expand All @@ -2726,24 +2774,59 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
return -1;
if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0)
goto error;
if (strlen (value) >= len) {

if (checkpoint_get_version1 (value, buf, len, &timestamp) < 0
&& checkpoint_get_version0 (value, buf, len) < 0) {
errno = EINVAL;
goto error;
}
strcpy (buf, value);
flux_future_destroy (f);
return 0;

if (timestamp > 0.) {
time_t sec = timestamp;
struct tm tm;
gmtime_r (&sec, &tm);
strftime (datestr, sizeof (datestr), "%FT%T", &tm);
}
else
snprintf (datestr, sizeof (datestr), "N/A");

flux_log (h, LOG_INFO,
"restored kvs-primary from checkpoint on %s", datestr);
rv = 0;
error:
flux_future_destroy (f);
return -1;
return rv;
}

/* Synchronously store key-value pair to checkpoint service.
* Returns 0 on success, -1 on failure.
*/
static int checkpoint_put (flux_t *h, const char *key, const char *value)
static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
{
flux_future_t *f;
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
char *value = NULL;
int rv = -1;

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = ENOMEM;
goto error;
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
Expand All @@ -2754,13 +2837,15 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value)
key,
"value",
value)))
return -1;
if (flux_rpc_get (f, NULL) < 0) {
flux_future_destroy (f);
return -1;
}
goto error;
if (flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
error:
flux_future_destroy (f);
return 0;
json_decref (o);
free (value);
return rv;
}

/* Store initial root in local cache, and flush to content cache
Expand Down Expand Up @@ -2855,9 +2940,7 @@ int mod_main (flux_t *h, int argc, char **argv)
/* Look for a checkpoint and use it if found.
* Otherwise start the primary root namespace with an empty directory.
*/
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0)
flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint");
else {
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) < 0) {
if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) {
flux_log_error (h, "storing initial root object");
goto done;
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ dist_check_SCRIPTS = \
valgrind/valgrind-workload.sh \
valgrind/workload.d/job \
kvs/kvs-helper.sh \
kvs/change-checkpoint.py \
job-manager/exec-service.lua \
job-manager/drain-cancel.py \
job-manager/bulk-state.py \
Expand Down
21 changes: 21 additions & 0 deletions t/kvs/change-checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3

import sys
import sqlite3

if len(sys.argv) < 4:
print("change-checkpoint.py <file> <key> <value>")
sys.exit(1)

conn = sqlite3.connect(sys.argv[1])
cursor = conn.cursor()
s = (
'REPLACE INTO checkpt (key,value) values ("'
+ sys.argv[2]
+ '", "'
+ sys.argv[3]
+ '")'
)
cursor.execute(s)
conn.commit()
sys.exit(0)
31 changes: 31 additions & 0 deletions t/t2010-kvs-snapshot-restore.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
test_under_flux 1

CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint
CHANGECHECKPOINT=${FLUX_SOURCE_DIR}/t/kvs/change-checkpoint.py

test_expect_success 'store kvs-checkpoint key-val pairs' '
$CHECKPOINT put foo bar &&
Expand Down Expand Up @@ -59,4 +60,34 @@ test_expect_success 'content from previous instance survived' '
test_cmp get.exp get.out
'

test_expect_success 're-run instance, verify checkpoint date saved' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg1.out
'

# just check for todays date, not time for obvious reasons
test_expect_success 'verify date in flux logs' '
today=`date --iso-8601` &&
grep checkpoint dmesg1.out | grep ${today}
'

test_expect_success 're-run instance, get rootref' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux kvs getroot -b > getroot.out
'

test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' '
rootref=$(cat getroot.out) &&
${CHANGECHECKPOINT} $(pwd)/content.sqlite "kvs-primary" ${rootref}
'

test_expect_success 're-run instance, verify checkpoint correctly loaded' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg2.out
'

test_expect_success 'verify checkpoint loaded with no date' '
grep checkpoint dmesg2.out | grep "N\/A"
'

test_done

0 comments on commit 48d89cd

Please sign in to comment.