Skip to content

Commit

Permalink
Use pg_local to track AMQP 1.0 connections
Browse files Browse the repository at this point in the history
Fixes #9371

Since each AMQP 1.0 connection opens several direct AMQP connections, we
must assign each direct connection a unique name to prevent multiple
entries in the `connection_created_stats` table.

Also, use `pg_local` to track AMQP 1.0 connections instead of walking
the supervisor trees.

Nuke authz_backends from connection created event 💥

Fix regex for connection name because UniqueId is part of it now (channel number)
  • Loading branch information
lukebakken committed Sep 15, 2023
1 parent ed88e38 commit c94d22a
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 39 deletions.
10 changes: 8 additions & 2 deletions deps/amqp_client/src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
-export([error_atom/1]).
-export([info/2, info_keys/1, info_keys/0]).
-export([connection_name/1, update_secret/3]).
-export([socket_adapter_info/2]).
-export([socket_adapter_info/2,
socket_adapter_info/3]).

-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).

Expand Down Expand Up @@ -379,7 +380,12 @@ info_keys() ->
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
%% based on the socket for the protocol given.
socket_adapter_info(Sock, Protocol) ->
amqp_direct_connection:socket_adapter_info(Sock, Protocol).
socket_adapter_info(Sock, Protocol, undefined).

%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
%% based on the socket for the protocol given.
socket_adapter_info(Sock, Protocol, UniqueId) ->
amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId).

%% @spec (ConnectionPid) -> ConnectionName
%% where
Expand Down
28 changes: 19 additions & 9 deletions deps/amqp_client/src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).

-export([socket_adapter_info/2]).
-export([socket_adapter_info/2,
socket_adapter_info/3]).

-record(state, {node,
user,
Expand Down Expand Up @@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
ensure_adapter_info(Info) -> Info.

socket_adapter_info(Sock, Protocol) ->
socket_adapter_info(Sock, Protocol, undefined).

socket_adapter_info(Sock, Protocol, UniqueId) ->
{PeerHost, PeerPort, Host, Port} =
case rabbit_net:socket_ends(Sock, inbound) of
{ok, Res} -> Res;
_ -> {unknown, unknown, unknown, unknown}
end,
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Res1} -> Res1;
_Error -> "(unknown)"
case rabbit_net:socket_ends(Sock, inbound) of
{ok, Res} -> Res;
_ -> {unknown, unknown, unknown, unknown}
end,
ConnectionString = case rabbit_net:connection_string(Sock, inbound) of
{ok, Res1} -> Res1;
_Error -> "(unknown)"
end,
Name = case UniqueId of
undefined ->
rabbit_data_coercion:to_binary(ConnectionString);
_ ->
rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId]))
end,
#amqp_adapter_info{protocol = Protocol,
name = list_to_binary(Name),
name = Name,
host = Host,
port = Port,
peer_host = PeerHost,
Expand Down
7 changes: 5 additions & 2 deletions deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,13 @@ connection_metrics(Config) ->

DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []),

Infos = [{info0, foo}, {info1, bar}, {info2, baz},
{authz_backends, [rabbit_auth_backend_oauth2,rabbit_auth_backend_http]}],

rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
connection_created, [DeadPid, infos]),
connection_created, [DeadPid, Infos]),
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
connection_stats, [DeadPid, infos]),
connection_stats, [DeadPid, Infos]),
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
connection_stats, [DeadPid, 1, 1, 1]),

Expand Down
16 changes: 14 additions & 2 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ terminate() ->
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
ok.

connection_created(Pid, Infos) ->
ets:insert(connection_created, {Pid, Infos}),
connection_created(Pid, Infos0) ->
Infos1 = maybe_cleanup_infos(Infos0),
ets:insert(connection_created, {Pid, Infos1}),
ets:update_counter(connection_churn_metrics, node(), {2, 1},
?CONNECTION_CHURN_METRICS),
ok.
Expand Down Expand Up @@ -446,3 +447,14 @@ format_auth_attempt({{RemoteAddress, Username, Protocol}, Total, Succeeded, Fail
format_auth_attempt({Protocol, Total, Succeeded, Failed}) ->
[{protocol, atom_to_binary(Protocol, utf8)}, {auth_attempts, Total},
{auth_attempts_failed, Failed}, {auth_attempts_succeeded, Succeeded}].

maybe_cleanup_infos(Infos0) when is_list(Infos0) ->
%% Note: authz_backends is added in rabbit_amqp1_0_session_sup:adapter_info/3
%% We delete it here, if present, because it should not be stored in the
%% connection_created table.
%%
%% TODO @ansd this will no longer be necessary once this PR is merged:
%% https://github.com/rabbitmq/rabbitmq-server/pull/9022
proplists:delete(authz_backends, Infos0);
maybe_cleanup_infos(Infos) ->
Infos.
30 changes: 14 additions & 16 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
%%
-module(rabbit_amqp1_0).

-export([connection_info_local/1,
emit_connection_info_local/3,
-export([emit_connection_info_local/3,
emit_connection_info_all/4,
list/0]).
list/0,
register_connection/1,
unregister_connection/1]).

emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
Expand All @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
end,
list()).

connection_info_local(Items) ->
Connections = list(),
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].

-spec list() -> [pid()].
list() ->
[ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
].
pg_local:get_members(rabbit_amqp10_connections).

-spec register_connection(pid()) -> ok.
register_connection(Pid) ->
pg_local:join(rabbit_amqp10_connections, Pid).

-spec unregister_connection(pid()) -> ok.
unregister_connection(Pid) ->
pg_local:leave(rabbit_amqp10_connections, Pid).
5 changes: 4 additions & 1 deletion deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->

close_connection(State = #v1{connection = #v1_connection{
timeout_sec = TimeoutSec}}) ->
Pid = self(),
erlang:send_after((if TimeoutSec > 0 andalso
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
true -> ?CLOSING_TIMEOUT
end) * 1000, self(), terminate_connection),
end) * 1000, Pid, terminate_connection),
rabbit_amqp1_0:unregister_connection(Pid),
State#v1{connection_state = closed}.

handle_dependent_exit(ChPid, Reason, State) ->
Expand Down Expand Up @@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
container_id = {utf8, rabbit_nodes:cluster_name()},
properties = server_properties()}),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
rabbit_amqp1_0:register_connection(self()),
control_throttle(
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});

Expand Down
6 changes: 3 additions & 3 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid,
start =>
{rabbit_amqp1_0_session_process, start_link, [
{Channel, ReaderPid, WriterPid, User, VHost, FrameMax,
adapter_info(User, SocketForAdapterInfo), Collector}
adapter_info(User, SocketForAdapterInfo, Channel), Collector}
]},
restart => transient,
significant => true,
Expand Down Expand Up @@ -98,7 +98,7 @@ init([]) ->
%% See rabbit_direct.erl to see how `authz_bakends` is propagated from
% amqp_adapter_info.additional_info to the rabbit_access_control module

adapter_info(User, Sock) ->
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}),
adapter_info(User, Sock, UniqueId) ->
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId),
AdapterInfo#amqp_adapter_info{additional_info =
AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}.
10 changes: 6 additions & 4 deletions deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ proxy_protocol_v1(Config) ->
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.

Expand All @@ -82,7 +82,7 @@ proxy_protocol_v1_tls(Config) ->
timer:sleep(1000),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)$">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.

Expand All @@ -100,7 +100,7 @@ proxy_protocol_v2_local(Config) ->
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+ \\(\\d\\)$">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.

Expand Down Expand Up @@ -144,7 +144,9 @@ connection_name() ->
end.

connection_registered() ->
length(ets:tab2list(connection_created)) > 0.
I = ets:info(connection_created),
Size = proplists:get_value(size, I),
Size > 0.

retry(_Function, 0) ->
false;
Expand Down

0 comments on commit c94d22a

Please sign in to comment.