From 141a184151f0e415e68d78165f2402c1f7649d89 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 12 Sep 2023 05:19:57 -0700 Subject: [PATCH] Use pg_local to track AMQP 1.0 connections Fixes #9371 Moves `pg_local` to `rabbit_common` --- deps/amqp_client/src/amqp_connection.erl | 10 +++++-- .../src/amqp_direct_connection.erl | 28 +++++++++++------ deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl | 30 +++++++++---------- .../src/rabbit_amqp1_0_reader.erl | 5 +++- .../src/rabbit_amqp1_0_session_process.erl | 2 +- .../src/rabbit_amqp1_0_session_sup.erl | 6 ++-- .../src/rabbit_mgmt_format.erl | 2 ++ 7 files changed, 51 insertions(+), 32 deletions(-) diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 9f8c6ddb92ed..4681147db15c 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -65,7 +65,8 @@ -export([error_atom/1]). -export([info/2, info_keys/1, info_keys/0]). -export([connection_name/1, update_secret/3]). --export([socket_adapter_info/2]). +-export([socket_adapter_info/2, + socket_adapter_info/3]). -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}). @@ -379,7 +380,12 @@ info_keys() -> %% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} %% based on the socket for the protocol given. socket_adapter_info(Sock, Protocol) -> - amqp_direct_connection:socket_adapter_info(Sock, Protocol). + socket_adapter_info(Sock, Protocol, undefined). + +%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} +%% based on the socket for the protocol given. +socket_adapter_info(Sock, Protocol, UniqueId) -> + amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId). %% @spec (ConnectionPid) -> ConnectionName %% where diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index 2b4b637b5e13..6cfc65dd7635 100644 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -17,7 +17,8 @@ -export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2, info_keys/0, handle_message/2, closing/3, channels_terminated/1]). --export([socket_adapter_info/2]). +-export([socket_adapter_info/2, + socket_adapter_info/3]). -record(state, {node, user, @@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) -> ensure_adapter_info(Info) -> Info. socket_adapter_info(Sock, Protocol) -> + socket_adapter_info(Sock, Protocol, undefined). + +socket_adapter_info(Sock, Protocol, UniqueId) -> {PeerHost, PeerPort, Host, Port} = - case rabbit_net:socket_ends(Sock, inbound) of - {ok, Res} -> Res; - _ -> {unknown, unknown, unknown, unknown} - end, - Name = case rabbit_net:connection_string(Sock, inbound) of - {ok, Res1} -> Res1; - _Error -> "(unknown)" + case rabbit_net:socket_ends(Sock, inbound) of + {ok, Res} -> Res; + _ -> {unknown, unknown, unknown, unknown} + end, + ConnectionString = case rabbit_net:connection_string(Sock, inbound) of + {ok, Res1} -> Res1; + _Error -> "(unknown)" + end, + Name = case UniqueId of + undefined -> + rabbit_data_coercion:to_binary(ConnectionString); + _ -> + rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId])) end, #amqp_adapter_info{protocol = Protocol, - name = list_to_binary(Name), + name = Name, host = Host, port = Port, peer_host = PeerHost, diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl index dc8e955c5aaa..72c1e9ac432f 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl @@ -6,10 +6,11 @@ %% -module(rabbit_amqp1_0). --export([connection_info_local/1, - emit_connection_info_local/3, +-export([emit_connection_info_local/3, emit_connection_info_all/4, - list/0]). + list/0, + register_connection/1, + unregister_connection/1]). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local, @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) -> end, list()). -connection_info_local(Items) -> - Connections = list(), - [rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections]. - +-spec list() -> [pid()]. list() -> - [ReaderPid - || {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup), - {_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid), - {_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid), - {_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid), - {_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid), - {rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid), - {reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid) - ]. + pg_local:get_members(rabbit_amqp10_connections). + +-spec register_connection(pid()) -> ok. +register_connection(Pid) -> + pg_local:join(rabbit_amqp10_connections, Pid). + +-spec unregister_connection(pid()) -> ok. +unregister_connection(Pid) -> + pg_local:leave(rabbit_amqp10_connections, Pid). diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl index e2f24c7e00ad..be985e2adc6e 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) -> close_connection(State = #v1{connection = #v1_connection{ timeout_sec = TimeoutSec}}) -> + Pid = self(), erlang:send_after((if TimeoutSec > 0 andalso TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; true -> ?CLOSING_TIMEOUT - end) * 1000, self(), terminate_connection), + end) * 1000, Pid, terminate_connection), + rabbit_amqp1_0:unregister_connection(Pid), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> @@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax, container_id = {utf8, rabbit_nodes:cluster_name()}, properties = server_properties()}), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + rabbit_amqp1_0:register_connection(self()), control_throttle( State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}}); diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index 0191ceaa6eca..fc9574c3ef18 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -39,7 +39,7 @@ init({Channel, ReaderPid, WriterPid, #user{username = Username}, VHost, case amqp_connection:start( #amqp_params_direct{username = Username, virtual_host = VHost, - adapter_info = AdapterInfo}) of + adapter_info = AdapterInfo) of {ok, Conn} -> case amqp_connection:open_channel(Conn) of {ok, Ch} -> diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl index c5fc1f035fcd..8ef40a0f7d79 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl @@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid, start => {rabbit_amqp1_0_session_process, start_link, [ {Channel, ReaderPid, WriterPid, User, VHost, FrameMax, - adapter_info(User, SocketForAdapterInfo), Collector} + adapter_info(User, SocketForAdapterInfo, Channel), Collector} ]}, restart => transient, significant => true, @@ -98,7 +98,7 @@ init([]) -> %% See rabbit_direct.erl to see how `authz_bakends` is propagated from % amqp_adapter_info.additional_info to the rabbit_access_control module -adapter_info(User, Sock) -> - AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}), +adapter_info(User, Sock, UniqueId) -> + AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId), AdapterInfo#amqp_adapter_info{additional_info = AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}. diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl index 5a333801ea24..7672a3f51d39 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl @@ -91,6 +91,8 @@ format_args({arguments, Value}) -> format_args(Stat) -> Stat. +format_connection_created({authz_backends, Value}) -> + {authz_backends, print("~tp", Value)}; format_connection_created({host, Value}) -> {host, addr(Value)}; format_connection_created({peer_host, Value}) ->