Skip to content

Commit

Permalink
libflux: reimplement flux_requeue() using mpipe
Browse files Browse the repository at this point in the history
Problem: flux_requeue() is implemented using flux_msglist, but mpipe
is now the more appropriate container.

Migrate implementation to mpipe.

Check flux_requeue() arguments a bit more thorougly and update unit
test to exercise this.

Add a unit test that ensures a message aux container survives requeue/recv
since the KVS relies on it to cache lookup state between reactor loops.
  • Loading branch information
garlick committed Oct 16, 2023
1 parent a8afa7a commit e23c943
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
26 changes: 17 additions & 9 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "msg_handler.h" // for flux_sleep_on ()
#include "flog.h"
#include "conf.h"
#include "mpipe.h"
#include "message_private.h" // to check msg refcount in flux_send_new ()

#if HAVE_CALIPER
Expand Down Expand Up @@ -73,7 +74,7 @@ struct flux_handle {
const struct flux_handle_ops *ops;
void *impl;
void *dso;
struct flux_msglist *queue;
struct mpipe *queue;
int pollfd;

struct tagpool *tagpool;
Expand Down Expand Up @@ -420,7 +421,7 @@ flux_t *flux_handle_create (void *impl,
goto error;
tagpool_set_grow_cb (h->tagpool, tagpool_grow_notify, h);
if (!(flags & FLUX_O_NOREQUEUE)) {
if (!(h->queue = flux_msglist_create ()))
if (!(h->queue = mpipe_create (MPIPE_SINGLE_THREAD)))
goto error;
}
if ((flags & FLUX_O_RPCTRACK)) {
Expand Down Expand Up @@ -546,7 +547,7 @@ void flux_handle_destroy (flux_t *h)
if (h->dso)
dlclose (h->dso);
#endif
flux_msglist_destroy (h->queue);
mpipe_destroy (h->queue);
}
free (h);
errno = saved_errno;
Expand Down Expand Up @@ -886,8 +887,8 @@ static flux_msg_t *flux_recv_any (flux_t *h, int flags)
flux_msg_t *msg;

if (!(h->flags & FLUX_O_NOREQUEUE)) {
if (flux_msglist_count (h->queue) > 0)
return (flux_msg_t *)flux_msglist_pop (h->queue);
if (!mpipe_empty (h->queue))
return mpipe_dequeue (h->queue);
}
if (!h->ops->recv) {
errno = ENOSYS;
Expand Down Expand Up @@ -968,6 +969,10 @@ flux_msg_t *flux_recv (flux_t *h, struct flux_match match, int flags)
*/
int flux_requeue (flux_t *h, const flux_msg_t *msg, int flags)
{
if (!h || !msg) {
errno = EINVAL;
return -1;
}
h = lookup_clone_ancestor (h);
int rc;

Expand All @@ -976,10 +981,13 @@ int flux_requeue (flux_t *h, const flux_msg_t *msg, int flags)
errno = EINVAL;
return -1;
}
flux_msg_incref (msg);
if (flags == FLUX_RQ_TAIL)
rc = flux_msglist_append (h->queue, msg);
rc = mpipe_enqueue (h->queue, (flux_msg_t **)&msg);
else
rc = flux_msglist_push (h->queue, msg);
rc = mpipe_requeue (h->queue, (flux_msg_t **)&msg);
if (rc < 0)
flux_msg_decref (msg);
return rc;
}

Expand All @@ -1002,7 +1010,7 @@ int flux_pollfd (flux_t *h)
if ((h->pollfd = epoll_create1 (EPOLL_CLOEXEC)) < 0)
goto error;
/* add queue pollfd */
ev.data.fd = flux_msglist_pollfd (h->queue);
ev.data.fd = mpipe_pollfd (h->queue);
if (ev.data.fd < 0)
goto error;
if (epoll_ctl (h->pollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0)
Expand Down Expand Up @@ -1048,7 +1056,7 @@ int flux_pollevents (flux_t *h)
}
if (!(h->flags & FLUX_O_NOREQUEUE)) {
/* get queue events */
if ((e = flux_msglist_pollevents (h->queue)) < 0)
if ((e = mpipe_pollevents (h->queue)) < 0)
return -1;
if ((e & POLLIN))
events |= FLUX_POLLIN;
Expand Down
24 changes: 23 additions & 1 deletion src/common/libflux/test/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ int main (int argc, char *argv[])
flux_t *h;
char *s;
flux_msg_t *msg;
flux_msg_t *msg2;
const char *topic;
uint32_t matchtag;

Expand Down Expand Up @@ -267,12 +268,33 @@ int main (int argc, char *argv[])
ok ((flux_pollevents (h) & FLUX_POLLIN) == 0,
"flux_pollevents shows FLUX_POLLIN clear after queue is emptied");

/* flux_requeue bad flags */
/* flux_requeue bad args */
errno = 0;
ok (flux_requeue (NULL, msg, FLUX_RQ_HEAD) < 0 && errno == EINVAL,
"flux_requeue h=NULL fails with EINVAL");
errno = 0;
ok (flux_requeue (h, NULL, FLUX_RQ_HEAD) < 0 && errno == EINVAL,
"flux_requeue msg=NULL fails with EINVAL");
errno = 0;
ok (flux_requeue (h, msg, 0) < 0 && errno == EINVAL,
"flux_requeue fails with EINVAL if HEAD|TAIL unspecified");
flux_msg_destroy (msg);

/* requeue preserves aux container (kvs needs this) */
if (!(msg = flux_request_encode ("foo", NULL)))
BAIL_OUT ("couldn't encode request");
if (flux_msg_aux_set (msg, "fubar", "xyz", NULL) < 0)
BAIL_OUT ("couldn't attach something to message aux container");
ok (flux_requeue (h, msg, FLUX_RQ_HEAD) == 0,
"flux_requeue works");
msg2 = flux_recv (h, FLUX_MATCH_ANY, 0);
ok (msg2 == msg,
"flux_recv returned requeued message and it has the same address");
ok (flux_msg_aux_get (msg2, "fubar") != NULL,
"and aux was preserved");
flux_msg_decref (msg);
flux_msg_decref (msg2);

/* flux_requeue: add foo, bar to HEAD; then receive bar, foo */
if (!(msg = flux_request_encode ("foo", NULL)))
BAIL_OUT ("couldn't encode request");
Expand Down

0 comments on commit e23c943

Please sign in to comment.