Skip to content

Commit

Permalink
loop: reimplement using mpipe
Browse files Browse the repository at this point in the history
Problem: the loop connector is implemented using flux_msglist,
but mpipe is now the more appropriate container.

Migrate implementation to mpipe.
  • Loading branch information
garlick committed Oct 16, 2023
1 parent e23c943 commit 2c1338a
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/common/libflux/connector_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
#include <flux/core.h>

#include "ccan/str/str.h"
#include "mpipe.h"

struct loop_ctx {
flux_t *h;
struct flux_msg_cred cred;
struct flux_msglist *queue;
struct mpipe *queue;
};

static const struct flux_handle_ops handle_ops;
Expand All @@ -33,7 +34,7 @@ static int op_pollevents (void *impl)
struct loop_ctx *ctx = impl;
int e, revents = 0;

if ((e = flux_msglist_pollevents (ctx->queue)) < 0)
if ((e = mpipe_pollevents (ctx->queue)) < 0)
return e;
if (e & POLLIN)
revents |= FLUX_POLLIN;
Expand All @@ -47,38 +48,37 @@ static int op_pollevents (void *impl)
static int op_pollfd (void *impl)
{
struct loop_ctx *ctx = impl;
return flux_msglist_pollfd (ctx->queue);
return mpipe_pollfd (ctx->queue);
}

static int op_send (void *impl, const flux_msg_t *msg, int flags)
{
struct loop_ctx *ctx = impl;
flux_msg_t *cpy;
struct flux_msg_cred cred;
int rc = -1;

if (!(cpy = flux_msg_copy (msg, true)))
goto done;
goto error;
if (flux_msg_get_cred (cpy, &cred) < 0)
goto done;
goto error;
if (cred.userid == FLUX_USERID_UNKNOWN)
cred.userid = ctx->cred.userid;
if (cred.rolemask == FLUX_ROLE_NONE)
cred.rolemask = ctx->cred.rolemask;
if (flux_msg_set_cred (cpy, cred) < 0)
goto done;
if (flux_msglist_append (ctx->queue, cpy) < 0)
goto done;
rc = 0;
done:
goto error;
if (mpipe_enqueue (ctx->queue, &cpy) < 0)
goto error;
return 0;
error:
flux_msg_destroy (cpy);
return rc;
return -1;
}

static flux_msg_t *op_recv (void *impl, int flags)
{
struct loop_ctx *ctx = impl;
flux_msg_t *msg = (flux_msg_t *)flux_msglist_pop (ctx->queue);
flux_msg_t *msg = mpipe_dequeue (ctx->queue);
if (!msg)
errno = EWOULDBLOCK;
return msg;
Expand Down Expand Up @@ -140,7 +140,7 @@ static void op_fini (void *impl)

if (ctx) {
int saved_errno = errno;
flux_msglist_destroy (ctx->queue);
mpipe_destroy (ctx->queue);
free (ctx);
errno = saved_errno;
}
Expand All @@ -152,7 +152,7 @@ flux_t *connector_loop_init (const char *path, int flags, flux_error_t *errp)

if (!(ctx = calloc (1, sizeof (*ctx))))
return NULL;
if (!(ctx->queue = flux_msglist_create ()))
if (!(ctx->queue = mpipe_create (MPIPE_SINGLE_THREAD)))
goto error;
ctx->cred.userid = getuid ();
ctx->cred.rolemask = FLUX_ROLE_OWNER;
Expand Down

0 comments on commit 2c1338a

Please sign in to comment.