Skip to content

Commit

Permalink
libkvs: add kvs checkpoint convenience functions
Browse files Browse the repository at this point in the history
Problem: Some kvs checkpointing operations are duplicated.  It
would be convenient if there were common functions for it.

Solution: Add common checkpointing operations into a new
kvs_checkpoint api in libkvs.  Keep the API private for now.
Add unit tests.

Fixes flux-framework#4145
  • Loading branch information
chu11 committed Mar 1, 2022
1 parent 5744887 commit d75f743
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/common/libkvs/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ libkvs_la_SOURCES = \
kvs.c \
kvs_lookup.c \
kvs_getroot.c \
kvs_checkpoint.c \
kvs_checkpoint.h \
kvs_dir.c \
kvs_dir_private.h \
kvs_commit.c \
Expand Down Expand Up @@ -47,6 +49,7 @@ TESTS = \
test_kvs_commit.t \
test_kvs_getroot.t \
test_treeobj.t \
test_kvs_checkpoint.t \
test_kvs_copy.t \
test_kvs_util.t

Expand Down Expand Up @@ -104,6 +107,10 @@ test_treeobj_t_SOURCES = test/treeobj.c
test_treeobj_t_CPPFLAGS = $(test_cppflags)
test_treeobj_t_LDADD = $(test_ldadd)

test_kvs_checkpoint_t_SOURCES = test/kvs_checkpoint.c
test_kvs_checkpoint_t_CPPFLAGS = $(test_cppflags)
test_kvs_checkpoint_t_LDADD = $(test_ldadd)

test_kvs_copy_t_SOURCES = test/kvs_copy.c
test_kvs_copy_t_CPPFLAGS = $(test_cppflags)
test_kvs_copy_t_LDADD = $(test_ldadd)
Expand Down
138 changes: 138 additions & 0 deletions src/common/libkvs/kvs_checkpoint.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <jansson.h>
#include <flux/core.h>
#include <time.h>

#include "kvs_checkpoint.h"

flux_future_t *kvs_checkpoint_commit (flux_t *h,
const char *key,
const char *rootref)
{
flux_future_t *f = NULL;
double timestamp;

if (!h || !key || !rootref) {
errno = EINVAL;
return NULL;
}

timestamp = flux_reactor_now (flux_get_reactor (h));

if (!(f = flux_rpc_pack (h,
"kvs-checkpoint.put",
0,
0,
"{s:s s:{s:i s:s s:f}}",
"key",
key,
"value",
"version", 1,
"rootref", rootref,
"timestamp", timestamp)))
return NULL;

return f;
}

flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key)
{
if (!h || !key) {
errno = EINVAL;
return NULL;
}

return flux_rpc_pack (h,
"kvs-checkpoint.get",
0,
0,
"{s:s}",
"key",
key);
}

int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref)
{
const char *tmp_rootref;
int version;

if (!f || !rootref) {
errno = EINVAL;
return -1;
}

if (flux_rpc_get_unpack (f, "{s:{s:i s:s}}",
"value",
"version", &version,
"rootref", &tmp_rootref) < 0)
return -1;

if (version != 0 && version != 1) {
errno = EINVAL;
return -1;
}

(*rootref) = tmp_rootref;
return 0;
}

/* returns "N/A" if not available */
int kvs_checkpoint_lookup_get_formatted_timestamp (flux_future_t *f,
char *buf,
size_t len)
{
int version;
double timestamp = 0.;

if (!f || !buf) {
errno = EINVAL;
return -1;
}

if (flux_rpc_get_unpack (f, "{s:{s:i s?f}}",
"value",
"version", &version,
"timestamp", &timestamp) < 0)
return -1;

if (version != 0 && version != 1) {
errno = EINVAL;
return -1;
}

if (version == 1) {
time_t sec = timestamp;
struct tm tm;
gmtime_r (&sec, &tm);
if (strftime (buf, len, "%FT%T", &tm) == 0) {
errno = EINVAL;
return -1;
}
}
else { /* version == 0 */
if (snprintf (buf, len, "N/A") >= len) {
errno = EINVAL;
return -1;
}
}
return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
33 changes: 33 additions & 0 deletions src/common/libkvs/kvs_checkpoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _KVS_CHECKPOINT_H
#define _KVS_CHECKPOINT_H

#include <flux/core.h>

flux_future_t *kvs_checkpoint_commit (flux_t *h,
const char *key,
const char *rootref);

flux_future_t *kvs_checkpoint_lookup (flux_t *h, const char *key);

int kvs_checkpoint_lookup_get_rootref (flux_future_t *f, const char **rootref);

/* returns "N/A" if not available */
int kvs_checkpoint_lookup_get_formatted_timestamp (flux_future_t *f,
char *buf,
size_t len);

#endif /* !_KVS_CHECKPOINT_H */

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
80 changes: 80 additions & 0 deletions src/common/libkvs/test/kvs_checkpoint.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/************************************************************\
* Copyright 2022 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <string.h>
#include <errno.h>
#include <flux/core.h>

#include "src/common/libflux/flux.h"
#include "kvs_checkpoint.h"
#include "src/common/libtap/tap.h"

void errors (void)
{
flux_future_t *f;
const char *rootref;
char buf[128];

errno = 0;
ok (kvs_checkpoint_commit (NULL, NULL, NULL) == NULL
&& errno == EINVAL,
"kvs_checkpoint_commit fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup (NULL, NULL) == NULL
&& errno == EINVAL,
"kvs_checkpoint_lookup fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup_get_rootref (NULL, NULL) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_rootref fails on bad input");

errno = 0;
ok (kvs_checkpoint_lookup_get_formatted_timestamp (NULL, NULL, 0) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_formatted_timestamp fails on bad input");

if (!(f = flux_future_create (NULL, NULL)))
BAIL_OUT ("flux_future_create failed");

errno = 0;
ok (kvs_checkpoint_lookup_get_rootref (f, &rootref) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_rootref fails on unfulfilled future");

errno = 0;
ok (kvs_checkpoint_lookup_get_formatted_timestamp (f, buf, 128) < 0
&& errno == EINVAL,
"kvs_checkpoint_lookup_get_formatted_timestamp "
"fails on unfulfilled future");

flux_future_destroy (f);
}

int main (int argc, char *argv[])
{

plan (NO_PLAN);

errors ();

done_testing();
return (0);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/

0 comments on commit d75f743

Please sign in to comment.