Skip to content

Commit

Permalink
Use pg_local to track AMQP 1.0 connections
Browse files Browse the repository at this point in the history
Fixes #9371

Moves `pg_local` to `rabbit_common`
  • Loading branch information
lukebakken committed Sep 12, 2023
1 parent c30ae67 commit 141a184
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 32 deletions.
10 changes: 8 additions & 2 deletions deps/amqp_client/src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
-export([error_atom/1]).
-export([info/2, info_keys/1, info_keys/0]).
-export([connection_name/1, update_secret/3]).
-export([socket_adapter_info/2]).
-export([socket_adapter_info/2,
socket_adapter_info/3]).

-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).

Expand Down Expand Up @@ -379,7 +380,12 @@ info_keys() ->
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
%% based on the socket for the protocol given.
socket_adapter_info(Sock, Protocol) ->
amqp_direct_connection:socket_adapter_info(Sock, Protocol).
socket_adapter_info(Sock, Protocol, undefined).

%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
%% based on the socket for the protocol given.
socket_adapter_info(Sock, Protocol, UniqueId) ->
amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId).

%% @spec (ConnectionPid) -> ConnectionName
%% where
Expand Down
28 changes: 19 additions & 9 deletions deps/amqp_client/src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).

-export([socket_adapter_info/2]).
-export([socket_adapter_info/2,
socket_adapter_info/3]).

-record(state, {node,
user,
Expand Down Expand Up @@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
ensure_adapter_info(Info) -> Info.

socket_adapter_info(Sock, Protocol) ->
socket_adapter_info(Sock, Protocol, undefined).

socket_adapter_info(Sock, Protocol, UniqueId) ->
{PeerHost, PeerPort, Host, Port} =
case rabbit_net:socket_ends(Sock, inbound) of
{ok, Res} -> Res;
_ -> {unknown, unknown, unknown, unknown}
end,
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Res1} -> Res1;
_Error -> "(unknown)"
case rabbit_net:socket_ends(Sock, inbound) of
{ok, Res} -> Res;
_ -> {unknown, unknown, unknown, unknown}
end,
ConnectionString = case rabbit_net:connection_string(Sock, inbound) of
{ok, Res1} -> Res1;
_Error -> "(unknown)"
end,
Name = case UniqueId of
undefined ->
rabbit_data_coercion:to_binary(ConnectionString);
_ ->
rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId]))
end,
#amqp_adapter_info{protocol = Protocol,
name = list_to_binary(Name),
name = Name,
host = Host,
port = Port,
peer_host = PeerHost,
Expand Down
30 changes: 14 additions & 16 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
%%
-module(rabbit_amqp1_0).

-export([connection_info_local/1,
emit_connection_info_local/3,
-export([emit_connection_info_local/3,
emit_connection_info_all/4,
list/0]).
list/0,
register_connection/1,
unregister_connection/1]).

emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
Expand All @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
end,
list()).

connection_info_local(Items) ->
Connections = list(),
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].

-spec list() -> [pid()].
list() ->
[ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
].
pg_local:get_members(rabbit_amqp10_connections).

-spec register_connection(pid()) -> ok.
register_connection(Pid) ->
pg_local:join(rabbit_amqp10_connections, Pid).

-spec unregister_connection(pid()) -> ok.
unregister_connection(Pid) ->
pg_local:leave(rabbit_amqp10_connections, Pid).
5 changes: 4 additions & 1 deletion deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->

close_connection(State = #v1{connection = #v1_connection{
timeout_sec = TimeoutSec}}) ->
Pid = self(),
erlang:send_after((if TimeoutSec > 0 andalso
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
true -> ?CLOSING_TIMEOUT
end) * 1000, self(), terminate_connection),
end) * 1000, Pid, terminate_connection),
rabbit_amqp1_0:unregister_connection(Pid),
State#v1{connection_state = closed}.

handle_dependent_exit(ChPid, Reason, State) ->
Expand Down Expand Up @@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
container_id = {utf8, rabbit_nodes:cluster_name()},
properties = server_properties()}),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
rabbit_amqp1_0:register_connection(self()),
control_throttle(
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ init({Channel, ReaderPid, WriterPid, #user{username = Username}, VHost,
case amqp_connection:start(
#amqp_params_direct{username = Username,
virtual_host = VHost,
adapter_info = AdapterInfo}) of
adapter_info = AdapterInfo) of
{ok, Conn} ->
case amqp_connection:open_channel(Conn) of
{ok, Ch} ->
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid,
start =>
{rabbit_amqp1_0_session_process, start_link, [
{Channel, ReaderPid, WriterPid, User, VHost, FrameMax,
adapter_info(User, SocketForAdapterInfo), Collector}
adapter_info(User, SocketForAdapterInfo, Channel), Collector}
]},
restart => transient,
significant => true,
Expand Down Expand Up @@ -98,7 +98,7 @@ init([]) ->
%% See rabbit_direct.erl to see how `authz_bakends` is propagated from
% amqp_adapter_info.additional_info to the rabbit_access_control module

adapter_info(User, Sock) ->
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}),
adapter_info(User, Sock, UniqueId) ->
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId),
AdapterInfo#amqp_adapter_info{additional_info =
AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}.
2 changes: 2 additions & 0 deletions deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ format_args({arguments, Value}) ->
format_args(Stat) ->
Stat.

format_connection_created({authz_backends, Value}) ->
{authz_backends, print("~tp", Value)};
format_connection_created({host, Value}) ->
{host, addr(Value)};
format_connection_created({peer_host, Value}) ->
Expand Down

0 comments on commit 141a184

Please sign in to comment.