Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: add date to kvs-primary checkpoint #4136

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 103 additions & 18 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2707,14 +2707,62 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit message for f80b1a5:

Maybe should read "...when the primary namespace was checkpointed."

and "When checkpointing the primary namespace..."

}

static int checkpoint_get_version0 (const char *value, char *buf, size_t len)
{
/* if value is a blobref, its verison 0 checkpoint */
if (blobref_validate (value) == 0) {
if (strlen (value) >= len)
return -1;
strcpy (buf, value);
return 0;
}
return -1;
}

static int checkpoint_get_version1 (const char *value,
char *buf, size_t len,
double *timestamp)
{
const char *rootref = NULL;
json_t *o = NULL;
int version;
int rv = -1;

if (!(o = json_loads (value, 0, NULL)))
goto error;

if (json_unpack (o, "{s:i}", "version", &version) < 0)
goto error;
/* only can handle version 1 right now */
if (version == 1) {
if (json_unpack (o, "{s:s s:f}",
"rootref", &rootref,
"timestamp", timestamp) < 0)
goto error;
if (strlen (rootref) >= len)
goto error;
strcpy (buf, rootref);
}
else
goto error;

rv = 0;
error:
json_decref (o);
return rv;
}

/* Synchronously get string value by key from checkpoint service.
* Copy value to buf with '\0' termination.
* Return 0 on success, -1 on failure,
*/
static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
{
flux_future_t *f;
const char *value;
flux_future_t *f = NULL;
const char *value = NULL;
double timestamp = 0.;
char datestr[128];
int rv = -1;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.get",
Expand All @@ -2726,24 +2774,59 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
return -1;
if (flux_rpc_get_unpack (f, "{s:s}", "value", &value) < 0)
goto error;
if (strlen (value) >= len) {

if (checkpoint_get_version1 (value, buf, len, &timestamp) < 0
&& checkpoint_get_version0 (value, buf, len) < 0) {
errno = EINVAL;
goto error;
}
strcpy (buf, value);
flux_future_destroy (f);
return 0;

if (timestamp > 0.) {
time_t sec = timestamp;
struct tm tm;
gmtime_r (&sec, &tm);
strftime (datestr, sizeof (datestr), "%FT%T", &tm);
}
else
snprintf (datestr, sizeof (datestr), "N/A");

flux_log (h, LOG_INFO,
"restored kvs-primary from checkpoint on %s", datestr);
rv = 0;
error:
flux_future_destroy (f);
return -1;
return rv;
}

/* Synchronously store key-value pair to checkpoint service.
* Returns 0 on success, -1 on failure.
*/
static int checkpoint_put (flux_t *h, const char *key, const char *value)
static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
{
flux_future_t *f;
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
char *value = NULL;
int save_errno, rv = -1;

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = ENOMEM;
goto error;
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
Expand All @@ -2754,13 +2837,17 @@ static int checkpoint_put (flux_t *h, const char *key, const char *value)
key,
"value",
value)))
return -1;
if (flux_rpc_get (f, NULL) < 0) {
flux_future_destroy (f);
return -1;
}
goto error;
if (flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
error:
save_errno = errno;
flux_future_destroy (f);
return 0;
json_decref (o);
free (value);
Comment on lines +2847 to +2848
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially clobbers errno.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json_decref() and free() potentially clobber errno.

errno = save_errno;
return rv;
}

/* Store initial root in local cache, and flush to content cache
Expand Down Expand Up @@ -2855,9 +2942,7 @@ int mod_main (flux_t *h, int argc, char **argv)
/* Look for a checkpoint and use it if found.
* Otherwise start the primary root namespace with an empty directory.
*/
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) == 0)
flux_log (h, LOG_INFO, "restored kvs-primary from checkpoint");
else {
if (checkpoint_get (h, "kvs-primary", rootref, sizeof (rootref)) < 0) {
if (store_initial_rootdir (ctx, rootref, sizeof (rootref)) < 0) {
flux_log_error (h, "storing initial root object");
goto done;
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ dist_check_SCRIPTS = \
valgrind/valgrind-workload.sh \
valgrind/workload.d/job \
kvs/kvs-helper.sh \
kvs/change-checkpoint.py \
job-manager/exec-service.lua \
job-manager/drain-cancel.py \
job-manager/bulk-state.py \
Expand Down
21 changes: 21 additions & 0 deletions t/kvs/change-checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3

import sys
import sqlite3

if len(sys.argv) < 4:
print("change-checkpoint.py <file> <key> <value>")
sys.exit(1)

conn = sqlite3.connect(sys.argv[1])
cursor = conn.cursor()
s = (
'REPLACE INTO checkpt (key,value) values ("'
+ sys.argv[2]
+ '", "'
+ sys.argv[3]
+ '")'
)
cursor.execute(s)
conn.commit()
sys.exit(0)
31 changes: 31 additions & 0 deletions t/t2010-kvs-snapshot-restore.t
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
test_under_flux 1

CHECKPOINT=${FLUX_BUILD_DIR}/t/kvs/checkpoint
CHANGECHECKPOINT=${FLUX_SOURCE_DIR}/t/kvs/change-checkpoint.py

test_expect_success 'store kvs-checkpoint key-val pairs' '
$CHECKPOINT put foo bar &&
Expand Down Expand Up @@ -59,4 +60,34 @@ test_expect_success 'content from previous instance survived' '
test_cmp get.exp get.out
'

test_expect_success 're-run instance, verify checkpoint date saved' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg1.out
'

# just check for todays date, not time for obvious reasons
test_expect_success 'verify date in flux logs' '
today=`date --iso-8601` &&
grep checkpoint dmesg1.out | grep ${today}
'

test_expect_success 're-run instance, get rootref' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux kvs getroot -b > getroot.out
'

test_expect_success 'write rootref to checkpoint path, emulating original checkpoint' '
rootref=$(cat getroot.out) &&
${CHANGECHECKPOINT} $(pwd)/content.sqlite "kvs-primary" ${rootref}
'

test_expect_success 're-run instance, verify checkpoint correctly loaded' '
flux start -o,--setattr=content.backing-path=$(pwd)/content.sqlite \
flux dmesg >dmesg2.out
'

test_expect_success 'verify checkpoint loaded with no date' '
grep checkpoint dmesg2.out | grep "N\/A"
'

test_done