diff --git a/src/common/libflux/handle.c b/src/common/libflux/handle.c index 4382c3525ed3..dea44e0d33ce 100644 --- a/src/common/libflux/handle.c +++ b/src/common/libflux/handle.c @@ -44,6 +44,7 @@ #include "msg_handler.h" // for flux_sleep_on () #include "flog.h" #include "conf.h" +#include "mpipe.h" #include "message_private.h" // to check msg refcount in flux_send_new () #if HAVE_CALIPER @@ -73,7 +74,7 @@ struct flux_handle { const struct flux_handle_ops *ops; void *impl; void *dso; - struct flux_msglist *queue; + struct mpipe *queue; int pollfd; struct tagpool *tagpool; @@ -420,7 +421,7 @@ flux_t *flux_handle_create (void *impl, goto error; tagpool_set_grow_cb (h->tagpool, tagpool_grow_notify, h); if (!(flags & FLUX_O_NOREQUEUE)) { - if (!(h->queue = flux_msglist_create ())) + if (!(h->queue = mpipe_create (MPIPE_SINGLE_THREAD))) goto error; } if ((flags & FLUX_O_RPCTRACK)) { @@ -546,7 +547,7 @@ void flux_handle_destroy (flux_t *h) if (h->dso) dlclose (h->dso); #endif - flux_msglist_destroy (h->queue); + mpipe_destroy (h->queue); } free (h); errno = saved_errno; @@ -886,8 +887,8 @@ static flux_msg_t *flux_recv_any (flux_t *h, int flags) flux_msg_t *msg; if (!(h->flags & FLUX_O_NOREQUEUE)) { - if (flux_msglist_count (h->queue) > 0) - return (flux_msg_t *)flux_msglist_pop (h->queue); + if (!mpipe_empty (h->queue)) + return mpipe_dequeue (h->queue); } if (!h->ops->recv) { errno = ENOSYS; @@ -968,6 +969,10 @@ flux_msg_t *flux_recv (flux_t *h, struct flux_match match, int flags) */ int flux_requeue (flux_t *h, const flux_msg_t *msg, int flags) { + if (!h || !msg) { + errno = EINVAL; + return -1; + } h = lookup_clone_ancestor (h); int rc; @@ -976,10 +981,13 @@ int flux_requeue (flux_t *h, const flux_msg_t *msg, int flags) errno = EINVAL; return -1; } + flux_msg_incref (msg); if (flags == FLUX_RQ_TAIL) - rc = flux_msglist_append (h->queue, msg); + rc = mpipe_enqueue (h->queue, (flux_msg_t **)&msg); else - rc = flux_msglist_push (h->queue, msg); + rc = mpipe_requeue (h->queue, (flux_msg_t **)&msg); + if (rc < 0) + flux_msg_decref (msg); return rc; } @@ -1002,7 +1010,7 @@ int flux_pollfd (flux_t *h) if ((h->pollfd = epoll_create1 (EPOLL_CLOEXEC)) < 0) goto error; /* add queue pollfd */ - ev.data.fd = flux_msglist_pollfd (h->queue); + ev.data.fd = mpipe_pollfd (h->queue); if (ev.data.fd < 0) goto error; if (epoll_ctl (h->pollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) @@ -1048,7 +1056,7 @@ int flux_pollevents (flux_t *h) } if (!(h->flags & FLUX_O_NOREQUEUE)) { /* get queue events */ - if ((e = flux_msglist_pollevents (h->queue)) < 0) + if ((e = mpipe_pollevents (h->queue)) < 0) return -1; if ((e & POLLIN)) events |= FLUX_POLLIN; diff --git a/src/common/libflux/test/handle.c b/src/common/libflux/test/handle.c index 3abb07327913..886898eeafb3 100644 --- a/src/common/libflux/test/handle.c +++ b/src/common/libflux/test/handle.c @@ -150,6 +150,7 @@ int main (int argc, char *argv[]) flux_t *h; char *s; flux_msg_t *msg; + flux_msg_t *msg2; const char *topic; uint32_t matchtag; @@ -267,12 +268,33 @@ int main (int argc, char *argv[]) ok ((flux_pollevents (h) & FLUX_POLLIN) == 0, "flux_pollevents shows FLUX_POLLIN clear after queue is emptied"); - /* flux_requeue bad flags */ + /* flux_requeue bad args */ + errno = 0; + ok (flux_requeue (NULL, msg, FLUX_RQ_HEAD) < 0 && errno == EINVAL, + "flux_requeue h=NULL fails with EINVAL"); + errno = 0; + ok (flux_requeue (h, NULL, FLUX_RQ_HEAD) < 0 && errno == EINVAL, + "flux_requeue msg=NULL fails with EINVAL"); errno = 0; ok (flux_requeue (h, msg, 0) < 0 && errno == EINVAL, "flux_requeue fails with EINVAL if HEAD|TAIL unspecified"); flux_msg_destroy (msg); + /* requeue preserves aux container (kvs needs this) */ + if (!(msg = flux_request_encode ("foo", NULL))) + BAIL_OUT ("couldn't encode request"); + if (flux_msg_aux_set (msg, "fubar", "xyz", NULL) < 0) + BAIL_OUT ("couldn't attach something to message aux container"); + ok (flux_requeue (h, msg, FLUX_RQ_HEAD) == 0, + "flux_requeue works"); + msg2 = flux_recv (h, FLUX_MATCH_ANY, 0); + ok (msg2 == msg, + "flux_recv returned requeued message and it has the same address"); + ok (flux_msg_aux_get (msg2, "fubar") != NULL, + "and aux was preserved"); + flux_msg_decref (msg); + flux_msg_decref (msg2); + /* flux_requeue: add foo, bar to HEAD; then receive bar, foo */ if (!(msg = flux_request_encode ("foo", NULL))) BAIL_OUT ("couldn't encode request");