Skip to content

Commit

Permalink
erts: Fix fragmented send to finish before exiting
Browse files Browse the repository at this point in the history
If a process is suspended doing a fragmented send and then
receives an exit signal it was terminated before it could
finish sending the message leading to a memory leak on the
receiving side.

This change fixes that so that the message is allowed to
finish being sent before the process exits.

Closes #5876
  • Loading branch information
garazdawi committed Apr 27, 2022
1 parent 6d5a5f3 commit c78a8e9
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 10 deletions.
5 changes: 4 additions & 1 deletion erts/emulator/beam/bif.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ static Export* flush_monitor_messages_trap = NULL;
static Export* set_cpu_topology_trap = NULL;
static Export* await_port_send_result_trap = NULL;
Export* erts_format_cpu_topology_trap = NULL;
static Export dsend_continue_trap_export;
#ifndef DEBUG
static
#endif
Export dsend_continue_trap_export;
Export *erts_convert_time_unit_trap = NULL;

static Export *await_msacc_mod_trap = NULL;
Expand Down
7 changes: 5 additions & 2 deletions erts/emulator/beam/dist.c
Original file line number Diff line number Diff line change
Expand Up @@ -2533,6 +2533,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
erts_mtx_unlock(&dep->qlock);

plp = erts_proclist_create(ctx->c_p);

erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
erts_mtx_lock(&dep->qlock);
Expand Down Expand Up @@ -2607,12 +2608,15 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
}
/* More fragments left to be sent, yield and re-schedule */
if (ctx->fragments) {
ctx->c_p->flags |= F_FRAGMENTED_SEND;
retval = ERTS_DSIG_SEND_CONTINUE;
if (!resume && erts_system_monitor_flags.busy_dist_port)
monitor_generic(ctx->c_p, am_busy_dist_port, cid);
goto done;
}
}

if (ctx->c_p) ctx->c_p->flags &= ~F_FRAGMENTED_SEND;
ctx->obuf = NULL;

if (suspended) {
Expand Down Expand Up @@ -2991,9 +2995,8 @@ erts_dist_command(Port *prt, int initial_reds)
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
&& de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
erts_mtx_unlock(&dep->qlock);

resumed = erts_resume_processes(suspendees);
Expand Down
6 changes: 4 additions & 2 deletions erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -3444,9 +3444,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ErtsMonitorSuspend *msp;
erts_aint_t mstate;
msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon);
mstate = erts_atomic_read_acqb(&msp->state);
if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)
mstate = erts_atomic_read_band_acqb(
&msp->state, ~ERTS_MSUSPEND_STATE_FLG_ACTIVE);
if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) {
erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
}
break;
}
default:
Expand Down
34 changes: 31 additions & 3 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -12692,6 +12692,7 @@ enum continue_exit_phase {
ERTS_CONTINUE_EXIT_MONITORS,
ERTS_CONTINUE_EXIT_LT_MONITORS,
ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG,
ERTS_CONTINUE_EXIT_DIST_SEND,
ERTS_CONTINUE_EXIT_DIST_LINKS,
ERTS_CONTINUE_EXIT_DIST_MONITORS,
ERTS_CONTINUE_EXIT_DONE,
Expand All @@ -12709,6 +12710,10 @@ struct continue_exit_state {
Uint32 block_rla_ref;
};

#ifdef DEBUG
extern Export dsend_continue_trap_export;
#endif

void
erts_continue_exit_process(Process *p)
{
Expand Down Expand Up @@ -12946,6 +12951,20 @@ erts_continue_exit_process(Process *p)
trap_state->pectxt.dist_state = NIL;
trap_state->pectxt.yield = 0;

p->rcount = 0;

if (p->flags & F_FRAGMENTED_SEND) {
/* The process was re-scheduled while doing a fragmented
distributed send (possibly because it was suspended).
We need to finish doing that send as otherwise incomplete
fragmented messages will be sent to other nodes potentially
causing memory leaks.
*/
ASSERT(p->current == &dsend_continue_trap_export.info.mfa);
/* arg_reg[0] is the argument used in dsend_continue_trap_export */
trap_state->pectxt.dist_state = p->arg_reg[0];
}

erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);

erts_proc_sig_fetch(p);
Expand Down Expand Up @@ -13004,11 +13023,12 @@ erts_continue_exit_process(Process *p)

reds -= r;

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
trap_state->phase = ERTS_CONTINUE_EXIT_DIST_SEND;
}
case ERTS_CONTINUE_EXIT_DIST_LINKS: {
case ERTS_CONTINUE_EXIT_DIST_SEND: {

continue_dist_send:
continue_dist_send:
ASSERT(p->rcount == 0);
if (is_not_nil(trap_state->pectxt.dist_state)) {
Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state);
ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin);
Expand Down Expand Up @@ -13041,6 +13061,13 @@ erts_continue_exit_process(Process *p)
goto restart;
}

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
}
case ERTS_CONTINUE_EXIT_DIST_LINKS: {

if (is_not_nil(trap_state->pectxt.dist_state))
goto continue_dist_send;

reds = erts_link_tree_foreach_delete_yielding(
&trap_state->pectxt.dist_links,
erts_proc_exit_handle_dist_link,
Expand All @@ -13049,6 +13076,7 @@ erts_continue_exit_process(Process *p)
reds);
if (reds <= 0 || trap_state->pectxt.yield)
goto yield;

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS;
}
case ERTS_CONTINUE_EXIT_DIST_MONITORS: {
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,7 @@ extern int erts_system_profile_ts_type;
#define F_DIRTY_MINOR_GC (1 << 21) /* Dirty minor GC scheduled */
#define F_HIBERNATED (1 << 22) /* Hibernated */
#define F_TRAP_EXIT (1 << 23) /* Trapping exit */
#define F_FRAGMENTED_SEND (1 << 24) /* Process is doing a distributed fragmented send */

/* Signal queue flags */
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
Expand Down
7 changes: 5 additions & 2 deletions erts/etc/unix/etp-commands.in
Original file line number Diff line number Diff line change
Expand Up @@ -2091,8 +2091,11 @@ end
define etp-proc-flags-int
# Args: int
#
if ($arg0 & ~((1 << 24)-1))
printf "GARBAGE<%x> ", ($arg0 & ~((1 << 24)-1))
if ($arg0 & ~((1 << 25)-1))
printf "GARBAGE<%x> ", ($arg0 & ~((1 << 25)-1))
end
if ($arg0 & (1 << 24))
printf "fragmented-send "
end
if ($arg0 & (1 << 23))
printf "trap-exit "
Expand Down

0 comments on commit c78a8e9

Please sign in to comment.