Skip to content

Commit

Permalink
Merge pull request flux-framework#4383 from chu11/issue4301_defensive…
Browse files Browse the repository at this point in the history
…_checkpoints

kvs: add defensive checkpoint via sync configuration option
  • Loading branch information
mergify[bot] authored Jul 26, 2022
2 parents 7e17fdb + 882a320 commit 007b4a6
Show file tree
Hide file tree
Showing 16 changed files with 788 additions and 50 deletions.
3 changes: 2 additions & 1 deletion doc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ MAN5_FILES_PRIMARY = \
man5/flux-config-resource.5 \
man5/flux-config-archive.5 \
man5/flux-config-job-manager.5 \
man5/flux-config-ingest.5
man5/flux-config-ingest.5 \
man5/flux-config-kvs.5


MAN7_FILES = $(MAN7_FILES_PRIMARY)
Expand Down
46 changes: 46 additions & 0 deletions doc/man5/flux-config-kvs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
==================
flux-config-kvs(5)
==================


DESCRIPTION
===========

The Flux system instance **kvs** service provides the primary key value
store (i.e. "the KVS") for a large number of Flux services. For
example, job eventlogs are stored in the KVS.

The ``kvs`` table may contain the following keys:


KEYS
====

checkpoint-period
(optional) Sets a period of time (in RFC 23 Flux Standard Duration
format) that the KVS will regularly checkpoint a reference to its
primary namespace. The checkpoint is used to protect against data
loss in the event of a Flux broker crash.


EXAMPLE
=======

::

[kvs]
checkpoint-period = "30m"


RESOURCES
=========

Flux: http://flux-framework.org

RFC 23: Flux Standard Duration: https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_23.html


SEE ALSO
========

:man5:`flux-config`
2 changes: 1 addition & 1 deletion doc/man5/flux-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ SEE ALSO
:man1:`flux-broker`, :man5:`flux-config-access`, :man5:`flux-config-bootstrap`,
:man5:`flux-config-tbon`, :man5:`flux-config-exec`, :man5:`flux-config-ingest`,
:man5:`flux-config-resource`, :man5:`flux-config-archive`,
:man5:`flux-config-job-manager`
:man5:`flux-config-job-manager`, :man5:`flux-config-kvs`
1 change: 1 addition & 0 deletions doc/manpages.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
('man5/flux-config-resource', 'flux-config-resource', 'configure Flux resource service', [author], 5),
('man5/flux-config-archive', 'flux-config-archive', 'configure Flux job archival service', [author], 5),
('man5/flux-config-job-manager', 'flux-config-job-manager', 'configure Flux job manager service', [author], 5),
('man5/flux-config-kvs', 'flux-config-kvs', 'configure Flux kvs service', [author], 5),
('man7/flux-broker-attributes', 'flux-broker-attributes', 'overview Flux broker attributes', [author], 7),
('man7/flux-jobtap-plugins', 'flux-jobtap-plugins', 'overview Flux jobtap plugin API', [author], 7),
]
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,4 @@ tmpfiles
EDEADLOCK
setpgrp
nosetpgrp
checkpointed
4 changes: 3 additions & 1 deletion src/modules/kvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ kvs_la_SOURCES = \
kvsroot.h \
kvsroot.c \
kvs_wait_version.h \
kvs_wait_version.c
kvs_wait_version.c \
kvs_checkpoint.h \
kvs_checkpoint.c

kvs_la_LDFLAGS = $(fluxmod_ldflags) -module
kvs_la_LIBADD = $(top_builddir)/src/common/libkvs/libkvs.la \
Expand Down
81 changes: 75 additions & 6 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
#include "src/common/libutil/monotime.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/timestamp.h"
#include "src/common/libutil/errprintf.h"
#include "src/common/libkvs/treeobj.h"
#include "src/common/libkvs/kvs_checkpoint.h"
#include "src/common/libkvs/kvs_txn_private.h"
#include "src/common/libkvs/kvs_util_private.h"
#include "src/common/libcontent/content.h"
#include "src/common/libutil/fsd.h"

#include "waitqueue.h"
#include "cache.h"
Expand All @@ -43,6 +45,7 @@
#include "kvstxn.h"
#include "kvsroot.h"
#include "kvs_wait_version.h"
#include "kvs_checkpoint.h"

/* heartbeat_sync_cb() is called periodically to manage cached content
* and namespaces. Synchronize with the system heartbeat if possible,
Expand Down Expand Up @@ -73,6 +76,7 @@ struct kvs_ctx {
bool events_init; /* flag */
const char *hash_name;
unsigned int seq; /* for commit transactions */
kvs_checkpoint_t *kcp;
struct list_head work_queue;
};

Expand All @@ -89,6 +93,8 @@ static void transaction_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
static void transaction_check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void start_root_remove (struct kvs_ctx *ctx, const char *ns);
static void work_queue_check_append (struct kvs_ctx *ctx,
struct kvsroot *root);
static void kvstxn_apply (kvstxn_t *kt);

/*
Expand All @@ -103,11 +109,19 @@ static void kvs_ctx_destroy (struct kvs_ctx *ctx)
flux_watcher_destroy (ctx->prep_w);
flux_watcher_destroy (ctx->check_w);
flux_watcher_destroy (ctx->idle_w);
kvs_checkpoint_destroy (ctx->kcp);
free (ctx);
errno = saved_errno;
}
}

static void work_queue_check_append_wrapper (struct kvsroot *root,
void *arg)
{
struct kvs_ctx *ctx = arg;
work_queue_check_append (ctx, root);
}

static struct kvs_ctx *kvs_ctx_create (flux_t *h)
{
flux_reactor_t *r = flux_get_reactor (h);
Expand Down Expand Up @@ -138,6 +152,13 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
goto error;
flux_watcher_start (ctx->prep_w);
flux_watcher_start (ctx->check_w);
ctx->kcp = kvs_checkpoint_create (h,
NULL, /* set later */
0.0, /* default 0.0, set later */
work_queue_check_append_wrapper,
ctx);
if (!ctx->kcp)
goto error;
}
ctx->transaction_merge = 1;
list_head_init (&ctx->work_queue);
Expand Down Expand Up @@ -1044,15 +1065,18 @@ static void kvstxn_apply (kvstxn_t *kt)
done:
if (errnum == 0) {
json_t *names = kvstxn_get_names (kt);
int internal_flags = kvstxn_get_internal_flags (kt);
int count;
if ((count = json_array_size (names)) > 1) {
int opcount = 0;
opcount = json_array_size (kvstxn_get_ops (kt));
flux_log (ctx->h, LOG_DEBUG, "aggregated %d transactions (%d ops)",
count, opcount);
}
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names, kvstxn_get_keys (kt));
if (!(internal_flags & KVSTXN_INTERNAL_FLAG_NO_PUBLISH)) {
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names, kvstxn_get_keys (kt));
}
} else {
fallback = kvstxn_fallback_mergeable (kt);

Expand Down Expand Up @@ -1570,7 +1594,7 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags) < 0) {
if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags, 0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1651,7 +1675,8 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
ops,
flags) < 0) {
flags,
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1749,7 +1774,8 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
treq_get_flags (tr),
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1856,7 +1882,8 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
treq_get_flags (tr),
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -2698,6 +2725,30 @@ static void setroot_unpause_request_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void config_reload_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct kvs_ctx *ctx = arg;
const flux_conf_t *conf;
const char *errstr = NULL;
flux_error_t error;

if (flux_conf_reload_decode (msg, &conf) < 0)
goto error;
if (kvs_checkpoint_reload (ctx->kcp, conf, &error) < 0) {
errstr = error.text;
goto error;
}
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to config-reload request");
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to config-reload request");
}

/* see comments above in event_subscribe() regarding event
* subscriptions to kvs.namespace */
static const struct flux_msg_handler_spec htab[] = {
Expand Down Expand Up @@ -2735,9 +2786,22 @@ static const struct flux_msg_handler_spec htab[] = {
setroot_pause_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.setroot-unpause",
setroot_unpause_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.config-reload", config_reload_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static int process_config (struct kvs_ctx *ctx)
{
flux_error_t error;
if (kvs_checkpoint_config_parse (ctx->kcp,
flux_get_conf (ctx->h),
&error) < 0) {
flux_log (ctx->h, LOG_ERR, "%s", error.text);
return -1;
}
return 0;
}

static void process_args (struct kvs_ctx *ctx, int ac, char **av)
{
int i;
Expand Down Expand Up @@ -2891,6 +2955,8 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "error creating KVS context");
goto done;
}
if (process_config (ctx) < 0)
goto done;
process_args (ctx, argc, argv);
if (ctx->rank == 0) {
struct kvsroot *root;
Expand Down Expand Up @@ -2934,6 +3000,8 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "event_subscribe");
goto done;
}

kvs_checkpoint_update_root_primary (ctx->kcp, root);
}
if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) {
flux_log_error (h, "flux_msg_handler_addvec");
Expand All @@ -2947,6 +3015,7 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "error starting heartbeat synchronization");
goto done;
}
kvs_checkpoint_start (ctx->kcp);
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log_error (h, "flux_reactor_run");
goto done;
Expand Down
Loading

0 comments on commit 007b4a6

Please sign in to comment.