Skip to content

Commit

Permalink
cmd/flux-kvs: Remove watch subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
chu11 committed Feb 13, 2019
1 parent 9ef2de6 commit c51b5b3
Showing 1 changed file with 0 additions and 225 deletions.
225 changes: 0 additions & 225 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ int cmd_readlink (optparse_t *p, int argc, char **argv);
int cmd_mkdir (optparse_t *p, int argc, char **argv);
int cmd_version (optparse_t *p, int argc, char **argv);
int cmd_wait (optparse_t *p, int argc, char **argv);
int cmd_watch (optparse_t *p, int argc, char **argv);
int cmd_dropcache (optparse_t *p, int argc, char **argv);
int cmd_copy (optparse_t *p, int argc, char **argv);
int cmd_move (optparse_t *p, int argc, char **argv);
Expand Down Expand Up @@ -164,25 +163,6 @@ static struct optparse_option ls_opts[] = {
OPTPARSE_TABLE_END
};

static struct optparse_option watch_opts[] = {
{ .name = "namespace", .key = 'N', .has_arg = 1,
.usage = "Specify KVS namespace to use.",
},
{ .name = "recursive", .key = 'R', .has_arg = 0,
.usage = "Recursively display keys under subdirectories",
},
{ .name = "directory", .key = 'd', .has_arg = 0,
.usage = "List directory entries and not values",
},
{ .name = "current", .key = 'o', .has_arg = 0,
.usage = "Output current value before changes",
},
{ .name = "count", .key = 'c', .has_arg = 1,
.usage = "Display at most count changes",
},
OPTPARSE_TABLE_END
};

static struct optparse_option dropcache_opts[] = {
{ .name = "all", .key = 'a', .has_arg = 0,
.usage = "Drop KVS across all ranks",
Expand Down Expand Up @@ -331,13 +311,6 @@ static struct optparse_subcommand subcommands[] = {
0,
dropcache_opts
},
{ "watch",
"[-N ns] [-R] [-d] [-o] [-c count] key",
"Watch key and output changes",
cmd_watch,
OPTPARSE_OPT_HIDDEN,
watch_opts
},
{ "version",
"[-N ns]",
"Display curent KVS version",
Expand Down Expand Up @@ -1127,204 +1100,6 @@ int cmd_wait (optparse_t *p, int argc, char **argv)
return (0);
}

#define WATCH_DIR_SEPARATOR "======================"

static void watch_dump_key (const char *json_str,
const char *arg,
bool *prev_output_iskey)
{
output_key_json_str (NULL, json_str, arg);
fflush (stdout);
*prev_output_iskey = true;
}

static void watch_dump_kvsdir (flux_kvsdir_t *dir, const char *ns,
bool Ropt, bool dopt, const char *arg) {
if (!dir) {
output_key_json_str (NULL, NULL, arg);
printf ("%s\n", WATCH_DIR_SEPARATOR);
return;
}

dump_kvs_dir (dir, 0, ns, Ropt, dopt);
printf ("%s\n", WATCH_DIR_SEPARATOR);
fflush (stdout);
}

int watch_kvs_lookup_wrapper (flux_t *h, const char *ns, const char *key,
char **valp)
{
flux_future_t *f = NULL;
const char *tmp;
int rc = -1;

if (!(f = flux_kvs_lookup (h, ns, 0, key)))
goto done;
if (flux_kvs_lookup_get (f, &tmp) < 0)
goto done;
if (!(*valp = strdup (tmp))) {
errno = ENOMEM;
goto done;
}
rc = 0;
done:
flux_future_destroy (f);
return rc;
}

int watch_kvs_dir_wrapper (flux_t *h, const char *ns, const char *key,
flux_kvsdir_t **dirp)
{
flux_future_t *f = NULL;
const flux_kvsdir_t *tmp;
int rc = -1;

if (!(f = flux_kvs_lookup (h, ns, FLUX_KVS_READDIR, key)))
goto done;
if (flux_kvs_lookup_get_dir (f, &tmp) < 0)
goto done;
if (!((*dirp) = flux_kvsdir_copy (tmp))) {
goto done;
}
rc = 0;
done:
flux_future_destroy (f);
return rc;
}

int cmd_watch (optparse_t *p, int argc, char **argv)
{
flux_t *h;
flux_kvsdir_t *dir = NULL;
char *json_str = NULL;
char *key;
int count;
const char *ns = NULL;
bool Ropt;
bool dopt;
bool oopt;
bool isdir = false;
bool prev_output_iskey = false;
int optindex;
int rc;

h = (flux_t *)optparse_get_data (p, "flux_handle");

optindex = optparse_option_index (p);

if ((optindex - argc) == 0) {
optparse_print_usage (p);
exit (1);
}
if (optindex != (argc - 1))
log_msg_exit ("watch: specify one key");

ns = optparse_get_str (p, "namespace", NULL);
Ropt = optparse_hasopt (p, "recursive");
dopt = optparse_hasopt (p, "directory");
oopt = optparse_hasopt (p, "current");
count = optparse_get_int (p, "count", -1);

key = argv[optindex];

rc = watch_kvs_lookup_wrapper (h, ns, key, &json_str);
if (rc < 0 && (errno != ENOENT && errno != EISDIR))
log_err_exit ("%s", key);

/* key is a directory, setup for dir logic appropriately */
if (rc < 0 && errno == EISDIR) {
rc = watch_kvs_dir_wrapper (h, ns, key, &dir);
if (rc < 0 && errno != ENOENT)
log_err_exit ("%s", key);
isdir = true;
free (json_str);
json_str = NULL;
}

if (oopt) {
if (isdir)
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
else
watch_dump_key (json_str, key, &prev_output_iskey);
}

while (count && (rc == 0 || (rc < 0 && errno == ENOENT))) {
if (isdir) {
rc = flux_kvs_watch_once_dir (h, ns, &dir, "%s", key);
if (rc < 0 && (errno != ENOENT && errno != ENOTDIR)) {
printf ("%s: %s\n", key, flux_strerror (errno));
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;
}
else if (rc < 0 && errno == ENOENT) {
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
else if (!rc) {
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
else { /* rc < 0 && errno == ENOTDIR */
/* We were watching a dir that is now a key, need to
* reset logic to the 'key' part of this loop */
isdir = false;
if (dir)
flux_kvsdir_destroy (dir);
dir = NULL;

rc = watch_kvs_lookup_wrapper (h, ns, key, &json_str);
if (rc < 0 && errno != ENOENT)
printf ("%s: %s\n", key, flux_strerror (errno));
else
watch_dump_key (json_str, key, &prev_output_iskey);
}
}
else {
rc = flux_kvs_watch_once (h, ns, key, &json_str);
if (rc < 0 && (errno != ENOENT && errno != EISDIR)) {
printf ("%s: %s\n", key, flux_strerror (errno));
free (json_str);
json_str = NULL;
}
else if (rc < 0 && errno == ENOENT) {
free (json_str);
json_str = NULL;
watch_dump_key (NULL, key, &prev_output_iskey);
}
else if (!rc) {
watch_dump_key (json_str, key, &prev_output_iskey);
}
else { /* rc < 0 && errno == EISDIR */
/* We were watching a key that is now a dir. So we
* have to move to the directory branch of this loop.
*/
isdir = true;
free (json_str);
json_str = NULL;

/* Output dir separator from prior key */
if (prev_output_iskey) {
printf ("%s\n", WATCH_DIR_SEPARATOR);
prev_output_iskey = false;
}

rc = watch_kvs_dir_wrapper (h, ns, key, &dir);
if (rc < 0 && errno != ENOENT)
printf ("%s: %s\n", key, flux_strerror (errno));
else /* rc == 0 || (rc < 0 && errno == ENOENT) */
watch_dump_kvsdir (dir, ns, Ropt, dopt, key);
}
}
count--;
}
if (dir)
flux_kvsdir_destroy (dir);
free (json_str);
return (0);
}

int cmd_dropcache (optparse_t *p, int argc, char **argv)
{
flux_t *h;
Expand Down

0 comments on commit c51b5b3

Please sign in to comment.