diff --git a/src/common/libflux/connector_loop.c b/src/common/libflux/connector_loop.c index 60cc86f77ff7..69bacb07ed73 100644 --- a/src/common/libflux/connector_loop.c +++ b/src/common/libflux/connector_loop.c @@ -19,11 +19,12 @@ #include #include "ccan/str/str.h" +#include "mpipe.h" struct loop_ctx { flux_t *h; struct flux_msg_cred cred; - struct flux_msglist *queue; + struct mpipe *queue; }; static const struct flux_handle_ops handle_ops; @@ -33,7 +34,7 @@ static int op_pollevents (void *impl) struct loop_ctx *ctx = impl; int e, revents = 0; - if ((e = flux_msglist_pollevents (ctx->queue)) < 0) + if ((e = mpipe_pollevents (ctx->queue)) < 0) return e; if (e & POLLIN) revents |= FLUX_POLLIN; @@ -47,7 +48,7 @@ static int op_pollevents (void *impl) static int op_pollfd (void *impl) { struct loop_ctx *ctx = impl; - return flux_msglist_pollfd (ctx->queue); + return mpipe_pollfd (ctx->queue); } static int op_send (void *impl, const flux_msg_t *msg, int flags) @@ -55,30 +56,29 @@ static int op_send (void *impl, const flux_msg_t *msg, int flags) struct loop_ctx *ctx = impl; flux_msg_t *cpy; struct flux_msg_cred cred; - int rc = -1; if (!(cpy = flux_msg_copy (msg, true))) - goto done; + goto error; if (flux_msg_get_cred (cpy, &cred) < 0) - goto done; + goto error; if (cred.userid == FLUX_USERID_UNKNOWN) cred.userid = ctx->cred.userid; if (cred.rolemask == FLUX_ROLE_NONE) cred.rolemask = ctx->cred.rolemask; if (flux_msg_set_cred (cpy, cred) < 0) - goto done; - if (flux_msglist_append (ctx->queue, cpy) < 0) - goto done; - rc = 0; -done: + goto error; + if (mpipe_enqueue (ctx->queue, &cpy) < 0) + goto error; + return 0; +error: flux_msg_destroy (cpy); - return rc; + return -1; } static flux_msg_t *op_recv (void *impl, int flags) { struct loop_ctx *ctx = impl; - flux_msg_t *msg = (flux_msg_t *)flux_msglist_pop (ctx->queue); + flux_msg_t *msg = mpipe_dequeue (ctx->queue); if (!msg) errno = EWOULDBLOCK; return msg; @@ -140,7 +140,7 @@ static void op_fini (void *impl) if (ctx) { int saved_errno = errno; - flux_msglist_destroy (ctx->queue); + mpipe_destroy (ctx->queue); free (ctx); errno = saved_errno; } @@ -152,7 +152,7 @@ flux_t *connector_loop_init (const char *path, int flags, flux_error_t *errp) if (!(ctx = calloc (1, sizeof (*ctx)))) return NULL; - if (!(ctx->queue = flux_msglist_create ())) + if (!(ctx->queue = mpipe_create (MPIPE_SINGLE_THREAD))) goto error; ctx->cred.userid = getuid (); ctx->cred.rolemask = FLUX_ROLE_OWNER;