Skip to content

Commit

Permalink
modules/kvs: use kvs checkpoint functions
Browse files Browse the repository at this point in the history
Problem: kvs code can be simplified by using kvs checkpointing
helper functions.

Solution: Use kvs checkpointing helper functions when handling
kvs primary checkpoint.
  • Loading branch information
chu11 committed Feb 28, 2022
1 parent 7b299b7 commit efe367a
Showing 1 changed file with 20 additions and 76 deletions.
96 changes: 20 additions & 76 deletions src/modules/kvs/kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "src/common/libutil/monotime.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libkvs/treeobj.h"
#include "src/common/libkvs/kvs_checkpoint_private.h"
#include "src/common/libkvs/kvs_txn_private.h"
#include "src/common/libkvs/kvs_util_private.h"

Expand Down Expand Up @@ -2714,62 +2715,34 @@ static void process_args (struct kvs_ctx *ctx, int ac, char **av)
static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
{
flux_future_t *f = NULL;
json_t *o;
char datestr[128];
int version;
const char *rootref = NULL;
double timestamp = 0.;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.get",
0,
0,
"{s:s}",
"key",
key)))
const char *rootref;
char datestr[128] = {0};
int rv = -1;

if (!(f = kvs_checkpoint_lookup (h, "kvs-primary")))
return -1;
if (flux_rpc_get_unpack (f, "{s:o}", "value", &o) < 0)
goto error;

if (json_unpack (o, "{s:i}", "version", &version) < 0)
goto error_einval;
if (kvs_checkpoint_lookup_get_rootref (f, &rootref) < 0)
goto error;

if (version == 0) {
if (json_unpack (o, "{s:s}", "rootref", &rootref) < 0)
goto error_einval;
}
else if (version == 1) {
if (json_unpack (o, "{s:s s:f}",
"rootref", &rootref,
"timestamp", &timestamp) < 0)
goto error_einval;
if (strlen (rootref) >= len) {
errno = EINVAL;
goto error;
}
else
goto error_einval;

if (strlen (rootref) >= len)
goto error_einval;
strcpy (buf, rootref);

if (timestamp > 0.) {
time_t sec = timestamp;
struct tm tm;
gmtime_r (&sec, &tm);
strftime (datestr, sizeof (datestr), "%FT%T", &tm);
}
else
snprintf (datestr, sizeof (datestr), "N/A");
if (kvs_checkpoint_lookup_get_formatted_timestamp (f,
datestr,
sizeof (datestr)) < 0)
goto error;

flux_log (h, LOG_INFO,
"restored kvs-primary from checkpoint on %s", datestr);
flux_future_destroy (f);
return 0;

error_einval:
errno = EINVAL;
rv = 0;
error:
flux_future_destroy (f);
return -1;
return rv;
}

/* Synchronously store key-value pair to checkpoint service.
Expand All @@ -2778,43 +2751,14 @@ static int checkpoint_get (flux_t *h, const char *key, char *buf, size_t len)
static int checkpoint_put (flux_t *h, const char *key, const char *rootref)
{
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
int save_errno, rv = -1;

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - blobref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}
int rv = -1;

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
0,
0,
"{s:s s:O}",
"key",
key,
"value",
o)))
goto error;
if (flux_rpc_get (f, NULL) < 0)
if (!(f = kvs_checkpoint_update (h, "kvs-primary", rootref))
|| flux_rpc_get (f, NULL) < 0)
goto error;
rv = 0;
error:
save_errno = errno;
flux_future_destroy (f);
json_decref (o);
errno = save_errno;
return rv;
}

Expand Down

0 comments on commit efe367a

Please sign in to comment.