From f2338709cdba748ec3d7e7869e4ef228d7a0eb2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 28 Sep 2022 16:00:00 +0200 Subject: [PATCH] Make ensure_monitors more defensive in SAC coordinator Do not assume the connection PID of a consumer is still known from the state on state cleaning when unregistering a consumer. Fixes #5889 --- .../rabbit/src/rabbit_stream_sac_coordinator.erl | 3 ++- .../test/rabbit_stream_sac_coordinator_SUITE.erl | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index dc01c99c673e..8ee12b0e2ff0 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -379,7 +379,8 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost, #?MODULE{groups = StreamGroups0, pids_groups = PidsGroups0} = State0, Monitors, - Effects) -> + Effects) + when is_map_key(Pid, PidsGroups0) -> GroupId = {VirtualHost, Stream, ConsumerName}, #{Pid := PidGroup0} = PidsGroups0, PidGroup1 = diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index 1282800dbc52..7412a38de2b2 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -260,6 +260,7 @@ ensure_monitors_test(_) -> ?assertEqual(#{self() => sac}, Monitors2), ?assertEqual([{monitor, process, self()}, {monitor, node, node()}], Effects2), + Group2 = cgroup([consumer(self(), 1, true)]), Command2 = @@ -278,6 +279,21 @@ ensure_monitors_test(_) -> ?assertEqual(#{self() => sac}, Monitors3), ?assertEqual([], Effects3), + %% trying with an unknown connection PID + %% the function should not change anything + UnknownConnectionPid = spawn(fun() -> ok end), + PassthroughCommand = + unregister_consumer_command(<<"stream">>, + <<"app">>, + UnknownConnectionPid, + 0), + + {State3, Monitors3, Effects3} = + rabbit_stream_sac_coordinator:ensure_monitors(PassthroughCommand, + State3, + Monitors3, + []), + Command3 = unregister_consumer_command(<<"stream">>, <<"app">>, self(), 1),