Skip to content

Commit

Permalink
interthread: reimplement using mpipe
Browse files Browse the repository at this point in the history
Problem: the interthread connector is implemented using flux_msglist
with thread-safe accessors, but mpipe is now the more appropriate
container.

Migrate implementation to mpipe.
  • Loading branch information
garlick committed Oct 16, 2023
1 parent c912f84 commit a8afa7a
Showing 1 changed file with 13 additions and 84 deletions.
97 changes: 13 additions & 84 deletions src/common/libflux/connector_interthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@
#include "ccan/list/list.h"
#include "ccan/str/str.h"
#include "message_private.h" // for access to msg->aux

struct msglist_safe {
struct flux_msglist *queue;
pthread_mutex_t lock;
};
#include "mpipe.h"

struct channel {
char *name;
struct msglist_safe *pair[2];
struct mpipe *pair[2];
int refcount; // max of 2
struct list_node list;
};
Expand All @@ -53,8 +49,8 @@ struct interthread_ctx {
struct flux_msg_cred cred;
char *router;
struct channel *chan;
struct msglist_safe *send; // refers to ctx->chan->pair[x]
struct msglist_safe *recv; // refers to ctx->chan->pair[y]
struct mpipe *send; // refers to ctx->chan->pair[x]
struct mpipe *recv; // refers to ctx->chan->pair[y]
};

/* Global state.
Expand All @@ -65,79 +61,12 @@ static pthread_mutex_t channels_lock = PTHREAD_MUTEX_INITIALIZER;

static const struct flux_handle_ops handle_ops;


static void msglist_safe_destroy (struct msglist_safe *ml)
{
if (ml) {
int saved_errno = errno;
pthread_mutex_destroy (&ml->lock);
flux_msglist_destroy (ml->queue);
free (ml);
errno = saved_errno;
};
}

static struct msglist_safe *msglist_safe_create (void)
{
struct msglist_safe *ml;

if (!(ml = calloc (1, sizeof (*ml))))
return NULL;
pthread_mutex_init (&ml->lock, NULL);
if (!(ml->queue = flux_msglist_create ())) {
msglist_safe_destroy (ml);
return NULL;
}
return ml;
}

static int msglist_safe_append_new (struct msglist_safe *ml,
flux_msg_t **msg)
{
pthread_mutex_lock (&ml->lock);
int rc = flux_msglist_append (ml->queue, *msg);
/* flux_msglist_append() takes a reference on *msg on success, so we must
* decref under the lock before setting *msg to NULL.
*/
if (rc == 0) {
flux_msg_decref (*msg);
*msg = NULL;
}
pthread_mutex_unlock (&ml->lock);
return rc;
}

static flux_msg_t *msglist_safe_pop (struct msglist_safe *ml)
{

pthread_mutex_lock (&ml->lock);
flux_msg_t *msg = (flux_msg_t *)flux_msglist_pop (ml->queue);
pthread_mutex_unlock (&ml->lock);
return msg;
}

static int msglist_safe_pollfd (struct msglist_safe *ml)
{
pthread_mutex_lock (&ml->lock);
int rc = flux_msglist_pollfd (ml->queue);
pthread_mutex_unlock (&ml->lock);
return rc;
}

static int msglist_safe_pollevents (struct msglist_safe *ml)
{
pthread_mutex_lock (&ml->lock);
int rc = flux_msglist_pollevents (ml->queue);
pthread_mutex_unlock (&ml->lock);
return rc;
}

static void channel_destroy (struct channel *chan)
{
if (chan) {
int saved_errno = errno;
msglist_safe_destroy (chan->pair[0]);
msglist_safe_destroy (chan->pair[1]);
mpipe_destroy (chan->pair[0]);
mpipe_destroy (chan->pair[1]);
free (chan->name);
free (chan);
errno = saved_errno;
Expand All @@ -150,8 +79,8 @@ static struct channel *channel_create (const char *name)

if (!(chan = calloc (1, sizeof (*chan)))
|| !(chan->name = strdup (name))
|| !(chan->pair[0] = msglist_safe_create ())
|| !(chan->pair[1] = msglist_safe_create ()))
|| !(chan->pair[0] = mpipe_create (0))
|| !(chan->pair[1] = mpipe_create (0)))
goto error;
list_node_init (&chan->list);
return chan;
Expand Down Expand Up @@ -225,7 +154,7 @@ static int op_pollevents (void *impl)
struct interthread_ctx *ctx = impl;
int e, revents = 0;

e = msglist_safe_pollevents (ctx->recv);
e = mpipe_pollevents (ctx->recv);
if (e & POLLIN)
revents |= FLUX_POLLIN;
if (e & POLLOUT)
Expand All @@ -238,7 +167,7 @@ static int op_pollevents (void *impl)
static int op_pollfd (void *impl)
{
struct interthread_ctx *ctx = impl;
return msglist_safe_pollfd (ctx->recv);
return mpipe_pollfd (ctx->recv);
}

static int router_process (flux_msg_t *msg, const char *name)
Expand Down Expand Up @@ -283,7 +212,7 @@ static int op_send_new (void *impl, flux_msg_t **msg, int flags)
* so it shouldn't survive transit of this kind either.
*/
aux_destroy (&(*msg)->aux);
return msglist_safe_append_new (ctx->send, msg);
return mpipe_enqueue (ctx->send, msg);
}

static int op_send (void *impl, const flux_msg_t *msg, int flags)
Expand All @@ -304,14 +233,14 @@ static flux_msg_t *op_recv (void *impl, int flags)
flux_msg_t *msg;

do {
msg = msglist_safe_pop (ctx->recv);
msg = mpipe_dequeue (ctx->recv);
if (!msg) {
if ((flags & FLUX_O_NONBLOCK)) {
errno = EWOULDBLOCK;
return NULL;
}
struct pollfd pfd = {
.fd = msglist_safe_pollfd (ctx->recv),
.fd = mpipe_pollfd (ctx->recv),
.events = POLLIN,
.revents = 0,
};
Expand Down

0 comments on commit a8afa7a

Please sign in to comment.