From fb570438297562f34e5ff8abfaf01174175c0333 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 12 Sep 2023 05:19:57 -0700 Subject: [PATCH] Use pg_local to track AMQP 1.0 connections Fixes #9371 Moves `pg_local` to `rabbit_common` --- deps/rabbit/BUILD.bazel | 5 ---- deps/rabbit/app.bzl | 11 ------- deps/rabbit_common/BUILD.bazel | 5 ++++ deps/rabbit_common/app.bzl | 11 +++++++ .../src/pg_local.erl | 0 .../test/unit_pg_local_SUITE.erl | 0 deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl | 30 +++++++++---------- .../src/rabbit_amqp1_0_reader.erl | 5 +++- .../src/rabbit_mgmt_data.erl | 8 ++++- .../src/rabbit_mgmt_format.erl | 2 ++ 10 files changed, 43 insertions(+), 34 deletions(-) rename deps/{rabbit => rabbit_common}/src/pg_local.erl (100%) rename deps/{rabbit => rabbit_common}/test/unit_pg_local_SUITE.erl (100%) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 0134351bfda2..745a11081f9e 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -1004,11 +1004,6 @@ rabbitmq_suite( ], ) -rabbitmq_suite( - name = "unit_pg_local_SUITE", - size = "small", -) - rabbitmq_suite( name = "unit_plugin_directories_SUITE", size = "small", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 7432ed8694ae..5af1030efb18 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -41,7 +41,6 @@ def all_beam_files(name = "all_beam_files"): "src/mc_util.erl", "src/mirrored_supervisor.erl", "src/mirrored_supervisor_sups.erl", - "src/pg_local.erl", "src/pid_recomposition.erl", "src/rabbit.erl", "src/rabbit_access_control.erl", @@ -292,7 +291,6 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/mc_util.erl", "src/mirrored_supervisor.erl", "src/mirrored_supervisor_sups.erl", - "src/pg_local.erl", "src/pid_recomposition.erl", "src/rabbit.erl", "src/rabbit_access_control.erl", @@ -555,7 +553,6 @@ def all_srcs(name = "all_srcs"): "src/mc_util.erl", "src/mirrored_supervisor.erl", "src/mirrored_supervisor_sups.erl", - "src/pg_local.erl", "src/pid_recomposition.erl", "src/rabbit.erl", "src/rabbit_access_control.erl", @@ -1768,14 +1765,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/rabbit_common:erlang_app"], ) - erlang_bytecode( - name = "unit_pg_local_SUITE_beam_files", - testonly = True, - srcs = ["test/unit_pg_local_SUITE.erl"], - outs = ["test/unit_pg_local_SUITE.beam"], - app_name = "rabbit", - erlc_opts = "//:test_erlc_opts", - ) erlang_bytecode( name = "unit_plugin_directories_SUITE_beam_files", testonly = True, diff --git a/deps/rabbit_common/BUILD.bazel b/deps/rabbit_common/BUILD.bazel index 687aedc6a81c..5d525a130a3a 100644 --- a/deps/rabbit_common/BUILD.bazel +++ b/deps/rabbit_common/BUILD.bazel @@ -170,6 +170,11 @@ rabbitmq_suite( ], ) +rabbitmq_suite( + name = "unit_pg_local_SUITE", + size = "small", +) + rabbitmq_suite( name = "unit_priority_queue_SUITE", size = "small", diff --git a/deps/rabbit_common/app.bzl b/deps/rabbit_common/app.bzl index ac112ef6043c..3cc311d5d350 100644 --- a/deps/rabbit_common/app.bzl +++ b/deps/rabbit_common/app.bzl @@ -32,6 +32,7 @@ def all_beam_files(name = "all_beam_files"): "src/file_handle_cache_stats.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", + "src/pg_local.erl", "src/pmon.erl", "src/priority_queue.erl", "src/rabbit_amqp_connection.erl", @@ -127,6 +128,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/file_handle_cache_stats.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", + "src/pg_local.erl", "src/pmon.erl", "src/priority_queue.erl", "src/rabbit_amqp_connection.erl", @@ -215,6 +217,7 @@ def all_srcs(name = "all_srcs"): "src/gen_server2.erl", "src/mirrored_supervisor_locks.erl", "src/mnesia_sync.erl", + "src/pg_local.erl", "src/pmon.erl", "src/priority_queue.erl", "src/rabbit_amqp_connection.erl", @@ -348,6 +351,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["@proper//:erlang_app"], ) + erlang_bytecode( + name = "unit_pg_local_SUITE_beam_files", + testonly = True, + srcs = ["test/unit_pg_local_SUITE.erl"], + outs = ["test/unit_pg_local_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "unit_priority_queue_SUITE_beam_files", testonly = True, diff --git a/deps/rabbit/src/pg_local.erl b/deps/rabbit_common/src/pg_local.erl similarity index 100% rename from deps/rabbit/src/pg_local.erl rename to deps/rabbit_common/src/pg_local.erl diff --git a/deps/rabbit/test/unit_pg_local_SUITE.erl b/deps/rabbit_common/test/unit_pg_local_SUITE.erl similarity index 100% rename from deps/rabbit/test/unit_pg_local_SUITE.erl rename to deps/rabbit_common/test/unit_pg_local_SUITE.erl diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl index dc8e955c5aaa..72c1e9ac432f 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl @@ -6,10 +6,11 @@ %% -module(rabbit_amqp1_0). --export([connection_info_local/1, - emit_connection_info_local/3, +-export([emit_connection_info_local/3, emit_connection_info_all/4, - list/0]). + list/0, + register_connection/1, + unregister_connection/1]). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local, @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) -> end, list()). -connection_info_local(Items) -> - Connections = list(), - [rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections]. - +-spec list() -> [pid()]. list() -> - [ReaderPid - || {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup), - {_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid), - {_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid), - {_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid), - {_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid), - {rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid), - {reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid) - ]. + pg_local:get_members(rabbit_amqp10_connections). + +-spec register_connection(pid()) -> ok. +register_connection(Pid) -> + pg_local:join(rabbit_amqp10_connections, Pid). + +-spec unregister_connection(pid()) -> ok. +unregister_connection(Pid) -> + pg_local:leave(rabbit_amqp10_connections, Pid). diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl index e2f24c7e00ad..be985e2adc6e 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) -> close_connection(State = #v1{connection = #v1_connection{ timeout_sec = TimeoutSec}}) -> + Pid = self(), erlang:send_after((if TimeoutSec > 0 andalso TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; true -> ?CLOSING_TIMEOUT - end) * 1000, self(), terminate_connection), + end) * 1000, Pid, terminate_connection), + rabbit_amqp1_0:unregister_connection(Pid), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> @@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax, container_id = {utf8, rabbit_nodes:cluster_name()}, properties = server_properties()}), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + rabbit_amqp1_0:register_connection(self()), control_throttle( State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}}); diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl index 9cf1d0f78f6f..ad924ddeeccd 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl @@ -46,7 +46,13 @@ created_stats(Name, Type) -> case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of [] -> not_found; - [Elem] -> Elem + [Elem] -> Elem; + L when is_list(L) -> + %% Note: https://github.com/rabbitmq/rabbitmq-server/issues/9371 + %% This special case is due to the fact that AMQP 1.0 connections + %% can create multiple entries in connection_created_stats with the + %% same `Name` value. + hd(L) end. created_stats(Type) -> diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl index 5a333801ea24..7672a3f51d39 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl @@ -91,6 +91,8 @@ format_args({arguments, Value}) -> format_args(Stat) -> Stat. +format_connection_created({authz_backends, Value}) -> + {authz_backends, print("~tp", Value)}; format_connection_created({host, Value}) -> {host, addr(Value)}; format_connection_created({peer_host, Value}) ->