Skip to content

Commit

Permalink
libflux: add mpipe container
Browse files Browse the repository at this point in the history
Problem: flux_msglist takes references on messages and uses a
zlistx_t container internallly, which requires a heap allocation
on insertion.

Add a new container that takes ownership of messages and uses a
CCAN list internally to allocating memory on insertion.  Limit
interfaces to a queue abstraction (enqueue/dequeue/requeue).

Add some discussion of how edge-triggered polling works since
that's a somewhat uncommon interface.

Add unit test.
  • Loading branch information
garlick committed Oct 16, 2023
1 parent b5645b5 commit c912f84
Show file tree
Hide file tree
Showing 6 changed files with 583 additions and 1 deletion.
9 changes: 8 additions & 1 deletion src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ libflux_la_SOURCES = \
flog.c \
attr.c \
handle.c \
mpipe.c \
mpipe.h \
connector_loop.c \
connector_interthread.c \
connector_local.c \
Expand Down Expand Up @@ -177,7 +179,8 @@ TESTS = test_message.t \
test_module.t \
test_plugin.t \
test_sync.t \
test_disconnect.t
test_disconnect.t \
test_mpipe.t

test_ldadd = \
$(top_builddir)/src/common/libtestutil/libtestutil.la \
Expand Down Expand Up @@ -301,6 +304,10 @@ test_interthread_t_SOURCES = test/interthread.c
test_interthread_t_CPPFLAGS = $(test_cppflags)
test_interthread_t_LDADD = $(test_ldadd)

test_mpipe_t_SOURCES = test/mpipe.c
test_mpipe_t_CPPFLAGS = $(test_cppflags)
test_mpipe_t_LDADD = $(test_ldadd)

test_module_t_SOURCES = test/module.c
test_module_t_CPPFLAGS = $(test_cppflags)
test_module_t_LDADD = $(test_ldadd)
Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ flux_msg_t *flux_msg_create (int type)
if (!(msg = calloc (1, sizeof (*msg))))
return NULL;
list_head_init (&msg->routes);
list_node_init (&msg->list);
msg->proto.type = type;
if (msg_type_is_valid (msg))
msg_setup_type (msg);
Expand Down
1 change: 1 addition & 0 deletions src/common/libflux/message_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct flux_msg {
char *lasterr;
struct aux_item *aux;
int refcount;
struct list_node list; // for use by 'mpipe' container only
};

#define msgtype_is_valid(tp) \
Expand Down
257 changes: 257 additions & 0 deletions src/common/libflux/mpipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* mpipe.c - reactive, thread-safe, unidirectional message queue */

/* The pollfd/pollevents pattern was borrowed from zeromq's ZMQ_EVENTS/ZMQ_FD,
* described in zmq_getsockopt(3). It is an edge-triggered notification system
* in which the pollfd, a special file descriptor created with eventfd(2),
* can be watched reactively for a POLLIN event, then the actual event on the
* queue is determined by sampling pollevents. The valid pollevents bits are:
*
* POLLIN messages are available to dequeue
* POLLOUT messages may be enqueued (always asserted currently)
*
* The pollevents should not be confused with pollfd events. In pollfd, only
* POLLIN is expected, signaling that one of the bits is newly set in
* pollevents, and used to wake up a reactor loop to service those bits.
*
* "edge-triggered" means that pollfd does not reassert if the reactor handler
* returns with the condition that caused the event still true. In the case
* of mpipe POLLIN events, a handler must dequeue all messages before
* returning, or if fairness is a concern (one message queue starving out other
* reactor handlers), a specialized watcher in the pattern of ev_flux.c or
* ev_zmq.c is needed. ev_zmq.c contains further explanation about that
* technique. When mpipe is used within a connector, the reactive signaling is
* encapsulated in the flux_t handle, so flux_handle_watcher_create(3), based
* on ev_flux.c, already implements a fair handler.
*
* In the current implementation, mqueue size is unlimited, so POLLOUT is
* always asserted in pollevents.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <sys/poll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include <errno.h>

#include "ccan/list/list.h"

#include "message.h"
#include "message_private.h" // for access to msg->list

#include "mpipe.h"

struct mpipe {
struct list_head messages;
int pollevents;
int pollfd;
uint64_t event;
pthread_mutex_t lock;
int flags;
};

void mpipe_destroy (struct mpipe *q)
{
if (q) {
int saved_errno = errno;
flux_msg_t *msg;
while ((msg = mpipe_dequeue (q)))
flux_msg_destroy (msg);
if (q->pollfd >= 0)
(void)close (q->pollfd);
if (!(q->flags & MPIPE_SINGLE_THREAD))
pthread_mutex_destroy (&q->lock);
free (q);
errno = saved_errno;
};
}

struct mpipe *mpipe_create (int flags)
{
struct mpipe *q;

if (flags != 0 && flags != MPIPE_SINGLE_THREAD) {
errno = EINVAL;
return NULL;
}
if (!(q = calloc (1, sizeof (*q))))
return NULL;
q->flags = flags;
q->pollfd = -1;
q->pollevents = POLLOUT;
if (!(flags & MPIPE_SINGLE_THREAD))
pthread_mutex_init (&q->lock, NULL);
list_head_init (&q->messages);
return q;
}

static inline void mpipe_lock (struct mpipe *q)
{
if (!(q->flags & MPIPE_SINGLE_THREAD))
pthread_mutex_lock (&q->lock);
}

static inline void mpipe_unlock (struct mpipe *q)
{
if (!(q->flags & MPIPE_SINGLE_THREAD))
pthread_mutex_unlock (&q->lock);
}

// See eventfd(2) for an explanation of how signaling on q->pollfd works
static int mpipe_raise_event (struct mpipe *q)
{
if (q->pollfd >= 0 && q->event == 0) {
q->event = 1;
if (write (q->pollfd, &q->event, sizeof (q->event)) < 0)
return -1;
}
return 0;
}

static int mpipe_clear_event (struct mpipe *q)
{
if (q->pollfd >= 0 && q->event == 1) {
if (read (q->pollfd, &q->event, sizeof (q->event)) < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
return -1;
errno = 0;
}
q->event = 0;
}
return 0;
}

bool check_enqueue_args (struct mpipe *q, flux_msg_t **msg)
{
if (!q || !msg || !*msg)
return false;
/* When queue is used as a transport between threads, retaining a
* reference on a message after enqueuing it might result in both threads
* modifying the message simultaneously. Therefore reject the operation
* if references other than the one being transferred are held.
*/
if (!(q->flags & MPIPE_SINGLE_THREAD) && (*msg)->refcount > 1)
return false;
/* A message can only be in one mpipe queue at a time. Reject the
* operation if the list node is not in the state left by list_node_init()
* and list_del_init(), which is n.next == n.prev == n.
*/
if ((*msg)->list.next != &(*msg)->list
|| (*msg)->list.prev != &(*msg)->list)
return false;

return true;
}

int mpipe_enqueue (struct mpipe *q, flux_msg_t **msg)
{
if (!check_enqueue_args (q, msg)) {
errno = EINVAL;
return -1;
}
mpipe_lock (q);
if (!(q->pollevents & POLLIN)) {
q->pollevents |= POLLIN;
if (mpipe_raise_event (q) < 0) {
mpipe_unlock (q);
return -1;
}
}
list_add (&q->messages, &(*msg)->list);
mpipe_unlock (q);
*msg = NULL;
return 0;
}

int mpipe_requeue (struct mpipe *q, flux_msg_t **msg)
{
if (!check_enqueue_args (q, msg)) {
errno = EINVAL;
return -1;
}
mpipe_lock (q);
if (!(q->pollevents & POLLIN)) {
q->pollevents |= POLLIN;
if (mpipe_raise_event (q) < 0) {
mpipe_unlock (q);
return -1;
}
}
list_add_tail (&q->messages, &(*msg)->list);
mpipe_unlock (q);
*msg = NULL;
return 0;
}

flux_msg_t *mpipe_dequeue (struct mpipe *q)
{
if (!q)
return NULL;
mpipe_lock (q);
flux_msg_t *msg = list_tail (&q->messages, struct flux_msg, list);
if (msg) {
list_del_init (&msg->list);
if ((q->pollevents & POLLIN) && list_empty (&q->messages))
q->pollevents &= ~POLLIN;
}
mpipe_unlock (q);
return msg;
}

bool mpipe_empty (struct mpipe *q)
{
if (!q)
return true;
mpipe_lock (q);
bool res = list_empty (&q->messages);
mpipe_unlock (q);
return res;
}

int mpipe_pollfd (struct mpipe *q)
{
if (!q) {
errno = EINVAL;
return -1;
}
int rc;
mpipe_lock (q);
if (q->pollfd < 0) {
q->event = q->pollevents ? 1 : 0;
q->pollfd = eventfd (q->pollevents, EFD_NONBLOCK);
}
rc = q->pollfd;
mpipe_unlock (q);
return rc;
}

int mpipe_pollevents (struct mpipe *q)
{
if (!q) {
errno = EINVAL;
return -1;
}
int rc = -1;
mpipe_lock (q);
if (mpipe_clear_event (q) < 0)
goto done;
rc = q->pollevents;
done:
mpipe_unlock (q);
return rc;
}

// vi:ts=4 sw=4 expandtab
35 changes: 35 additions & 0 deletions src/common/libflux/mpipe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _FLUX_CORE_MPIPE_H
#define _FLUX_CORE_MPIPE_H

/* If flags contains MPIPE_SINGLE_THREAD, pthread locking is eliminated
* and messages are permitted to be enqueued with a reference count > 1.
*/
enum {
MPIPE_SINGLE_THREAD = 1,
};

struct mpipe *mpipe_create (int flags);
void mpipe_destroy (struct mpipe *q);

int mpipe_enqueue (struct mpipe *q, flux_msg_t **msg);
int mpipe_requeue (struct mpipe *q, flux_msg_t **msg);
flux_msg_t *mpipe_dequeue (struct mpipe *q);

int mpipe_pollfd (struct mpipe *q);
int mpipe_pollevents (struct mpipe *q);

bool mpipe_empty (struct mpipe *q);

#endif // !_FLUX_CORE_MPIPE_H

// vi:ts=4 sw=4 expandtab
Loading

0 comments on commit c912f84

Please sign in to comment.