From 0a10973b961a9d8be4eb8601ecf9028f8263f9ae Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Fri, 5 Aug 2022 11:55:31 -0700 Subject: [PATCH] kvs: checkpoint root sequence number Problem: The KVS root sequence number isn't persisted across restarts. It restarts at zero everytime the KVS module is loaded. Solution: Add the root sequence number to the checkpoint object, read it in and initialize the KVS primary namespace to that sequence number. Fixes #4446 --- src/cmd/builtin/restore.c | 3 ++- src/common/libkvs/kvs_checkpoint.c | 28 +++++++++++++++++++-- src/common/libkvs/kvs_checkpoint.h | 5 ++++ src/common/libkvs/test/kvs_checkpoint.c | 13 +++++++++- src/modules/content-sqlite/content-sqlite.c | 3 ++- src/modules/kvs/kvs.c | 18 +++++++------ src/modules/kvs/kvstxn.c | 9 +++++++ 7 files changed, 67 insertions(+), 12 deletions(-) diff --git a/src/cmd/builtin/restore.c b/src/cmd/builtin/restore.c index 4e55c3dfe09a..810829b5ebc3 100644 --- a/src/cmd/builtin/restore.c +++ b/src/cmd/builtin/restore.c @@ -371,7 +371,8 @@ static int cmd_restore (optparse_t *p, int ac, char *av[]) shortblobref_length (blobref), blobref); } - if (!(f = kvs_checkpoint_commit (h, NULL, blobref, restore_timestamp)) + /* restoring, therefore we restart sequence number at 0 */ + if (!(f = kvs_checkpoint_commit (h, NULL, blobref, 0, restore_timestamp)) || flux_rpc_get (f, NULL) < 0) { log_msg_exit ("error updating checkpoint: %s", future_strerror (f, errno)); diff --git a/src/common/libkvs/kvs_checkpoint.c b/src/common/libkvs/kvs_checkpoint.c index e9d933398eeb..e5be28d11df0 100644 --- a/src/common/libkvs/kvs_checkpoint.c +++ b/src/common/libkvs/kvs_checkpoint.c @@ -23,11 +23,12 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h, const char *key, const char *rootref, + int sequence, double timestamp) { flux_future_t *f = NULL; - if (!h || !rootref) { + if (!h || !rootref || sequence < 0) { errno = EINVAL; return NULL; } @@ -40,12 +41,13 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h, "content.checkpoint-put", 0, 0, - "{s:s s:{s:i s:s s:f}}", + "{s:s s:{s:i s:s s:i s:f}}", "key", key, "value", "version", 1, "rootref", rootref, + "sequence", sequence, "timestamp", timestamp))) return NULL; @@ -117,6 +119,28 @@ int kvs_checkpoint_lookup_get_timestamp (flux_future_t *f, double *timestamp) return 0; } +int kvs_checkpoint_lookup_get_sequence (flux_future_t *f, int *sequence) +{ + int version; + int seq = 0; + + if (!f || !sequence) { + errno = EINVAL; + return -1; + } + if (flux_rpc_get_unpack (f, "{s:{s:i s?i}}", + "value", + "version", &version, + "sequence", &seq) < 0) + return -1; + if (version != 0 && version != 1) { + errno = EINVAL; + return -1; + } + *sequence = seq; + return 0; +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/common/libkvs/kvs_checkpoint.h b/src/common/libkvs/kvs_checkpoint.h index db5d8a4a5604..822a7df17f76 100644 --- a/src/common/libkvs/kvs_checkpoint.h +++ b/src/common/libkvs/kvs_checkpoint.h @@ -26,6 +26,7 @@ flux_future_t *kvs_checkpoint_commit (flux_t *h, const char *key, const char *rootref, + int sequence, double timestamp); flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key); @@ -36,6 +37,10 @@ int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref); */ int kvs_checkpoint_lookup_get_timestamp (flux_future_t *f, double *timestamp); +/* sets sequence to 0 if unavailable + */ +int kvs_checkpoint_lookup_get_sequence (flux_future_t *f, int *sequence); + #endif /* !_KVS_CHECKPOINT_H */ /* diff --git a/src/common/libkvs/test/kvs_checkpoint.c b/src/common/libkvs/test/kvs_checkpoint.c index 6309a38919fa..484e07a9bb4f 100644 --- a/src/common/libkvs/test/kvs_checkpoint.c +++ b/src/common/libkvs/test/kvs_checkpoint.c @@ -26,7 +26,7 @@ void errors (void) const char *rootref; errno = 0; - ok (kvs_checkpoint_commit (NULL, NULL, NULL, 0) == NULL + ok (kvs_checkpoint_commit (NULL, NULL, NULL, 0, 0) == NULL && errno == EINVAL, "kvs_checkpoint_commit fails on bad input"); @@ -45,6 +45,11 @@ void errors (void) && errno == EINVAL, "kvs_checkpoint_lookup_get_timestamp fails on bad input"); + errno = 0; + ok (kvs_checkpoint_lookup_get_sequence (NULL, NULL) < 0 + && errno == EINVAL, + "kvs_checkpoint_lookup_get_sequence fails on bad input"); + if (!(f = flux_future_create (NULL, NULL))) BAIL_OUT ("flux_future_create failed"); @@ -59,6 +64,12 @@ void errors (void) && errno == EDEADLOCK, "kvs_checkpoint_lookup_get_timestamp fails on unfulfilled future"); + errno = 0; + int sequence; + ok (kvs_checkpoint_lookup_get_sequence (f, &sequence) < 0 + && errno == EDEADLOCK, + "kvs_checkpoint_lookup_get_sequence fails on unfulfilled future"); + flux_future_destroy (f); } diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 8c2f0a2dc3d8..9565d1ea5428 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -390,9 +390,10 @@ void checkpoint_get_cb (flux_t *h, } /* assume "version 0" if value is a bare blobref and return it * in a json envelope */ - if (!(o = json_pack ("{s:i s:s s:f}", + if (!(o = json_pack ("{s:i s:s s:i s:f}", "version", 0, "rootref", s, + "sequence", 0, "timestamp", 0.))) { errstr = "failed to encode blobref in json envelope"; errno = EINVAL; diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 4c558c7871a9..39640396a1e0 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -2820,7 +2820,7 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av) * Copy rootref buf with '\0' termination. * Return 0 on success, -1 on failure, */ -static int checkpoint_get (flux_t *h, char *buf, size_t len) +static int checkpoint_get (flux_t *h, char *buf, size_t len, int *seq) { flux_future_t *f = NULL; const char *rootref; @@ -2840,6 +2840,8 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len) } strcpy (buf, rootref); + (void)kvs_checkpoint_lookup_get_sequence (f, seq); + (void)kvs_checkpoint_lookup_get_timestamp (f, ×tamp); if (timestamp > 0) timestamp_tostr (timestamp, datestr, sizeof (datestr)); @@ -2856,12 +2858,12 @@ static int checkpoint_get (flux_t *h, char *buf, size_t len) /* Synchronously store checkpoint to checkpoint service. * Returns 0 on success, -1 on failure. */ -static int checkpoint_put (flux_t *h, const char *rootref) +static int checkpoint_put (flux_t *h, const char *rootref, int rootseq) { flux_future_t *f = NULL; int rv = -1; - if (!(f = kvs_checkpoint_commit (h, NULL, rootref, 0)) + if (!(f = kvs_checkpoint_commit (h, NULL, rootref, rootseq, 0)) || flux_rpc_get (f, NULL) < 0) goto error; rv = 0; @@ -2964,6 +2966,7 @@ int mod_main (flux_t *h, int argc, char **argv) struct kvsroot *root; char empty_dir_rootref[BLOBREF_MAX_STRING_SIZE]; char rootref[BLOBREF_MAX_STRING_SIZE]; + int seq = 0; uint32_t owner = getuid (); if (store_initial_rootdir (ctx, @@ -2974,9 +2977,10 @@ int mod_main (flux_t *h, int argc, char **argv) } /* Look for a checkpoint and use it if found. - * Otherwise start the primary root namespace with an empty directory. + * Otherwise start the primary root namespace with an empty directory + * and seq = 0. */ - if (checkpoint_get (h, rootref, sizeof (rootref)) < 0) + if (checkpoint_get (h, rootref, sizeof (rootref), &seq) < 0) memcpy (rootref, empty_dir_rootref, sizeof (empty_dir_rootref)); /* primary namespace must always be there and not marked @@ -2996,7 +3000,7 @@ int mod_main (flux_t *h, int argc, char **argv) } } - setroot (ctx, root, rootref, 0); + setroot (ctx, root, rootref, seq); if (event_subscribe (ctx, KVS_PRIMARY_NAMESPACE) < 0) { flux_log_error (h, "event_subscribe"); @@ -3033,7 +3037,7 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error looking up primary root"); goto done; } - if (checkpoint_put (ctx->h, root->ref) < 0) { + if (checkpoint_put (ctx->h, root->ref, root->seq) < 0) { if (errno != ENOSYS) { // service not loaded is not an error flux_log_error (h, "error saving primary KVS checkpoint"); goto done; diff --git a/src/modules/kvs/kvstxn.c b/src/modules/kvs/kvstxn.c index 13a8cd3438e8..b9cb2acf4a18 100644 --- a/src/modules/kvs/kvstxn.c +++ b/src/modules/kvs/kvstxn.c @@ -1091,9 +1091,18 @@ kvstxn_process_t kvstxn_process (kvstxn_t *kt, else if (kt->state == KVSTXN_STATE_SYNC_CHECKPOINT) { if (!(kt->f_sync_checkpoint)) { + int newseq = root_seq; + + /* if we're publishing, seq will be the seq after + * the current one. + */ + if (!(kt->internal_flags & KVSTXN_INTERNAL_FLAG_NO_PUBLISH)) + newseq++; + kt->f_sync_checkpoint = kvs_checkpoint_commit (kt->ktm->h, NULL, kt->newroot, + newseq, 0); if (!kt->f_sync_checkpoint) { kt->errnum = errno;