Skip to content

Commit

Permalink
Use pg_local to track AMQP 1.0 connections
Browse files Browse the repository at this point in the history
Fixes #9371

Moves `pg_local` to `rabbit_common`
  • Loading branch information
lukebakken committed Sep 12, 2023
1 parent c30ae67 commit fb57043
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 34 deletions.
5 changes: 0 additions & 5 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1004,11 +1004,6 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "unit_pg_local_SUITE",
size = "small",
)

rabbitmq_suite(
name = "unit_plugin_directories_SUITE",
size = "small",
Expand Down
11 changes: 0 additions & 11 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def all_beam_files(name = "all_beam_files"):
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
"src/pid_recomposition.erl",
"src/rabbit.erl",
"src/rabbit_access_control.erl",
Expand Down Expand Up @@ -292,7 +291,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
"src/pid_recomposition.erl",
"src/rabbit.erl",
"src/rabbit_access_control.erl",
Expand Down Expand Up @@ -555,7 +553,6 @@ def all_srcs(name = "all_srcs"):
"src/mc_util.erl",
"src/mirrored_supervisor.erl",
"src/mirrored_supervisor_sups.erl",
"src/pg_local.erl",
"src/pid_recomposition.erl",
"src/rabbit.erl",
"src/rabbit_access_control.erl",
Expand Down Expand Up @@ -1768,14 +1765,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
)
erlang_bytecode(
name = "unit_pg_local_SUITE_beam_files",
testonly = True,
srcs = ["test/unit_pg_local_SUITE.erl"],
outs = ["test/unit_pg_local_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "unit_plugin_directories_SUITE_beam_files",
testonly = True,
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbit_common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ rabbitmq_suite(
],
)

rabbitmq_suite(
name = "unit_pg_local_SUITE",
size = "small",
)

rabbitmq_suite(
name = "unit_priority_queue_SUITE",
size = "small",
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbit_common/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def all_beam_files(name = "all_beam_files"):
"src/file_handle_cache_stats.erl",
"src/mirrored_supervisor_locks.erl",
"src/mnesia_sync.erl",
"src/pg_local.erl",
"src/pmon.erl",
"src/priority_queue.erl",
"src/rabbit_amqp_connection.erl",
Expand Down Expand Up @@ -127,6 +128,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/file_handle_cache_stats.erl",
"src/mirrored_supervisor_locks.erl",
"src/mnesia_sync.erl",
"src/pg_local.erl",
"src/pmon.erl",
"src/priority_queue.erl",
"src/rabbit_amqp_connection.erl",
Expand Down Expand Up @@ -215,6 +217,7 @@ def all_srcs(name = "all_srcs"):
"src/gen_server2.erl",
"src/mirrored_supervisor_locks.erl",
"src/mnesia_sync.erl",
"src/pg_local.erl",
"src/pmon.erl",
"src/priority_queue.erl",
"src/rabbit_amqp_connection.erl",
Expand Down Expand Up @@ -348,6 +351,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["@proper//:erlang_app"],
)
erlang_bytecode(
name = "unit_pg_local_SUITE_beam_files",
testonly = True,
srcs = ["test/unit_pg_local_SUITE.erl"],
outs = ["test/unit_pg_local_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "unit_priority_queue_SUITE_beam_files",
testonly = True,
Expand Down
File renamed without changes.
File renamed without changes.
30 changes: 14 additions & 16 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
%%
-module(rabbit_amqp1_0).

-export([connection_info_local/1,
emit_connection_info_local/3,
-export([emit_connection_info_local/3,
emit_connection_info_all/4,
list/0]).
list/0,
register_connection/1,
unregister_connection/1]).

emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
Expand All @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
end,
list()).

connection_info_local(Items) ->
Connections = list(),
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].

-spec list() -> [pid()].
list() ->
[ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
].
pg_local:get_members(rabbit_amqp10_connections).

-spec register_connection(pid()) -> ok.
register_connection(Pid) ->
pg_local:join(rabbit_amqp10_connections, Pid).

-spec unregister_connection(pid()) -> ok.
unregister_connection(Pid) ->
pg_local:leave(rabbit_amqp10_connections, Pid).
5 changes: 4 additions & 1 deletion deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->

close_connection(State = #v1{connection = #v1_connection{
timeout_sec = TimeoutSec}}) ->
Pid = self(),
erlang:send_after((if TimeoutSec > 0 andalso
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
true -> ?CLOSING_TIMEOUT
end) * 1000, self(), terminate_connection),
end) * 1000, Pid, terminate_connection),
rabbit_amqp1_0:unregister_connection(Pid),
State#v1{connection_state = closed}.

handle_dependent_exit(ChPid, Reason, State) ->
Expand Down Expand Up @@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
container_id = {utf8, rabbit_nodes:cluster_name()},
properties = server_properties()}),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
rabbit_amqp1_0:register_connection(self()),
control_throttle(
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});

Expand Down
8 changes: 7 additions & 1 deletion deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@
created_stats(Name, Type) ->
case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of
[] -> not_found;
[Elem] -> Elem
[Elem] -> Elem;
L when is_list(L) ->
%% Note: https://github.com/rabbitmq/rabbitmq-server/issues/9371
%% This special case is due to the fact that AMQP 1.0 connections
%% can create multiple entries in connection_created_stats with the
%% same `Name` value.
hd(L)
end.

created_stats(Type) ->
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ format_args({arguments, Value}) ->
format_args(Stat) ->
Stat.

format_connection_created({authz_backends, Value}) ->
{authz_backends, print("~tp", Value)};
format_connection_created({host, Value}) ->
{host, addr(Value)};
format_connection_created({peer_host, Value}) ->
Expand Down

0 comments on commit fb57043

Please sign in to comment.