Skip to content

Commit

Permalink
kvs: checkpoint root sequence number
Browse files Browse the repository at this point in the history
Problem: The KVS root sequence number isn't persisted across
restarts.  It restarts at zero everytime the KVS module is loaded.

Solution: Add the root sequence number to the checkpoint object, read
it in and initialize the KVS primary namespace to that sequence
number.

Fixes flux-framework#4446
  • Loading branch information
chu11 committed Aug 6, 2022
1 parent b075e1d commit f301c25
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 12 deletions.
3 changes: 2 additions & 1 deletion src/cmd/builtin/restore.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ static int cmd_restore (optparse_t *p, int ac, char *av[])
shortblobref_length (blobref),
blobref);
}
if (!(f = kvs_checkpoint_commit (h, NULL, blobref, restore_timestamp))
/* restoring, therefore we restart sequence number at 0 */
if (!(f = kvs_checkpoint_commit (h, NULL, blobref, 0, restore_timestamp))
|| flux_rpc_get (f, NULL) < 0) {
log_msg_exit ("error updating checkpoint: %s",
future_strerror (f, errno));
Expand Down
28 changes: 26 additions & 2 deletions src/common/libkvs/kvs_checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
flux_future_t *kvs_checkpoint_commit (flux_t *h,
const char *key,
const char *rootref,
int sequence,
double timestamp)
{
flux_future_t *f = NULL;

if (!h || !rootref) {
if (!h || !rootref || sequence < 0) {
errno = EINVAL;
return NULL;
}
Expand All @@ -40,12 +41,13 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h,
"kvs-checkpoint.put",
0,
0,
"{s:s s:{s:i s:s s:f}}",
"{s:s s:{s:i s:s s:i s:f}}",
"key",
key,
"value",
"version", 1,
"rootref", rootref,
"sequence", sequence,
"timestamp", timestamp)))
return NULL;

Expand Down Expand Up @@ -117,6 +119,28 @@ int kvs_checkpoint_lookup_get_timestamp (flux_future_t *f, double *timestamp)
return 0;
}

int kvs_checkpoint_lookup_get_sequence (flux_future_t *f, int *sequence)
{
int version;
int seq = 0;

if (!f || !sequence) {
errno = EINVAL;
return -1;
}
if (flux_rpc_get_unpack (f, "{s:{s:i s?i}}",
"value",
"version", &version,
"sequence", &seq) < 0)
return -1;
if (version != 0 && version != 1) {
errno = EINVAL;
return -1;
}
*sequence = seq;
return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
5 changes: 5 additions & 0 deletions src/common/libkvs/kvs_checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
flux_future_t *kvs_checkpoint_commit (flux_t *h,
const char *key,
const char *rootref,
int sequence,
double timestamp);

flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key);
Expand All @@ -36,6 +37,10 @@ int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref);
*/
int kvs_checkpoint_lookup_get_timestamp (flux_future_t *f, double *timestamp);

/* sets sequence to 0 if unavailable
*/
int kvs_checkpoint_lookup_get_sequence (flux_future_t *f, int *sequence);

#endif /* !_KVS_CHECKPOINT_H */

/*
Expand Down
13 changes: 12 additions & 1 deletion src/common/libkvs/test/kvs_checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void errors (void)
const char *rootref;

errno = 0;
ok (kvs_checkpoint_commit (NULL, NULL, NULL, 0) == NULL
ok (kvs_checkpoint_commit (NULL, NULL, NULL, 0, 0) == NULL
&& errno == EINVAL,
"kvs_checkpoint_commit fails on bad input");

Expand All @@ -45,6 +45,11 @@ void errors (void)
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_timestamp fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup_get_sequence (NULL, NULL) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_sequence fails on bad input");

if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");

Expand All @@ -59,6 +64,12 @@ void errors (void)
&& errno == EDEADLOCK,
"kvs_checkpoint_lookup_get_timestamp fails on unfulfilled future");

errno = 0;
int sequence;
ok (kvs_checkpoint_lookup_get_sequence (f, &sequence) < 0
&& errno == EDEADLOCK,
"kvs_checkpoint_lookup_get_sequence fails on unfulfilled future");

flux_future_destroy (f);
}

Expand Down
3 changes: 2 additions & 1 deletion src/modules/content-sqlite/content-sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,10 @@ void checkpoint_get_cb (flux_t *h,
}
/* assume "version 0" if value is a bare blobref and return it
* in a json envelope */
if (!(o = json_pack ("{s:i s:s s:f}",
if (!(o = json_pack ("{s:i s:s s:i s:f}",
"version", 0,
"rootref", s,
"sequence", 0,
"timestamp", 0.))) {
errstr = "failed to encode blobref in json envelope";
errno = EINVAL;
Expand Down
18 changes: 11 additions & 7 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2820,7 +2820,7 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
* Copy rootref buf with '\0' termination.
* Return 0 on success, -1 on failure,
*/
static int checkpoint_get (flux_t *h, char *buf, size_t len)
static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq)
{
flux_future_t *f = NULL;
const char *rootref;
Expand All @@ -2840,6 +2840,8 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len)
}
strcpy (buf, rootref);

(void)kvs_checkpoint_lookup_get_sequence (f, seq);

(void)kvs_checkpoint_lookup_get_timestamp (f, &timestamp);
if (timestamp > 0)
timestamp_tostr (timestamp, datestr, sizeof (datestr));
Expand All @@ -2856,12 +2858,12 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len)
/* Synchronously store checkpoint to checkpoint service.
* Returns 0 on success, -1 on failure.
*/
static int checkpoint_put (flux_t *h, const char *rootref)
static int checkpoint_put (flux_t *h, const char *rootref, int rootseq)
{
flux_future_t *f = NULL;
int rv = -1;

if (!(f = kvs_checkpoint_commit (h, NULL, rootref, 0))
if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0))
|| flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
Expand Down Expand Up @@ -2964,6 +2966,7 @@ int mod_main (flux_t *h, int argc, char **argv)
struct kvsroot *root;
char empty_dir_rootref[BLOBREF_MAX_STRING_SIZE];
char rootref[BLOBREF_MAX_STRING_SIZE];
int seq = 0;
uint32_t owner = getuid ();

if (store_initial_rootdir (ctx,
Expand All @@ -2974,9 +2977,10 @@ int mod_main (flux_t *h, int argc, char **argv)
}

/* Look for a checkpoint and use it if found.
* Otherwise start the primary root namespace with an empty directory.
* Otherwise start the primary root namespace with an empty directory
* and seq = 0.
*/
if (checkpoint_get (h, rootref, sizeof (rootref)) < 0)
if (checkpoint_get (h, rootref, sizeof (rootref), &seq) < 0)
memcpy (rootref, empty_dir_rootref, sizeof (empty_dir_rootref));

/* primary namespace must always be there and not marked
Expand All @@ -2996,7 +3000,7 @@ int mod_main (flux_t *h, int argc, char **argv)
}
}

setroot (ctx, root, rootref, 0);
setroot (ctx, root, rootref, seq);

if (event_subscribe (ctx, KVS_PRIMARY_NAMESPACE) < 0) {
flux_log_error (h, "event_subscribe");
Expand Down Expand Up @@ -3033,7 +3037,7 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "error looking up primary root");
goto done;
}
if (checkpoint_put (ctx->h, root->ref) < 0) {
if (checkpoint_put (ctx->h, root->ref, root->seq) < 0) {
if (errno != ENOSYS) { // service not loaded is not an error
flux_log_error (h, "error saving primary KVS checkpoint");
goto done;
Expand Down
9 changes: 9 additions & 0 deletions src/modules/kvs/kvstxn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1091,9 +1091,18 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt,
else if (kt->state == KVSTXN_STATE_SYNC_CHECKPOINT) {

if (!(kt->f_sync_checkpoint)) {
int newseq = root_seq;

/* if we're publishing, seq will be the seq after
* the current one.
*/
if (!(kt->internal_flags & KVSTXN_INTERNAL_FLAG_NO_PUBLISH))
newseq++;

kt->f_sync_checkpoint = kvs_checkpoint_commit (kt->ktm->h,
NULL,
kt->newroot,
newseq,
0);
if (!kt->f_sync_checkpoint) {
kt->errnum = errno;
Expand Down

0 comments on commit f301c25

Please sign in to comment.