Skip to content

Commit

Permalink
Make ensure_monitors more defensive in SAC coordinator
Browse files Browse the repository at this point in the history
Do not assume the connection PID of a consumer is still
known from the state on state cleaning when unregistering
a consumer.

Fixes #5889
  • Loading branch information
acogoluegnes authored and kjnilsson committed Oct 3, 2022
1 parent d47ad8e commit f233870
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
#?MODULE{groups = StreamGroups0, pids_groups = PidsGroups0} =
State0,
Monitors,
Effects) ->
Effects)
when is_map_key(Pid, PidsGroups0) ->
GroupId = {VirtualHost, Stream, ConsumerName},
#{Pid := PidGroup0} = PidsGroups0,
PidGroup1 =
Expand Down
16 changes: 16 additions & 0 deletions deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ ensure_monitors_test(_) ->
?assertEqual(#{self() => sac}, Monitors2),
?assertEqual([{monitor, process, self()}, {monitor, node, node()}],
Effects2),

Group2 = cgroup([consumer(self(), 1, true)]),

Command2 =
Expand All @@ -278,6 +279,21 @@ ensure_monitors_test(_) ->
?assertEqual(#{self() => sac}, Monitors3),
?assertEqual([], Effects3),

%% trying with an unknown connection PID
%% the function should not change anything
UnknownConnectionPid = spawn(fun() -> ok end),
PassthroughCommand =
unregister_consumer_command(<<"stream">>,
<<"app">>,
UnknownConnectionPid,
0),

{State3, Monitors3, Effects3} =
rabbit_stream_sac_coordinator:ensure_monitors(PassthroughCommand,
State3,
Monitors3,
[]),

Command3 =
unregister_consumer_command(<<"stream">>, <<"app">>, self(), 1),

Expand Down

0 comments on commit f233870

Please sign in to comment.