Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker: eliminate some message copies #5559

Merged
merged 11 commits into from
Nov 22, 2023
336 changes: 196 additions & 140 deletions src/broker/broker.c

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/broker/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
struct broker {
void *zctx;
flux_t *h;
flux_t *h_internal;
flux_watcher_t *w_internal;
flux_reactor_t *reactor;
optparse_t *opts;

Expand Down
8 changes: 4 additions & 4 deletions src/broker/modhash.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ struct modhash {
zhash_t *zh_byuuid;
};

int modhash_response_sendmsg (modhash_t *mh, const flux_msg_t *msg)
int modhash_response_sendmsg_new (modhash_t *mh, flux_msg_t **msg)
{
const char *uuid;
module_t *p;

if (!msg)
if (!*msg)
return 0;
if (!(uuid = flux_msg_route_last (msg))) {
if (!(uuid = flux_msg_route_last (*msg))) {
errno = EPROTO;
return -1;
}
if (!(p = zhash_lookup (mh->zh_byuuid, uuid))) {
errno = ENOSYS;
return -1;
}
return module_sendmsg (p, msg);
return module_sendmsg_new (p, msg);
}

void modhash_add (modhash_t *mh, module_t *p)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/modhash.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int modhash_event_mcast (modhash_t *mh, const flux_msg_t *msg);
/* Send a response message to the module whose uuid matches the
* next hop in the routing stack.
*/
int modhash_response_sendmsg (modhash_t *mh, const flux_msg_t *msg);
int modhash_response_sendmsg_new (modhash_t *mh, flux_msg_t **msg);

/* Find a module matching 'uuid'.
*/
Expand Down
16 changes: 10 additions & 6 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@
return flux_recv (p->h_broker, FLUX_MATCH_ANY, FLUX_O_NONBLOCK);
}

int module_sendmsg (module_t *p, const flux_msg_t *msg)
int module_sendmsg_new (module_t *p, flux_msg_t **msg)
{
int type;
const char *topic;

if (!msg)
if (!msg || !*msg)
return 0;
if (flux_msg_get_type (msg, &type) < 0
|| flux_msg_get_topic (msg, &topic) < 0)
if (flux_msg_get_type (*msg, &type) < 0
|| flux_msg_get_topic (*msg, &topic) < 0)
return -1;
/* Muted modules only accept response to broker.module-status
*/
Expand All @@ -439,7 +439,7 @@
return -1;
}
}
return flux_send (p->h_broker, msg, 0);
return flux_send_new (p->h_broker, msg, 0);
}

int module_disconnect_arm (module_t *p,
Expand Down Expand Up @@ -633,8 +633,12 @@
if (flux_msg_get_topic (msg, &topic) < 0)
return -1;
if (subhash_topic_match (p->sub, topic)) {
if (module_sendmsg (p, msg) < 0)
flux_msg_t *cpy;
if (!(cpy = flux_msg_copy (msg, true))
|| module_sendmsg_new (p, &cpy) < 0) {
flux_msg_decref (cpy);

Check warning on line 639 in src/broker/module.c

View check run for this annotation

Codecov / codecov/patch

src/broker/module.c#L639

Added line #L639 was not covered by tests
return -1;
}
}
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/broker/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void module_set_poller_cb (module_t *p, modpoller_cb_f cb, void *arg);
/* Send/recv a message for to/from a specific module.
*/
flux_msg_t *module_recvmsg (module_t *p);
int module_sendmsg (module_t *p, const flux_msg_t *msg);
int module_sendmsg_new (module_t *p, flux_msg_t **msg);

/* Pass module's requests through this function to enable disconnect
* messages to be sent when the module is unloaded. The callback will
Expand Down
159 changes: 84 additions & 75 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
struct flux_msglist *health_requests;
};

static void overlay_mcast_child (struct overlay *ov, const flux_msg_t *msg);
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg);
static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg);
static int overlay_sendmsg_parent (struct overlay *ov, const flux_msg_t *msg);
static void hello_response_handler (struct overlay *ov, const flux_msg_t *msg);
Expand Down Expand Up @@ -572,20 +572,18 @@
return -1;
}

int overlay_sendmsg (struct overlay *ov,
const flux_msg_t *msg,
overlay_where_t where)
int overlay_sendmsg_new (struct overlay *ov,
flux_msg_t **msg,
overlay_where_t where)
{
int type;
uint8_t flags;
flux_msg_t *cpy = NULL;
const char *uuid;
uint32_t nodeid;
struct child *child = NULL;
int rc;

if (flux_msg_get_type (msg, &type) < 0
|| flux_msg_get_flags (msg, &flags) < 0)
if (flux_msg_get_type (*msg, &type) < 0
|| flux_msg_get_flags (*msg, &flags) < 0)
return -1;
switch (type) {
case FLUX_MSGTYPE_REQUEST:
Expand All @@ -595,47 +593,40 @@
* select the peer, and our uuid remains as part of the source addr.
*/
if (where == OVERLAY_ANY) {
if (flux_msg_get_nodeid (msg, &nodeid) < 0)
goto error;
if (flux_msg_get_nodeid (*msg, &nodeid) < 0)
return -1;
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == ov->rank)
where = OVERLAY_UPSTREAM;
else {
if ((child = child_lookup_route (ov, nodeid))) {
if (!subtree_is_online (child->status)) {
errno = EHOSTUNREACH;
goto error;
return -1;
}
if (!(cpy = flux_msg_copy (msg, true)))
goto error;
if (flux_msg_route_push (cpy, ov->uuid) < 0)
goto error;
if (flux_msg_route_push (cpy, child->uuid) < 0)
goto error;
msg = cpy;
if (flux_msg_route_push (*msg, ov->uuid) < 0
|| flux_msg_route_push (*msg, child->uuid) < 0)
return -1;

Check warning on line 608 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L608

Added line #L608 was not covered by tests
where = OVERLAY_DOWNSTREAM;
}
else
where = OVERLAY_UPSTREAM;
}
}
if (where == OVERLAY_UPSTREAM) {
rc = overlay_sendmsg_parent (ov, msg);
if (rc == 0)
rpc_track_update (ov->parent.tracker, msg);
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
rpc_track_update (ov->parent.tracker, *msg);
}
else {
rc = overlay_sendmsg_child (ov, msg);
if (rc == 0) {
if (!child) {
if ((uuid = flux_msg_route_last (msg)))
child = child_lookup_online (ov, ov->uuid);
}
if (child)
rpc_track_update (child->tracker, msg);
if (overlay_sendmsg_child (ov, *msg) < 0)
return -1;
if (!child) {
if ((uuid = flux_msg_route_last (*msg)))
child = child_lookup_online (ov, ov->uuid);

Check warning on line 625 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L624-L625

Added lines #L624 - L625 were not covered by tests
}
if (child)
rpc_track_update (child->tracker, *msg);
}
if (rc < 0)
goto error;
break;
case FLUX_MSGTYPE_RESPONSE:
/* Assume if next route matches parent, the message goes upstream;
Expand All @@ -644,46 +635,57 @@
*/
if (where == OVERLAY_ANY) {
if (ov->rank > 0
&& (uuid = flux_msg_route_last (msg)) != NULL
&& (uuid = flux_msg_route_last (*msg)) != NULL
&& streq (uuid, ov->parent.uuid))
where = OVERLAY_UPSTREAM;
else
where = OVERLAY_DOWNSTREAM;
}
if (where == OVERLAY_UPSTREAM)
rc = overlay_sendmsg_parent (ov, msg);
else
rc = overlay_sendmsg_child (ov, msg);
if (rc < 0)
goto error;
if (where == OVERLAY_UPSTREAM) {
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
}
else {
if (overlay_sendmsg_child (ov, *msg) < 0)
return -1;
}
break;
case FLUX_MSGTYPE_EVENT:
if (where == OVERLAY_DOWNSTREAM || where == OVERLAY_ANY)
overlay_mcast_child (ov, msg);
overlay_mcast_child (ov, *msg);
else {
/* N.B. add route delimiter if needed to pass unpublished
* event message upstream through router socket.
*/
if (!(flags & FLUX_MSGFLAG_ROUTE)) {
if (!(cpy = flux_msg_copy (msg, true)))
goto error;
flux_msg_route_enable (cpy);
msg = cpy;
flux_msg_route_enable (*msg);

Check warning on line 661 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L661

Added line #L661 was not covered by tests
}
if (overlay_sendmsg_parent (ov, msg) < 0)
goto error;
if (overlay_sendmsg_parent (ov, *msg) < 0)
return -1;
}
break;
default:
goto inval;
errno = EINVAL;
return -1;

Check warning on line 669 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L668-L669

Added lines #L668 - L669 were not covered by tests
}
flux_msg_decref (*msg);
*msg = NULL;
return 0;
}

int overlay_sendmsg (struct overlay *ov,
const flux_msg_t *msg,
overlay_where_t where)
{
flux_msg_t *cpy;

if (!(cpy = flux_msg_copy (msg, true)))
return -1;
if (overlay_sendmsg_new (ov, &cpy, where) < 0) {
flux_msg_destroy (cpy);
return -1;
}
flux_msg_decref (cpy);
return 0;
inval:
errno = EINVAL;
error:
flux_msg_decref (cpy);
return -1;
}

static void sync_cb (flux_future_t *f, void *arg)
Expand Down Expand Up @@ -801,30 +803,25 @@
return rc;
}

/* Push child->uuid onto the message, then pop it off again after sending.
*/
static int overlay_mcast_child_one (struct overlay *ov,
const flux_msg_t *msg,
flux_msg_t *msg,
struct child *child)
{
flux_msg_t *cpy;
int rc = -1;

if (!(cpy = flux_msg_copy (msg, true)))
if (flux_msg_route_push (msg, child->uuid) < 0)
return -1;
flux_msg_route_enable (cpy);
if (flux_msg_route_push (cpy, child->uuid) < 0)
goto done;
if (overlay_sendmsg_child (ov, cpy) < 0)
goto done;
rc = 0;
done:
flux_msg_destroy (cpy);
int rc = overlay_sendmsg_child (ov, msg);
(void)flux_msg_route_delete_last (msg);
return rc;
}

static void overlay_mcast_child (struct overlay *ov, const flux_msg_t *msg)
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
{
struct child *child;

flux_msg_route_enable (msg);

foreach_overlay_child (ov, child) {
if (subtree_is_online (child->status)) {
if (overlay_mcast_child_one (ov, msg, child) < 0) {
Expand All @@ -841,7 +838,8 @@
static void logdrop (struct overlay *ov,
overlay_where_t where,
const flux_msg_t *msg,
const char *fmt, ...)
const char *fmt,
...)
{
char reason[128];
va_list ap;
Expand Down Expand Up @@ -883,8 +881,10 @@

/* Handle a message received from TBON child (downstream).
*/
static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void child_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct overlay *ov = arg;
flux_msg_t *msg;
Expand Down Expand Up @@ -972,7 +972,9 @@
case FLUX_MSGTYPE_EVENT:
break;
}
ov->recv_cb (msg, OVERLAY_DOWNSTREAM, ov->recv_arg);
if (ov->recv_cb (&msg, OVERLAY_DOWNSTREAM, ov->recv_arg) < 0)
goto done;

Check warning on line 976 in src/broker/overlay.c

View check run for this annotation

Codecov / codecov/patch

src/broker/overlay.c#L976

Added line #L976 was not covered by tests
return;
done:
flux_msg_decref (msg);
}
Expand All @@ -991,8 +993,10 @@
log_tracker_error (ov->h, msg, errno);
}

static void parent_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void parent_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct overlay *ov = arg;
flux_msg_t *msg;
Expand Down Expand Up @@ -1056,7 +1060,9 @@
default:
break;
}
ov->recv_cb (msg, OVERLAY_UPSTREAM, ov->recv_arg);
if (ov->recv_cb (&msg, OVERLAY_UPSTREAM, ov->recv_arg) < 0)
goto done;
return;
done:
flux_msg_destroy (msg);
}
Expand Down Expand Up @@ -1203,8 +1209,11 @@
errno = saved_errno;
goto error;
}
flux_log (ov->h, LOG_DEBUG, "hello parent %lu %s",
(unsigned long)ov->parent.rank, uuid);
flux_log (ov->h,
LOG_DEBUG,
"hello parent %lu %s",
(unsigned long)ov->parent.rank,
uuid);
snprintf (ov->parent.uuid, sizeof (ov->parent.uuid), "%s", uuid);
ov->parent.hello_responded = true;
ov->parent.hello_error = false;
Expand Down
Loading
Loading