Skip to content

Commit

Permalink
content-{sqlite,files,s3}: refactor checkpoint
Browse files Browse the repository at this point in the history
Problem: We would like to checkpoint more complex json data
structures to the content backing service, but since the
content checkpoint services only take string values, it is
inconvenient to have to encode/decode all json objects
into/from strings.

Solution: Update the checkpoint protocol to instead
take / send a json object.  Support backwards compatibility
by sending raw blobref in new json object format.  Update
all callers accordingly and add additional tests.

Fixes #4144
  • Loading branch information
chu11 committed Feb 24, 2022
1 parent 3ed86a9 commit 93503b3
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 153 deletions.
23 changes: 21 additions & 2 deletions src/cmd/builtin/startlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,45 @@ static void content_flush (flux_t *h)
static void kvs_checkpoint_put (flux_t *h, const char *treeobj)
{
json_t *o;
json_t *value;
const char *rootref;
flux_future_t *f;
double timestamp;

if (!(o = treeobj_decode (treeobj))
|| !(rootref = treeobj_get_blobref (o, 0)))
log_err_exit ("Error decoding treeobj from eventlog commit");

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(value = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
log_err_exit ("Error encoding checkpoint object");
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
0,
0,
"{s:s s:s}",
"{s:s s:O}",
"key",
"kvs-primary",
"value",
rootref))
value))
|| flux_rpc_get (f, NULL) < 0)
log_msg_exit ("Error writing kvs checkpoint: %s",
future_strerror (f, errno));
flux_future_destroy (f);
json_decref (o);
json_decref (value);
}

static void post_startlog_event (flux_t *h,
Expand Down
30 changes: 25 additions & 5 deletions src/modules/content-files/content-files.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "config.h"
#endif
#include <flux/core.h>
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/log.h"
Expand Down Expand Up @@ -157,24 +158,36 @@ void checkpoint_get_cb (flux_t *h,
const char *key;
void *data = NULL;
size_t size;
json_t *o = NULL;
const char *errstr = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;
if (filedb_get (ctx->dbpath, key, &data, &size, &errstr) < 0)
goto error;
if (size > 0) {
/* recovery from version 0 checkpoint blobref not supported */
if (!(o = json_loads (data, 0, NULL))) {
errno = EINVAL;
goto error;
}
}
else
o = json_null ();
if (flux_respond_pack (h,
msg,
"{s:s}",
"{s:O}",
"value",
size > 0 ? data : "") < 0)
o) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
free (data);
json_decref (o);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
free (data);
json_decref (o);
}

/* Handle a kvs-checkpoint.put request from the rank 0 kvs module.
Expand All @@ -187,25 +200,32 @@ void checkpoint_put_cb (flux_t *h,
{
struct content_files *ctx = arg;
const char *key;
const char *value;
json_t *o;
char *value = NULL;
const char *errstr = NULL;

if (flux_request_unpack (msg,
NULL,
"{s:s s:s}",
"{s:s s:o}",
"key",
&key,
"value",
&value) < 0)
&o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = EINVAL;
goto error;
}
if (filedb_put (ctx->dbpath, key, value, strlen (value), &errstr) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
free (value);
return;
error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
free (value);
}

/* Destroy module context.
Expand Down
34 changes: 24 additions & 10 deletions src/modules/content-s3/content-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,35 +326,42 @@ void checkpoint_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg
struct content_s3 *ctx = arg;
const char *key;
void *data = NULL;
char *dup = NULL;
size_t size;
json_t *o = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;

if (s3_get (ctx->cfg, key, &data, &size, &errstr) < 0)
goto error;

if (!(dup = strndup (data, size)))
goto error;
if (size > 0) {
if (!(o = json_loadb (data, size, 0, NULL))) {
/* recovery from version 0 checkpoint blobref not supported */
errno = EINVAL;
goto error;
}
}
else
o = json_null ();

if (flux_respond_pack (h,
msg,
"{s:s}",
"{s:O}",
"value",
size > 0 ? dup : "") < 0) {
o) < 0) {
errno = EIO;
flux_log_error (h, "error responding to kvs-checkpoint.get request (pack)");
}
free (data);
free (dup);
json_decref (o);
return;

error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.get request");
free (data);
free (dup);
json_decref (o);
}

/* Handle a kvs-checkpoint.put request from the rank 0 kvs module.
Expand All @@ -364,26 +371,33 @@ void checkpoint_put_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg
{
struct content_s3 *ctx = arg;
const char *key;
const char *value;
json_t *o;
char *value = NULL;
const char *errstr = NULL;

if (flux_request_unpack (msg,
NULL,
"{s:s s:s}",
"{s:s s:o}",
"key",
&key,
"value",
&value) < 0)
&o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = EINVAL;
goto error;
}
if (s3_put (ctx->cfg, key, value, strlen (value), &errstr) < 0)
goto error;
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request (pack)");
free (value);
return;

error:
if (flux_respond_error (h, msg, errno, errstr) < 0)
flux_log_error (h, "error responding to kvs-checkpoint.put request");
free (value);
}

/* Table of message handler callbacks registered below.
Expand Down
37 changes: 31 additions & 6 deletions src/modules/content-sqlite/content-sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <sqlite3.h>
#include <lz4.h>
#include <flux/core.h>
#include <jansson.h>

#include "src/common/libutil/blobref.h"
#include "src/common/libutil/log.h"
Expand Down Expand Up @@ -337,6 +338,8 @@ void checkpoint_get_cb (flux_t *h,
{
struct content_sqlite *ctx = arg;
const char *key;
char *s;
json_t *o = NULL;

if (flux_request_unpack (msg, NULL, "{s:s}", "key", &key) < 0)
goto error;
Expand All @@ -353,18 +356,33 @@ void checkpoint_get_cb (flux_t *h,
errno = ENOENT;
goto error;
}
s = (char *)sqlite3_column_text (ctx->checkpt_get_stmt, 0);
if (!(o = json_loads (s, 0, NULL))) {
/* version 0 checkpoint is a blobref string, used before v0.36
* reply with updated object representation.
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 0,
"rootref", s,
"timestamp", 0.))) {
errno = EINVAL;
goto error;
}
}
if (flux_respond_pack (h,
msg,
"{s:s}",
"{s:O}",
"value",
sqlite3_column_text (ctx->checkpt_get_stmt, 0)) < 0)
o) < 0)
flux_log_error (h, "flux_respond_pack");
(void )sqlite3_reset (ctx->checkpt_get_stmt);
json_decref (o);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond_error");
(void )sqlite3_reset (ctx->checkpt_get_stmt);
json_decref (o);
}

void checkpoint_put_cb (flux_t *h,
Expand All @@ -374,16 +392,21 @@ void checkpoint_put_cb (flux_t *h,
{
struct content_sqlite *ctx = arg;
const char *key;
const char *value;
json_t *o;
char *value = NULL;

if (flux_request_unpack (msg,
NULL,
"{s:s s:s}",
"{s:s s:o}",
"key",
&key,
"value",
&value) < 0)
&o) < 0)
goto error;
if (!(value = json_dumps (o, JSON_COMPACT))) {
errno = EINVAL;
goto error;
}
if (sqlite3_bind_text (ctx->checkpt_put_stmt,
1,
(char *)key,
Expand All @@ -395,7 +418,7 @@ void checkpoint_put_cb (flux_t *h,
}
if (sqlite3_bind_text (ctx->checkpt_put_stmt,
2,
(char *)value,
value,
strlen (value),
SQLITE_STATIC) != SQLITE_OK) {
log_sqlite_error (ctx, "checkpt_put: binding value");
Expand All @@ -411,11 +434,13 @@ void checkpoint_put_cb (flux_t *h,
if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "flux_respond");
(void )sqlite3_reset (ctx->checkpt_put_stmt);
free (value);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "flux_respond_error");
(void )sqlite3_reset (ctx->checkpt_put_stmt);
free (value);
}

static void content_sqlite_closedb (struct content_sqlite *ctx)
Expand Down
Loading

0 comments on commit 93503b3

Please sign in to comment.