Skip to content

Commit

Permalink
Merge pull request #4136 from chu11/issue3580_kvs_checkpoint_date
Browse files Browse the repository at this point in the history
kvs: add date to kvs-primary checkpoint
  • Loading branch information
mergify[bot] authored Feb 18, 2022
2 parents db2c75e + e1b0450 commit af925ba
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 18 deletions.
121 changes: 103 additions & 18 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
}
}

static int checkpoint_get_version0 (const char *value, char *buf, size_t len)
{
/* if value is a blobref, its verison 0 checkpoint */
if (blobref_validate (value) == 0) {
if (strlen (value) >= len)
return -1;
strcpy (buf, value);
return 0;
}
return -1;
}

static int checkpoint_get_version1 (const char *value,
char *buf, size_t len,
double *timestamp)
{
const char *rootref = NULL;
json_t *o = NULL;
int version;
int rv = -1;

if (!(o = json_loads (value, 0, NULL)))
goto error;

if (json_unpack (o, "{s:i}", "version", &version) < 0)
goto error;
/* only can handle version 1 right now */
if (version == 1) {
if (json_unpack (o, "{s:s s:f}",
"rootref", &rootref,
"timestamp", timestamp) < 0)
goto error;
if (strlen (rootref) >= len)
goto error;
strcpy (buf, rootref);
}
else
goto error;

rv = 0;
error:
json_decref (o);
return rv;
}

/* Synchronously get string value by key from checkpoint service.
* Copy value to buf with '\0' termination.
* Return 0 on success, -1 on failure,
*/
static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
{
flux_future_t *f;
const char *value;
flux_future_t *f = NULL;
const char *value = NULL;
double timestamp = 0.;
char datestr[128];
int rv = -1;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.get",
Expand All @@ -2726,24 +2774,59 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
return -1;
if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0)
goto error;
if (strlen (value) >= len) {

if (checkpoint_get_version1 (value, buf, len, &timestamp) < 0
&& checkpoint_get_version0 (value, buf, len) < 0) {
errno = EINVAL;
goto error;
}
strcpy (buf, value);
flux_future_destroy (f);
return 0;

if (timestamp > 0.) {
time_t sec = timestamp;
struct tm tm;
gmtime_r (&sec, &tm);
strftime (datestr, sizeof (datestr), "%FT%T", &tm);
}
else
snprintf (datestr, sizeof (datestr), "N/A");

flux_log (h, LOG_INFO,
"restored kvs-primary from checkpoint on %s", datestr);
rv = 0;
error:
flux_future_destroy (f);
return -1;
return rv;
}

/* Synchronously store key-value pair to checkpoint service.
* Returns 0 on success, -1 on failure.
*/
static int checkpoint_put (flux_t *h, const char *key, const char *value)
static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
{
flux_future_t *f;
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
char *value = NULL;
int save_errno, rv = -1;

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = ENOMEM;
goto error;
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
Expand All @@ -2754,13 +2837,17 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value)
key,
"value",
value)))
return -1;
if (flux_rpc_get (f, NULL) < 0) {
flux_future_destroy (f);
return -1;
}
goto error;
if (flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
error:
save_errno = errno;
flux_future_destroy (f);
return 0;
json_decref (o);
free (value);
errno = save_errno;
return rv;
}

/* Store initial root in local cache, and flush to content cache
Expand Down Expand Up @@ -2855,9 +2942,7 @@ int mod_main (flux_t *h, int argc, char **argv)
/* Look for a checkpoint and use it if found.
* Otherwise start the primary root namespace with an empty directory.
*/
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0)
flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint");
else {
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) < 0) {
if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) {
flux_log_error (h, "storing initial root object");
goto done;
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ dist_check_SCRIPTS = \
valgrind/valgrind-workload.sh \
valgrind/workload.d/job \
kvs/kvs-helper.sh \
kvs/change-checkpoint.py \
job-manager/exec-service.lua \
job-manager/drain-cancel.py \
job-manager/bulk-state.py \
Expand Down
21 changes: 21 additions & 0 deletions t/kvs/change-checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3

import sys
import sqlite3

if len(sys.argv) < 4:
print("change-checkpoint.py <file> <key> <value>")
sys.exit(1)

conn = sqlite3.connect(sys.argv[1])
cursor = conn.cursor()
s = (
'REPLACE INTO checkpt (key,value) values ("'
+ sys.argv[2]
+ '", "'
+ sys.argv[3]
+ '")'
)
cursor.execute(s)
conn.commit()
sys.exit(0)
31 changes: 31 additions & 0 deletions t/t2010-kvs-snapshot-restore.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
test_under_flux 1

CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint
CHANGECHECKPOINT=${FLUX_SOURCE_DIR}/t/kvs/change-checkpoint.py

test_expect_success 'store kvs-checkpoint key-val pairs' '
$CHECKPOINT put foo bar &&
Expand Down Expand Up @@ -59,4 +60,34 @@ test_expect_success 'content from previous instance survived' '
test_cmp get.exp get.out
'

test_expect_success 're-run instance, verify checkpoint date saved' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg1.out
'

# just check for todays date, not time for obvious reasons
test_expect_success 'verify date in flux logs' '
today=`date --iso-8601` &&
grep checkpoint dmesg1.out | grep ${today}
'

test_expect_success 're-run instance, get rootref' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux kvs getroot -b > getroot.out
'

test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' '
rootref=$(cat getroot.out) &&
${CHANGECHECKPOINT} $(pwd)/content.sqlite "kvs-primary" ${rootref}
'

test_expect_success 're-run instance, verify checkpoint correctly loaded' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg2.out
'

test_expect_success 'verify checkpoint loaded with no date' '
grep checkpoint dmesg2.out | grep "N\/A"
'

test_done

0 comments on commit af925ba

Please sign in to comment.