Skip to content

Commit

Permalink
Merge branch 'lukas/24/erts/fix-dist-fragment-exit-leak/OTP-18077' in…
Browse files Browse the repository at this point in the history
…to lukas/25/erts/fix-dist-fragment-exit-leak/OTP-18077
  • Loading branch information
garazdawi committed Apr 27, 2022
2 parents a7b9802 + 7694529 commit dc7cf3b
Show file tree
Hide file tree
Showing 7 changed files with 425 additions and 21 deletions.
5 changes: 4 additions & 1 deletion erts/emulator/beam/bif.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ static Export* flush_monitor_messages_trap = NULL;
static Export* set_cpu_topology_trap = NULL;
static Export* await_port_send_result_trap = NULL;
Export* erts_format_cpu_topology_trap = NULL;
static Export dsend_continue_trap_export;
#ifndef DEBUG
static
#endif
Export dsend_continue_trap_export;
Export *erts_convert_time_unit_trap = NULL;

static Export *await_msacc_mod_trap = NULL;
Expand Down
7 changes: 5 additions & 2 deletions erts/emulator/beam/dist.c
Original file line number Diff line number Diff line change
Expand Up @@ -3396,6 +3396,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
erts_mtx_unlock(&dep->qlock);

plp = erts_proclist_create(ctx->c_p);

erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
erts_mtx_lock(&dep->qlock);
Expand Down Expand Up @@ -3466,12 +3467,15 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
}
/* More fragments left to be sent, yield and re-schedule */
if (ctx->fragments) {
ctx->c_p->flags |= F_FRAGMENTED_SEND;
retval = ERTS_DSIG_SEND_CONTINUE;
if (!resume && erts_system_monitor_flags.busy_dist_port)
monitor_generic(ctx->c_p, am_busy_dist_port, cid);
goto done;
}
}

if (ctx->c_p) ctx->c_p->flags &= ~F_FRAGMENTED_SEND;
ctx->obuf = NULL;

if (suspended) {
Expand Down Expand Up @@ -3855,9 +3859,8 @@ erts_dist_command(Port *prt, int initial_reds)
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
&& de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
erts_mtx_unlock(&dep->qlock);

resumed = erts_resume_processes(suspendees);
Expand Down
7 changes: 5 additions & 2 deletions erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -5469,9 +5469,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ErtsMonitorSuspend *msp;
erts_aint_t mstate;
msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon);
mstate = erts_atomic_read_acqb(&msp->state);
if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)
mstate = erts_atomic_read_band_acqb(
&msp->state, ~ERTS_MSUSPEND_STATE_FLG_ACTIVE);
if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) {
erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
}
break;
}
default:
Expand Down Expand Up @@ -6307,6 +6309,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
case ERTS_MON_TYPE_SUSPEND:
size = erts_monitor_size((ErtsMonitor *) sig);
break;
default:
Expand Down
34 changes: 31 additions & 3 deletions erts/emulator/beam/erl_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -13726,6 +13726,7 @@ enum continue_exit_phase {
ERTS_CONTINUE_EXIT_MONITORS,
ERTS_CONTINUE_EXIT_LT_MONITORS,
ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG,
ERTS_CONTINUE_EXIT_DIST_SEND,
ERTS_CONTINUE_EXIT_DIST_LINKS,
ERTS_CONTINUE_EXIT_DIST_MONITORS,
ERTS_CONTINUE_EXIT_DIST_PEND_SPAWN_MONITORS,
Expand All @@ -13744,6 +13745,10 @@ struct continue_exit_state {
Uint32 block_rla_ref;
};

#ifdef DEBUG
extern Export dsend_continue_trap_export;
#endif

void
erts_continue_exit_process(Process *p)
{
Expand Down Expand Up @@ -13983,6 +13988,20 @@ erts_continue_exit_process(Process *p)
trap_state->pectxt.dist_state = NIL;
trap_state->pectxt.yield = 0;

p->rcount = 0;

if (p->flags & F_FRAGMENTED_SEND) {
/* The process was re-scheduled while doing a fragmented
distributed send (possibly because it was suspended).
We need to finish doing that send as otherwise incomplete
fragmented messages will be sent to other nodes potentially
causing memory leaks.
*/
ASSERT(p->current == &dsend_continue_trap_export.info.mfa);
/* arg_reg[0] is the argument used in dsend_continue_trap_export */
trap_state->pectxt.dist_state = p->arg_reg[0];
}

erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);

erts_proc_sig_queue_flush_and_deinstall_buffers(p);
Expand Down Expand Up @@ -14045,11 +14064,12 @@ erts_continue_exit_process(Process *p)

reds -= r;

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
trap_state->phase = ERTS_CONTINUE_EXIT_DIST_SEND;
}
case ERTS_CONTINUE_EXIT_DIST_LINKS: {
case ERTS_CONTINUE_EXIT_DIST_SEND: {

continue_dist_send:
continue_dist_send:
ASSERT(p->rcount == 0);
if (is_not_nil(trap_state->pectxt.dist_state)) {
Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state);
ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin);
Expand Down Expand Up @@ -14082,6 +14102,13 @@ erts_continue_exit_process(Process *p)
goto restart;
}

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
}
case ERTS_CONTINUE_EXIT_DIST_LINKS: {

if (is_not_nil(trap_state->pectxt.dist_state))
goto continue_dist_send;

reds = erts_link_tree_foreach_delete_yielding(
&trap_state->pectxt.dist_links,
erts_proc_exit_handle_dist_link,
Expand All @@ -14090,6 +14117,7 @@ erts_continue_exit_process(Process *p)
reds);
if (reds <= 0 || trap_state->pectxt.yield)
goto yield;

trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS;
}
case ERTS_CONTINUE_EXIT_DIST_MONITORS: {
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/beam/erl_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,8 @@ extern int erts_system_profile_ts_type;
#define F_DIRTY_MINOR_GC (1 << 20) /* Dirty minor GC scheduled */
#define F_HIBERNATED (1 << 21) /* Hibernated */
#define F_TRAP_EXIT (1 << 22) /* Trapping exit */
#define F_DBG_FORCED_TRAP (1 << 23) /* DEBUG: Last BIF call was a forced trap */
#define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */
#define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */

/* Signal queue flags */
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
Expand Down
158 changes: 157 additions & 1 deletion erts/emulator/test/distribution_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
bad_dist_ext_size/1,
start_epmd_false/1, no_epmd/1, epmd_module/1,
bad_dist_fragments/1,
exit_dist_fragments/1,
message_latency_large_message/1,
message_latency_large_link_exit/1,
message_latency_large_monitor_exit/1,
Expand Down Expand Up @@ -101,7 +102,7 @@ all() ->
{group, trap_bif}, {group, dist_auto_connect},
dist_parallel_send, atom_roundtrip, unicode_atom_roundtrip,
contended_atom_cache_entry, contended_unicode_atom_cache_entry,
{group, message_latency},
{group, message_latency}, exit_dist_fragments,
{group, bad_dist}, {group, bad_dist_ext},
dist_entry_refc_race,
start_epmd_false, no_epmd, epmd_module, system_limit,
Expand Down Expand Up @@ -2433,6 +2434,161 @@ dmsg_bad_atom_cache_ref() ->
dmsg_bad_tag() -> %% Will fail early at heap size calculation
[$?, 66].

%% Test that a process which exits while it is sending a fragmented
%% distributed message behaves as it should. The sending processes are
%% suspended/resumed while they send (see exit_suspend/1) in order to
%% trigger bugs in the suspend/resume handling of exiting processes.
%%
%% We also make sure that the binary memory of the receiving node does not
%% grow without shrinking back, as there used to be a memory leak on the
%% receiving side.
%%
%% Steps:
%%   1. Start a peer node and spawn a sampler on it which records
%%      erlang:memory(binary) every 100 ms until asked for the samples.
%%   2. Trace this process (and, through set_on_spawn, everything it spawns)
%%      with gather_exited/0 as the tracer.
%%   3. Run the suspend/exit stress in exit_suspend/1.
%%   4. When the tracer has terminated, fetch the samples and fail if the
%%      last sample is at least ten times the first one.
exit_dist_fragments(_Config) ->
    {ok, Peer, Node} = ?CT_PEER(),
    try
        ct:log("Allocations before:~n~p",[erpc:call(Node,instrument,allocations, [])]),
        {BinInfo, BinInfoMref} =
            spawn_monitor(Node,
                          fun() ->
                                  (fun F(Acc) ->
                                           %% NOTE(review): a failing
                                           %% erlang:memory(binary) call is
                                           %% recorded as 0; if every sample
                                           %% is 0 the final check fails
                                           %% (0 =< 0) -- confirm intended.
                                           H = try erlang:memory(binary)
                                               catch _:_ -> 0 end,
                                           receive
                                               {get, Pid} ->
                                                   %% Tag the reply with our
                                                   %% own pid so that the
                                                   %% requester can tell it
                                                   %% apart from any other
                                                   %% message in its mailbox.
                                                   Pid ! {bin_info, self(),
                                                          lists:reverse([H|Acc])}
                                           after 100 ->
                                                   F([H|Acc])
                                           end
                                   end)([])
                          end),
        {Tracer, Mref} = spawn_monitor(fun gather_exited/0),
        erlang:trace(self(), true, [{tracer, Tracer}, set_on_spawn, procs, exiting]),
        exit_suspend(Node),
        receive
            %% The tracer terminates once it has seen no trace messages for
            %% a while; its exit reason is deliberately not inspected here.
            {'DOWN',Mref,_,_,_} ->
                BinInfo ! {get, self()},
                receive
                    {'DOWN',BinInfoMref,_,_,Reason} ->
                        %% The sampler died before it could answer.
                        ct:fail(Reason);
                    {bin_info, BinInfo, Info} ->
                        %% The sampler terminates after replying; drop the
                        %% monitor and any 'DOWN' message it already caused.
                        erlang:demonitor(BinInfoMref, [flush]),
                        Before = hd(Info),
                        Max = lists:max(Info),
                        After = lists:last(Info),
                        ct:log("Binary memory before: ~p~n"
                               "Binary memory max: ~p~n"
                               "Binary memory after: ~p",
                               [Before, Max, After]),
                        ct:log("Allocations after:~n~p",
                               [erpc:call(Node,instrument,allocations, [])]),
                        %% We check that the binary data
                        %% used after is not too large
                        if
                            Before * 10 =< After ->
                                ct:fail("Binary memory after is much larger than before!");
                            true -> ok
                        end
                end
        end
    after
        peer:stop(Peer)
    end.

%% Tracer process body: make sure that each spawned process also has exited.
%%
%% Consumes trace messages and keeps a map of the pids that have been
%% reported as spawned but not yet as exited. When no message has arrived
%% for IdleTimeout milliseconds (1000 by default) it returns `ok' if the map
%% is empty and otherwise exits with the map of remaining pids as reason.
%%
%% The `exiting' trace flag reports the final scheduling out of a process
%% as the 4-tuple {trace, Pid, out_exited, MFA | 0}, so that is the message
%% used to mark a process as gone. A process reported as exited without
%% having been reported as spawned makes the tracer crash with badmatch.
gather_exited() ->
    %% Keep the (potentially large) queue of trace messages off the heap.
    process_flag(message_queue_data, off_heap),
    gather_exited(#{}).
gather_exited(Pids) ->
    gather_exited(Pids, 1000).
gather_exited(Pids, IdleTimeout) ->
    receive
        {trace,Pid,spawned,_,_} ->
            gather_exited(Pids#{ Pid => true }, IdleTimeout);
        {trace,Pid,out_exited,_} ->
            {true, NewPids} = maps:take(Pid, Pids),
            gather_exited(NewPids, IdleTimeout);
        _M ->
            %% Any other trace message (or stray message) is of no interest.
            gather_exited(Pids, IdleTimeout)
    after IdleTimeout ->
            if map_size(Pids) =:= 0 -> ok;
               true -> exit(Pids)
            end
    end.

%% Run the suspend/exit stress against RemoteNode.
%%
%% exit_suspend/1 starts 100 pinger processes; exit_suspend/2 lets the caller
%% choose the number of pingers and uses a payload of ten 100000 byte
%% binaries; exit_suspend/3 also takes the payload.
%%
%% Every pinger traps exits, links to an echo process spawned on RemoteNode
%% and then runs exit_suspend_loop/4 with 2 suspenders for 100 iterations on
%% the `opt' emulator and 10 iterations on any other emulator type.
%%
%% Returns the list of pinger pids once all of them have terminated.
exit_suspend(RemoteNode) ->
    exit_suspend(RemoteNode, 100).

exit_suspend(RemoteNode, N) ->
    Chunk = <<0:100000/unit:8>>,
    exit_suspend(RemoteNode, N, lists:duplicate(10, Chunk)).

exit_suspend(RemoteNode, N, Payload) ->
    %% Remote side: answer every {From, Msg} with the iolist size of Msg.
    Echo =
        fun EchoLoop() ->
                receive
                    {From, Msg} ->
                        From ! erlang:iolist_size(Msg),
                        EchoLoop()
                end
        end,
    IterationsFor =
        fun(opt) -> 100;
           (_Other) -> 10
        end,
    Pinger =
        fun() ->
                false = process_flag(trap_exit, true),
                RemotePid = spawn_link(RemoteNode, Echo),
                Iterations = IterationsFor(erlang:system_info(emu_type)),
                exit_suspend_loop(RemotePid, 2, Payload, Iterations)
        end,
    Pids = [spawn_link(Pinger) || _ <- lists:seq(1, N)],
    MRefs = [monitor(process, Pid) || Pid <- Pids],
    %% Block until every pinger has gone down, in monitor order.
    lists:foreach(fun(MRef) ->
                          receive
                              {'DOWN',MRef,_,_,_} -> ok
                          end
                  end, MRefs),
    Pids.

%% exit_suspend_loop/4: one round of the stress loop, N rounds remaining.
%%
%% With N =:= 0 the remote echo process is sent an exit signal with reason
%% `die' and the function returns `ok' when the matching 'EXIT' message has
%% been received (the caller is expected to be linked to RemotePid and to
%% trap exits, as the pinger in exit_suspend/3 does).
%%
%% Otherwise a linked local sender is spawned. The sender starts
%% `Suspenders' linked helper processes that keep suspending and resuming
%% it, and then sends Payload to RemotePid, waits for any message and
%% repeats, forever. Control then continues in exit_suspend_loop/5.
exit_suspend_loop(RemotePid, _Suspenders, _Payload, 0) ->
    exit(RemotePid, die),
    receive
        {'EXIT', RemotePid, _Reason} ->
            ok
    end;
exit_suspend_loop(RemotePid, Suspenders, Payload, N) ->
    Sender =
        fun() ->
                Victim = self(),
                %% Suspend/resume the sender over and over. The recursive
                %% call sits in the unprotected `of' section; the first
                %% exception (e.g. the sender being gone) ends the helper.
                Harass =
                    fun Again() ->
                            try
                                erlang:suspend_process(Victim),
                                erlang:yield(),
                                erlang:suspend_process(Victim),
                                erlang:yield(),
                                erlang:resume_process(Victim),
                                erlang:yield(),
                                erlang:suspend_process(Victim),
                                erlang:yield(),
                                erlang:resume_process(Victim),
                                erlang:yield(),
                                erlang:resume_process(Victim),
                                erlang:yield()
                            of
                                _ ->
                                    Again()
                            catch
                                _:_ ->
                                    ok
                            end
                    end,
                lists:foreach(fun(_) -> spawn_link(Harass) end,
                              lists:seq(1, Suspenders)),
                PingPong =
                    fun Ping() ->
                            RemotePid ! {self(), Payload},
                            receive _IOListSize -> ok end,
                            Ping()
                    end,
                PingPong()
        end,
    LocalPid = spawn_link(Sender),
    exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N - 1).

%% exit_suspend_loop/5: wait for the local sender to terminate.
%%
%% Every 100 ms without an 'EXIT' message the sender is sent an exit signal
%% with reason `die'. When the sender's 'EXIT' arrives the next round is
%% started through exit_suspend_loop/4; an 'EXIT' from any other process
%% makes the caller exit with the same reason.
exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N) ->
    receive
        {'EXIT', From, Reason} ->
            case From of
                LocalPid ->
                    exit_suspend_loop(RemotePid, Suspenders, Payload, N);
                _ ->
                    exit(Reason)
            end
    after 100 ->
            exit(LocalPid, die),
            exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N)
    end.

start_epmd_false(Config) when is_list(Config) ->
%% Start a node with the option -start_epmd false.
{ok, Peer, OtherNode} = ?CT_PEER(["-start_epmd", "false"]),
Expand Down
Loading

0 comments on commit dc7cf3b

Please sign in to comment.