Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvs: add defensive checkpoint via sync configuration option #4383

Merged
merged 7 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion doc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ MAN5_FILES_PRIMARY = \
man5/flux-config-resource.5 \
man5/flux-config-archive.5 \
man5/flux-config-job-manager.5 \
man5/flux-config-ingest.5
man5/flux-config-ingest.5 \
man5/flux-config-kvs.5


MAN7_FILES = $(MAN7_FILES_PRIMARY)
Expand Down
46 changes: 46 additions & 0 deletions doc/man5/flux-config-kvs.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
==================
flux-config-kvs(5)
==================


DESCRIPTION
===========

The Flux system instance **kvs** service provides the primary key value
store (i.e. "the KVS") for a large number of Flux services. For
example, job eventlogs are stored in the KVS.

The ``kvs`` table may contain the following keys:


KEYS
====

checkpoint-period
(optional) Sets a period of time (in RFC 23 Flux Standard Duration
format) that the KVS will regularly checkpoint a reference to its
primary namespace. The checkpoint is used to protect against data
loss in the event of a Flux broker crash.


EXAMPLE
=======

::

[kvs]
checkpoint-period = "30m"


RESOURCES
=========

Flux: http://flux-framework.org

RFC 23: Flux Standard Duration: https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_23.html


SEE ALSO
========

:man5:`flux-config`
2 changes: 1 addition & 1 deletion doc/man5/flux-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ SEE ALSO
:man1:`flux-broker`, :man5:`flux-config-access`, :man5:`flux-config-bootstrap`,
:man5:`flux-config-tbon`, :man5:`flux-config-exec`, :man5:`flux-config-ingest`,
:man5:`flux-config-resource`, :man5:`flux-config-archive`,
:man5:`flux-config-job-manager`
:man5:`flux-config-job-manager`, :man5:`flux-config-kvs`
1 change: 1 addition & 0 deletions doc/manpages.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
('man5/flux-config-resource', 'flux-config-resource', 'configure Flux resource service', [author], 5),
('man5/flux-config-archive', 'flux-config-archive', 'configure Flux job archival service', [author], 5),
('man5/flux-config-job-manager', 'flux-config-job-manager', 'configure Flux job manager service', [author], 5),
('man5/flux-config-kvs', 'flux-config-kvs', 'configure Flux kvs service', [author], 5),
('man7/flux-broker-attributes', 'flux-broker-attributes', 'overview Flux broker attributes', [author], 7),
('man7/flux-jobtap-plugins', 'flux-jobtap-plugins', 'overview Flux jobtap plugin API', [author], 7),
]
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,4 @@ tmpfiles
EDEADLOCK
setpgrp
nosetpgrp
checkpointed
4 changes: 3 additions & 1 deletion src/modules/kvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ kvs_la_SOURCES = \
kvsroot.h \
kvsroot.c \
kvs_wait_version.h \
kvs_wait_version.c
kvs_wait_version.c \
kvs_checkpoint.h \
kvs_checkpoint.c

kvs_la_LDFLAGS = $(fluxmod_ldflags) -module
kvs_la_LIBADD = $(top_builddir)/src/common/libkvs/libkvs.la \
Expand Down
81 changes: 75 additions & 6 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
#include "src/common/libutil/monotime.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/timestamp.h"
#include "src/common/libutil/errprintf.h"
#include "src/common/libkvs/treeobj.h"
#include "src/common/libkvs/kvs_checkpoint.h"
#include "src/common/libkvs/kvs_txn_private.h"
#include "src/common/libkvs/kvs_util_private.h"
#include "src/common/libcontent/content.h"
#include "src/common/libutil/fsd.h"

#include "waitqueue.h"
#include "cache.h"
Expand All @@ -43,6 +45,7 @@
#include "kvstxn.h"
#include "kvsroot.h"
#include "kvs_wait_version.h"
#include "kvs_checkpoint.h"

/* heartbeat_sync_cb() is called periodically to manage cached content
* and namespaces. Synchronize with the system heartbeat if possible,
Expand Down Expand Up @@ -73,6 +76,7 @@ struct kvs_ctx {
bool events_init; /* flag */
const char *hash_name;
unsigned int seq; /* for commit transactions */
kvs_checkpoint_t *kcp;
struct list_head work_queue;
};

Expand All @@ -89,6 +93,8 @@ static void transaction_prep_cb (flux_reactor_t *r, flux_watcher_t *w,
static void transaction_check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg);
static void start_root_remove (struct kvs_ctx *ctx, const char *ns);
static void work_queue_check_append (struct kvs_ctx *ctx,
struct kvsroot *root);
static void kvstxn_apply (kvstxn_t *kt);

/*
Expand All @@ -103,11 +109,19 @@ static void kvs_ctx_destroy (struct kvs_ctx *ctx)
flux_watcher_destroy (ctx->prep_w);
flux_watcher_destroy (ctx->check_w);
flux_watcher_destroy (ctx->idle_w);
kvs_checkpoint_destroy (ctx->kcp);
free (ctx);
errno = saved_errno;
}
}

static void work_queue_check_append_wrapper (struct kvsroot *root,
void *arg)
{
struct kvs_ctx *ctx = arg;
work_queue_check_append (ctx, root);
}

static struct kvs_ctx *kvs_ctx_create (flux_t *h)
{
flux_reactor_t *r = flux_get_reactor (h);
Expand Down Expand Up @@ -138,6 +152,13 @@ static struct kvs_ctx *kvs_ctx_create (flux_t *h)
goto error;
flux_watcher_start (ctx->prep_w);
flux_watcher_start (ctx->check_w);
ctx->kcp = kvs_checkpoint_create (h,
NULL, /* set later */
0.0, /* default 0.0, set later */
work_queue_check_append_wrapper,
ctx);
if (!ctx->kcp)
goto error;
}
ctx->transaction_merge = 1;
list_head_init (&ctx->work_queue);
Expand Down Expand Up @@ -1044,15 +1065,18 @@ static void kvstxn_apply (kvstxn_t *kt)
done:
if (errnum == 0) {
json_t *names = kvstxn_get_names (kt);
int internal_flags = kvstxn_get_internal_flags (kt);
int count;
if ((count = json_array_size (names)) > 1) {
int opcount = 0;
opcount = json_array_size (kvstxn_get_ops (kt));
flux_log (ctx->h, LOG_DEBUG, "aggregated %d transactions (%d ops)",
count, opcount);
}
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names, kvstxn_get_keys (kt));
if (!(internal_flags & KVSTXN_INTERNAL_FLAG_NO_PUBLISH)) {
setroot (ctx, root, kvstxn_get_newroot_ref (kt), root->seq + 1);
setroot_event_send (ctx, root, names, kvstxn_get_keys (kt));
}
} else {
fallback = kvstxn_fallback_mergeable (kt);

Expand Down Expand Up @@ -1570,7 +1594,7 @@ static void relaycommit_request_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags) < 0) {
if (kvstxn_mgr_add_transaction (root->ktm, name, ops, flags, 0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1651,7 +1675,8 @@ static void commit_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
ops,
flags) < 0) {
flags,
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1749,7 +1774,8 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
treq_get_flags (tr),
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -1856,7 +1882,8 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh,
if (kvstxn_mgr_add_transaction (root->ktm,
treq_get_name (tr),
treq_get_ops (tr),
treq_get_flags (tr)) < 0) {
treq_get_flags (tr),
0) < 0) {
flux_log_error (h, "%s: kvstxn_mgr_add_transaction",
__FUNCTION__);
goto error;
Expand Down Expand Up @@ -2698,6 +2725,30 @@ static void setroot_unpause_request_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void config_reload_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct kvs_ctx *ctx = arg;
const flux_conf_t *conf;
const char *errstr = NULL;
flux_error_t error;

if (flux_conf_reload_decode (msg, &conf) < 0)
goto error;
if (kvs_checkpoint_reload (ctx->kcp, conf, &error) < 0) {
errstr = error.text;
goto error;
}
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to config-reload request");
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to config-reload request");
}

/* see comments above in event_subscribe() regarding event
* subscriptions to kvs.namespace */
static const struct flux_msg_handler_spec htab[] = {
Expand Down Expand Up @@ -2735,9 +2786,22 @@ static const struct flux_msg_handler_spec htab[] = {
setroot_pause_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.setroot-unpause",
setroot_unpause_request_cb, FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "kvs.config-reload", config_reload_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static int process_config (struct kvs_ctx *ctx)
{
flux_error_t error;
if (kvs_checkpoint_config_parse (ctx->kcp,
flux_get_conf (ctx->h),
&error) < 0) {
flux_log (ctx->h, LOG_ERR, "%s", error.text);
return -1;
}
return 0;
}

static void process_args (struct kvs_ctx *ctx, int ac, char **av)
{
int i;
Expand Down Expand Up @@ -2891,6 +2955,8 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "error creating KVS context");
goto done;
}
if (process_config (ctx) < 0)
garlick marked this conversation as resolved.
Show resolved Hide resolved
goto done;
process_args (ctx, argc, argv);
if (ctx->rank == 0) {
struct kvsroot *root;
Expand Down Expand Up @@ -2934,6 +3000,8 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "event_subscribe");
goto done;
}

kvs_checkpoint_update_root_primary (ctx->kcp, root);
}
if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) {
flux_log_error (h, "flux_msg_handler_addvec");
Expand All @@ -2947,6 +3015,7 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "error starting heartbeat synchronization");
goto done;
}
kvs_checkpoint_start (ctx->kcp);
if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log_error (h, "flux_reactor_run");
goto done;
Expand Down
Loading