Skip to content

Commit

Permalink
Merge pull request #5559 from garlick/broker_nocopy
Browse files Browse the repository at this point in the history
broker: eliminate some message copies
  • Loading branch information
mergify[bot] authored Nov 22, 2023
2 parents fcee970 + 0c336eb commit 420e8e7
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 269 deletions.
336 changes: 196 additions & 140 deletions src/broker/broker.c

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/broker/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
struct broker {
void *zctx;
flux_t *h;
flux_t *h_internal;
flux_watcher_t *w_internal;
flux_reactor_t *reactor;
optparse_t *opts;

Expand Down
8 changes: 4 additions & 4 deletions src/broker/modhash.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ struct modhash {
zhash_t *zh_byuuid;
};

/* Route a response to the module named by the last entry of the message's
 * route stack.
 *
 * The uuid returned by flux_msg_route_last() is looked up in mh->zh_byuuid
 * and the message is handed to that module with module_sendmsg_new().
 *
 * Returns 0 without sending anything if there is no message, otherwise the
 * result of the module send.  Returns -1 with errno = EPROTO if the message
 * has no last route, or errno = ENOSYS if no module is hashed under that uuid.
 *
 * NOTE(review): this span is a rendered diff hunk without +/- markers, so
 * both the old (const flux_msg_t *msg) and the new (flux_msg_t **msg) form
 * of each changed line appear back to back.
 * NOTE(review): the new form tests only '*msg'; a NULL 'msg' pointer would be
 * dereferenced here, whereas module_sendmsg_new() tests '!msg || !*msg'.
 * Confirm callers never pass NULL.
 */
int modhash_response_sendmsg (modhash_t *mh, const flux_msg_t *msg)
int modhash_response_sendmsg_new (modhash_t *mh, flux_msg_t **msg)
{
const char *uuid;
module_t *p;

if (!msg)
if (!*msg)
return 0;
if (!(uuid = flux_msg_route_last (msg))) {
if (!(uuid = flux_msg_route_last (*msg))) {
errno = EPROTO;
return -1;
}
if (!(p = zhash_lookup (mh->zh_byuuid, uuid))) {
errno = ENOSYS;
return -1;
}
return module_sendmsg (p, msg);
return module_sendmsg_new (p, msg);
}

void modhash_add (modhash_t *mh, module_t *p)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/modhash.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int modhash_event_mcast (modhash_t *mh, const flux_msg_t *msg);
/* Send a response message to the module whose uuid matches the
* next hop in the routing stack.
*/
int modhash_response_sendmsg (modhash_t *mh, const flux_msg_t *msg);
int modhash_response_sendmsg_new (modhash_t *mh, flux_msg_t **msg);

/* Find a module matching 'uuid'.
*/
Expand Down
16 changes: 10 additions & 6 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@ flux_msg_t *module_recvmsg (module_t *p)
return flux_recv (p->h_broker, FLUX_MATCH_ANY, FLUX_O_NONBLOCK);
}

int module_sendmsg (module_t *p, const flux_msg_t *msg)
int module_sendmsg_new (module_t *p, flux_msg_t **msg)
{
int type;
const char *topic;

if (!msg)
if (!msg || !*msg)
return 0;
if (flux_msg_get_type (msg, &type) < 0
|| flux_msg_get_topic (msg, &topic) < 0)
if (flux_msg_get_type (*msg, &type) < 0
|| flux_msg_get_topic (*msg, &topic) < 0)
return -1;
/* Muted modules only accept response to broker.module-status
*/
Expand All @@ -439,7 +439,7 @@ int module_sendmsg (module_t *p, const flux_msg_t *msg)
return -1;
}
}
return flux_send (p->h_broker, msg, 0);
return flux_send_new (p->h_broker, msg, 0);
}

int module_disconnect_arm (module_t *p,
Expand Down Expand Up @@ -633,8 +633,12 @@ int module_event_cast (module_t *p, const flux_msg_t *msg)
if (flux_msg_get_topic (msg, &topic) < 0)
return -1;
if (subhash_topic_match (p->sub, topic)) {
if (module_sendmsg (p, msg) < 0)
flux_msg_t *cpy;
if (!(cpy = flux_msg_copy (msg, true))
|| module_sendmsg_new (p, &cpy) < 0) {
flux_msg_decref (cpy);
return -1;
}
}
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/broker/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void module_set_poller_cb (module_t *p, modpoller_cb_f cb, void *arg);
/* Send/recv a message to/from a specific module.
*/
flux_msg_t *module_recvmsg (module_t *p);
int module_sendmsg (module_t *p, const flux_msg_t *msg);
int module_sendmsg_new (module_t *p, flux_msg_t **msg);

/* Pass module's requests through this function to enable disconnect
* messages to be sent when the module is unloaded. The callback will
Expand Down
159 changes: 84 additions & 75 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ struct overlay {
struct flux_msglist *health_requests;
};

static void overlay_mcast_child (struct overlay *ov, const flux_msg_t *msg);
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg);
static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg);
static int overlay_sendmsg_parent (struct overlay *ov, const flux_msg_t *msg);
static void hello_response_handler (struct overlay *ov, const flux_msg_t *msg);
Expand Down Expand Up @@ -572,20 +572,18 @@ static int overlay_control_child (struct overlay *ov,
return -1;
}

int overlay_sendmsg (struct overlay *ov,
const flux_msg_t *msg,
overlay_where_t where)
int overlay_sendmsg_new (struct overlay *ov,
flux_msg_t **msg,
overlay_where_t where)
{
int type;
uint8_t flags;
flux_msg_t *cpy = NULL;
const char *uuid;
uint32_t nodeid;
struct child *child = NULL;
int rc;

if (flux_msg_get_type (msg, &type) < 0
|| flux_msg_get_flags (msg, &flags) < 0)
if (flux_msg_get_type (*msg, &type) < 0
|| flux_msg_get_flags (*msg, &flags) < 0)
return -1;
switch (type) {
case FLUX_MSGTYPE_REQUEST:
Expand All @@ -595,47 +593,40 @@ int overlay_sendmsg (struct overlay *ov,
* select the peer, and our uuid remains as part of the source addr.
*/
if (where == OVERLAY_ANY) {
if (flux_msg_get_nodeid (msg, &nodeid) < 0)
goto error;
if (flux_msg_get_nodeid (*msg, &nodeid) < 0)
return -1;
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == ov->rank)
where = OVERLAY_UPSTREAM;
else {
if ((child = child_lookup_route (ov, nodeid))) {
if (!subtree_is_online (child->status)) {
errno = EHOSTUNREACH;
goto error;
return -1;
}
if (!(cpy = flux_msg_copy (msg, true)))
goto error;
if (flux_msg_route_push (cpy, ov->uuid) < 0)
goto error;
if (flux_msg_route_push (cpy, child->uuid) < 0)
goto error;
msg = cpy;
if (flux_msg_route_push (*msg, ov->uuid) < 0
|| flux_msg_route_push (*msg, child->uuid) < 0)
return -1;
where = OVERLAY_DOWNSTREAM;
}
else
where = OVERLAY_UPSTREAM;
}
}
if (where == OVERLAY_UPSTREAM) {
rc = overlay_sendmsg_parent (ov, msg);
if (rc == 0)
rpc_track_update (ov->parent.tracker, msg);
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
rpc_track_update (ov->parent.tracker, *msg);
}
else {
rc = overlay_sendmsg_child (ov, msg);
if (rc == 0) {
if (!child) {
if ((uuid = flux_msg_route_last (msg)))
child = child_lookup_online (ov, ov->uuid);
}
if (child)
rpc_track_update (child->tracker, msg);
if (overlay_sendmsg_child (ov, *msg) < 0)
return -1;
if (!child) {
if ((uuid = flux_msg_route_last (*msg)))
child = child_lookup_online (ov, ov->uuid);
}
if (child)
rpc_track_update (child->tracker, *msg);
}
if (rc < 0)
goto error;
break;
case FLUX_MSGTYPE_RESPONSE:
/* Assume if next route matches parent, the message goes upstream;
Expand All @@ -644,46 +635,57 @@ int overlay_sendmsg (struct overlay *ov,
*/
if (where == OVERLAY_ANY) {
if (ov->rank > 0
&& (uuid = flux_msg_route_last (msg)) != NULL
&& (uuid = flux_msg_route_last (*msg)) != NULL
&& streq (uuid, ov->parent.uuid))
where = OVERLAY_UPSTREAM;
else
where = OVERLAY_DOWNSTREAM;
}
if (where == OVERLAY_UPSTREAM)
rc = overlay_sendmsg_parent (ov, msg);
else
rc = overlay_sendmsg_child (ov, msg);
if (rc < 0)
goto error;
if (where == OVERLAY_UPSTREAM) {
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
}
else {
if (overlay_sendmsg_child (ov, *msg) < 0)
return -1;
}
break;
case FLUX_MSGTYPE_EVENT:
if (where == OVERLAY_DOWNSTREAM || where == OVERLAY_ANY)
overlay_mcast_child (ov, msg);
overlay_mcast_child (ov, *msg);
else {
/* N.B. add route delimiter if needed to pass unpublished
* event message upstream through router socket.
*/
if (!(flags & FLUX_MSGFLAG_ROUTE)) {
if (!(cpy = flux_msg_copy (msg, true)))
goto error;
flux_msg_route_enable (cpy);
msg = cpy;
flux_msg_route_enable (*msg);
}
if (overlay_sendmsg_parent (ov, msg) < 0)
goto error;
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
}
break;
default:
goto inval;
errno = EINVAL;
return -1;
}
flux_msg_decref (*msg);
*msg = NULL;
return 0;
}

/* Copying front end to overlay_sendmsg_new() for callers holding a const
 * message: 'msg' is duplicated with flux_msg_copy() and the copy is passed
 * to overlay_sendmsg_new() by reference.
 *
 * Returns 0 on success.  Returns -1 if the copy cannot be made or if
 * overlay_sendmsg_new() fails; in the latter case the copy is destroyed here.
 *
 * NOTE(review): overlay_sendmsg_new() drops its reference and sets *msg to
 * NULL on its success path, so the flux_msg_decref (cpy) before 'return 0'
 * presumably operates on NULL - confirm flux_msg_decref() accepts NULL.
 * NOTE(review): the 'inval:' / 'error:' lines after 'return 0' appear to be
 * removed lines of the pre-change function body shown by the diff rendering,
 * not part of this wrapper.
 */
int overlay_sendmsg (struct overlay *ov,
const flux_msg_t *msg,
overlay_where_t where)
{
flux_msg_t *cpy;

if (!(cpy = flux_msg_copy (msg, true)))
return -1;
if (overlay_sendmsg_new (ov, &cpy, where) < 0) {
flux_msg_destroy (cpy);
return -1;
}
flux_msg_decref (cpy);
return 0;
inval:
errno = EINVAL;
error:
flux_msg_decref (cpy);
return -1;
}

static void sync_cb (flux_future_t *f, void *arg)
Expand Down Expand Up @@ -801,30 +803,25 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg)
return rc;
}

/* Send 'msg' to one child: push child->uuid onto the message's route stack,
 * send it with overlay_sendmsg_child(), then delete that last route again.
 * overlay_mcast_child() calls this once per online child with the same
 * message, so the route stack has to be restored after each send.
 *
 * Returns the result of overlay_sendmsg_child(), or -1 if the route push
 * fails (in which case nothing is sent).  The return value of
 * flux_msg_route_delete_last() is deliberately discarded.
 *
 * NOTE(review): this span is a rendered diff hunk without +/- markers; the
 * lines that use 'cpy' and 'goto done' appear to belong to the pre-change
 * version that copied the message, and are interleaved with the in-place
 * version described above.
 */
static int overlay_mcast_child_one (struct overlay *ov,
const flux_msg_t *msg,
flux_msg_t *msg,
struct child *child)
{
flux_msg_t *cpy;
int rc = -1;

if (!(cpy = flux_msg_copy (msg, true)))
if (flux_msg_route_push (msg, child->uuid) < 0)
return -1;
flux_msg_route_enable (cpy);
if (flux_msg_route_push (cpy, child->uuid) < 0)
goto done;
if (overlay_sendmsg_child (ov, cpy) < 0)
goto done;
rc = 0;
done:
flux_msg_destroy (cpy);
int rc = overlay_sendmsg_child (ov, msg);
(void)flux_msg_route_delete_last (msg);
return rc;
}

static void overlay_mcast_child (struct overlay *ov, const flux_msg_t *msg)
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
{
struct child *child;

flux_msg_route_enable (msg);

foreach_overlay_child (ov, child) {
if (subtree_is_online (child->status)) {
if (overlay_mcast_child_one (ov, msg, child) < 0) {
Expand All @@ -841,7 +838,8 @@ static void overlay_mcast_child (struct overlay *ov, const flux_msg_t *msg)
static void logdrop (struct overlay *ov,
overlay_where_t where,
const flux_msg_t *msg,
const char *fmt, ...)
const char *fmt,
...)
{
char reason[128];
va_list ap;
Expand Down Expand Up @@ -883,8 +881,10 @@ static int clear_msg_role (flux_msg_t *msg, uint32_t role)

/* Handle a message received from TBON child (downstream).
*/
static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void child_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct overlay *ov = arg;
flux_msg_t *msg;
Expand Down Expand Up @@ -972,7 +972,9 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
case FLUX_MSGTYPE_EVENT:
break;
}
ov->recv_cb (msg, OVERLAY_DOWNSTREAM, ov->recv_arg);
if (ov->recv_cb (&msg, OVERLAY_DOWNSTREAM, ov->recv_arg) < 0)
goto done;
return;
done:
flux_msg_decref (msg);
}
Expand All @@ -991,8 +993,10 @@ static void fail_parent_rpc (const flux_msg_t *msg, void *arg)
log_tracker_error (ov->h, msg, errno);
}

static void parent_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void parent_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct overlay *ov = arg;
flux_msg_t *msg;
Expand Down Expand Up @@ -1056,7 +1060,9 @@ static void parent_cb (flux_reactor_t *r, flux_watcher_t *w,
default:
break;
}
ov->recv_cb (msg, OVERLAY_UPSTREAM, ov->recv_arg);
if (ov->recv_cb (&msg, OVERLAY_UPSTREAM, ov->recv_arg) < 0)
goto done;
return;
done:
flux_msg_destroy (msg);
}
Expand Down Expand Up @@ -1203,8 +1209,11 @@ static void hello_response_handler (struct overlay *ov, const flux_msg_t *msg)
errno = saved_errno;
goto error;
}
flux_log (ov->h, LOG_DEBUG, "hello parent %lu %s",
(unsigned long)ov->parent.rank, uuid);
flux_log (ov->h,
LOG_DEBUG,
"hello parent %lu %s",
(unsigned long)ov->parent.rank,
uuid);
snprintf (ov->parent.uuid, sizeof (ov->parent.uuid), "%s", uuid);
ov->parent.hello_responded = true;
ov->parent.hello_error = false;
Expand Down
Loading

0 comments on commit 420e8e7

Please sign in to comment.