Skip to content

Commit

Permalink
libkvs: add kvs checkpoint convenience functions
Browse files Browse the repository at this point in the history
Problem: Some kvs checkpointing operations are duplicated.  It
would be convenient if there were common functions for it.

Solution: Add common checkpointing operations into a new
kvs_checkpoint api in libkvs.  Keep the API private for now.
Add unit tests.

Fixes flux-framework#4145
  • Loading branch information
chu11 committed Feb 28, 2022
1 parent dd0b947 commit f6cb161
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ libkvs_la_SOURCES = \
kvs.c \
kvs_lookup.c \
kvs_getroot.c \
kvs_checkpoint.c \
kvs_checkpoint_private.h \
kvs_dir.c \
kvs_dir_private.h \
kvs_commit.c \
Expand Down Expand Up @@ -47,6 +49,7 @@ TESTS = \
test_kvs_commit.t \
test_kvs_getroot.t \
test_treeobj.t \
test_kvs_checkpoint.t \
test_kvs_copy.t \
test_kvs_util.t

Expand Down Expand Up @@ -104,6 +107,10 @@ test_treeobj_t_SOURCES = test/treeobj.c
test_treeobj_t_CPPFLAGS = $(test_cppflags)
test_treeobj_t_LDADD = $(test_ldadd)

test_kvs_checkpoint_t_SOURCES = test/kvs_checkpoint.c
test_kvs_checkpoint_t_CPPFLAGS = $(test_cppflags)
test_kvs_checkpoint_t_LDADD = $(test_ldadd)

test_kvs_copy_t_SOURCES = test/kvs_copy.c
test_kvs_copy_t_CPPFLAGS = $(test_cppflags)
test_kvs_copy_t_LDADD = $(test_ldadd)
Expand Down
157 changes: 157 additions & 0 deletions src/common/libkvs/kvs_checkpoint.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <jansson.h>
#include <flux/core.h>
#include <time.h>

#include "kvs_checkpoint_private.h"

flux_future_t *kvs_checkpoint_update (flux_t *h,
const char *key,
const char *rootref)
{
flux_future_t *f = NULL;
double timestamp;
json_t *o = NULL;
int save_errno;

if (!h || !key || !rootref) {
errno = EINVAL;
return NULL;
}

timestamp = flux_reactor_now (flux_get_reactor (h));

/* version 0 checkpoint
* - rootref string only
* version 1 checkpoint object
* - {"version":1 "rootref":s "timestamp":f}
*/
if (!(o = json_pack ("{s:i s:s s:f}",
"version", 1,
"rootref", rootref,
"timestamp", timestamp))) {
errno = ENOMEM;
goto error;
}

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
0,
0,
"{s:s s:O}",
"key",
key,
"value",
o)))
goto error;

json_decref (o);
return f;

error:
save_errno = errno;
flux_future_destroy (f);
json_decref (o);
errno = save_errno;
return NULL;
}

flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key)
{
if (!h || !key) {
errno = EINVAL;
return NULL;
}

return flux_rpc_pack (h,
"kvs-checkpoint.get",
0,
0,
"{s:s}",
"key",
key);
}

int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref)
{
json_t *o;

if (!f || !rootref) {
errno = EINVAL;
return -1;
}

if (flux_rpc_get_unpack (f, "{s:o}", "value", &o) < 0)
return -1;

/* N.B. no need to check version, all versions support rootref */

if (json_unpack (o, "{s:s}", "rootref", rootref) < 0) {
errno = EINVAL;
return -1;
}
return 0;
}

/* returns "N/A" if not available */
int kvs_checkpoint_lookup_get_formatted_timestamp (flux_future_t *f,
char *buf,
size_t len)
{
int version;
json_t *o;

if (!f || !buf || len <= 0) {
errno = EINVAL;
return -1;
}

if (flux_rpc_get_unpack (f, "{s:o}", "value", &o) < 0)
return -1;

if (json_unpack (o, "{s:i}", "version", &version) < 0) {
errno = EINVAL;
return -1;
}

if (version == 0) {
snprintf (buf, len, "N/A");
}
else if (version == 1) {
double timestamp;
time_t sec;
struct tm tm;

if (json_unpack (o, "{s:f}", "timestamp", &timestamp) < 0) {
errno = EINVAL;
return -1;
}
sec = timestamp;
gmtime_r (&sec, &tm);
strftime (buf, len, "%FT%T", &tm);
}
else {
errno = EINVAL;
return -1;
}
return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
33 changes: 33 additions & 0 deletions src/common/libkvs/kvs_checkpoint_private.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _KVS_CHECKPOINT_PRIVATE_H
#define _KVS_CHECKPOINT_PRIVATE_H

#include <flux/core.h>

flux_future_t *kvs_checkpoint_update (flux_t *h,
const char *key,
const char *rootref);

flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key);

int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref);

/* returns "N/A" if not available */
int kvs_checkpoint_lookup_get_formatted_timestamp (flux_future_t *f,
char *buf,
size_t len);

#endif /* !_KVS_CHECKPOINT_PRIVATE_H */

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
80 changes: 80 additions & 0 deletions src/common/libkvs/test/kvs_checkpoint.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <string.h>
#include <errno.h>
#include <flux/core.h>

#include "src/common/libflux/flux.h"
#include "kvs_checkpoint_private.h"
#include "src/common/libtap/tap.h"

void errors (void)
{
flux_future_t *f;
const char *rootref;
char buf[128];

errno = 0;
ok (kvs_checkpoint_update (NULL, NULL, NULL) == NULL
&& errno == EINVAL,
"kvs_checkpoint_update fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup (NULL, NULL) == NULL
&& errno == EINVAL,
"kvs_checkpoint_lookup fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup_get_rootref (NULL, NULL) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_rootref fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup_get_formatted_timestamp (NULL, NULL, 0) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_formatted_timestamp fails on bad input");

if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");

errno = 0;
ok (kvs_checkpoint_lookup_get_rootref (f, &rootref) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_rootref fails on unfulfilled future");

errno = 0;
ok (kvs_checkpoint_lookup_get_formatted_timestamp (f, buf, 128) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_formatted_timestamp "
"fails on unfulfilled future");

flux_future_destroy (f);
}

int main (int argc, char *argv[])
{

plan (NO_PLAN);

errors ();

done_testing();
return (0);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/

0 comments on commit f6cb161

Please sign in to comment.