diff --git a/doc/man5/flux-config-kvs.rst b/doc/man5/flux-config-kvs.rst index 19f75ac99343..5c78125fecf7 100644 --- a/doc/man5/flux-config-kvs.rst +++ b/doc/man5/flux-config-kvs.rst @@ -22,6 +22,12 @@ checkpoint-period primary namespace. The checkpoint is used to protect against data loss in the event of a Flux broker crash. +gc-threshold + (optional) Sets the KVS garbage collection sequence number + threshold. Once this threshold is crossed, it will inform + :man1:`flux-shutdown` to ask the user to perform offline KVS + garbage collection. + EXAMPLE ======= @@ -30,7 +36,7 @@ EXAMPLE [kvs] checkpoint-period = "30m" - + gc-threshold = 1000000 RESOURCES ========= @@ -43,4 +49,4 @@ RFC 23: Flux Standard Duration: https://flux-framework.readthedocs.io/projects/f SEE ALSO ======== -:man5:`flux-config` +:man1:`flux-shutdown`,:man5:`flux-config` diff --git a/src/cmd/builtin/shutdown.c b/src/cmd/builtin/shutdown.c index 4fef6ee63ace..778f9eecad98 100644 --- a/src/cmd/builtin/shutdown.c +++ b/src/cmd/builtin/shutdown.c @@ -12,13 +12,54 @@ # include #endif #include +#include #include #include "src/broker/state_machine.h" +#include "src/common/libkvs/kvs_checkpoint.h" #include "src/common/libutil/uri.h" #include "builtin.h" +static void get_checkpoint_sequence (flux_t *h, int *seq) { + flux_future_t *f; + (*seq) = 0; + if (!(f = kvs_checkpoint_lookup (h, NULL, 0)) + || (kvs_checkpoint_lookup_get_sequence (f, seq) < 0 + && errno != ENOENT)) + log_msg_exit ("Error fetching checkpoint sequence: %s", + future_strerror (f, errno)); +} + +static void get_gc_threshold (flux_t *h, int *gc_threshold) { + flux_future_t *f; + json_t *o; + (*gc_threshold) = 0; + if (!(f = flux_rpc (h, "config.get", NULL, FLUX_NODEID_ANY, 0)) + || flux_rpc_get_unpack (f, "o", &o) < 0) + log_msg_exit ("Error fetching flux config: %s", + future_strerror (f, errno)); + (void)json_unpack (o, "{s:{s:i}}", "kvs", "gc-threshold", gc_threshold); +} + +static bool gc_threshold_check (flux_t *h) { + int gc_threshold, seq; + bool rc = false; + + get_checkpoint_sequence (h, &seq); + get_gc_threshold (h, &gc_threshold); + + if (gc_threshold > 0 && seq > gc_threshold) { + char *s = NULL; + printf ("gc threshold exceeded, do you want to perform garbage collection (Y/n)? "); + scanf ("%ms", &s); + if (!s || strncasecmp (s, "y", 1) == 0) + rc = true; + free (s); + } + return rc; +} + static void process_updates (flux_future_t *f) { const char *s; @@ -68,7 +109,9 @@ static int subcmd (optparse_t *p, int ac, char *av[]) if (optparse_hasopt (p, "background")) flags &= ~FLUX_RPC_STREAMING; - if (optparse_hasopt (p, "gc") || optparse_hasopt (p, "dump")) { + if (optparse_hasopt (p, "gc") + || optparse_hasopt (p, "dump") + || gc_threshold_check (h)) { const char *val = optparse_get_str (p, "dump", "auto"); if (flux_attr_set (h, "content.dump", val) < 0)