Skip to content

Commit

Permalink
Merge pull request #5935 from rabbitmq/mergify/bp/v3.10.x/pr-5933
Browse files Browse the repository at this point in the history
List amqp10 connections (backport #5881) (backport #5933)
  • Loading branch information
michaelklishin authored Oct 8, 2022
2 parents cf8f0b5 + 61a2070 commit d0dc925
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 17 deletions.
6 changes: 4 additions & 2 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ connection_info_local(Items) ->
list() ->
[ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(TcpPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
].
9 changes: 6 additions & 3 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,9 @@ info(Pid, InfoItems) ->
UnknownItems -> throw({bad_argument, UnknownItems})
end.

info_internal(pid, #v1{}) -> self();
info_internal(connection, #v1{connection = Val}) ->
Val;
info_internal(node, #v1{}) -> node();
info_internal(auth_mechanism, #v1{connection = #v1_connection{auth_mechanism = none}}) ->
none;
Expand All @@ -763,11 +766,11 @@ info_internal(frame_max, #v1{connection = #v1_connection{frame_max = Val}}) ->
info_internal(timeout, #v1{connection = #v1_connection{timeout_sec = Val}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = #user{username = none}}}) ->
'';
info_internal(username,
#v1{connection = #v1_connection{user = #user{username = Val}}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = none}}) ->
'';
info_internal(state, #v1{connection_state = Val}) ->
Val;
info_internal(SockStat, S) when SockStat =:= recv_oct;
Expand Down
123 changes: 111 additions & 12 deletions deps/rabbitmq_amqp1_0/test/command_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_amqp1_0.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListAmqp10ConnectionsCommand').

Expand All @@ -24,26 +24,36 @@ groups() ->
[
{non_parallel_tests, [], [
merge_defaults,
validate
]}
validate,
when_no_connections,
when_one_connection
]}
].

init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps()).
Config.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

init_per_group(Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [
{rmq_nodename_suffix, Suffix},
{amqp10_client_library, Group}
]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_group(_, Config) ->
Config.
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
Expand All @@ -67,3 +77,92 @@ validate(_Config) ->
ok = ?COMMAND:validate([atom_to_binary(K, utf8) || K <- ?INFO_ITEMS], #{}),
{validation_failure,{bad_info_key,[other]}} =
?COMMAND:validate([<<"other">>], #{}).

when_no_connections(_Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Opts = #{node => A, timeout => 2000, verbose => true},
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)).

when_one_connection(_Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
false ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Opts = #{node => A, timeout => 2000, verbose => true},

[Connection,Sender] = open_amqp10_connection(_Config),

[_] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
close_amqp10_connection(Connection, Sender);
true ->
{skip, "stream tests are skipped in mixed mode"}
end.

open_amqp10_connection(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),

% create a configuration map
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},

% ct:pal("opening connectoin with ~p", [OpnConf]),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),

% wait for credit to be received
receive
{amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
exit(credited_timeout)
end,

OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),

flush("pre-receive"),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"test-receiver">>,
Address),

% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),

% wait for a delivery
receive
{amqp10_msg, Receiver, _InMsg} -> ct:pal("Received amqp 1.0 message : ~w~n", [_InMsg]), ok
after 2000 ->
exit(delivery_timeout)
end,



[Connection, Sender].

flush(Prefix) ->
receive
Msg ->
ct:pal("~s flushed: ~w~n", [Prefix, Msg]),
flush(Prefix)
after 1 ->
ok
end.

close_amqp10_connection(Connection, Sender) ->
flush("final"),
ct:pal("Closing connection ~w~n", [Connection]),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
ok.
44 changes: 44 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Amqp;
using Amqp.Framing;

// See https://aka.ms/new-console-template for more information
Console.WriteLine("Sending 10 messages to amqp10-queue. Press a button to terminate");

// Create the AMQP connection string
var connectionString = $"amqp://guest:guest@localhost:5672/%2F";

// Create the AMQP connection
var connection = new Connection(new Address(connectionString));

// Create the AMQP session
var amqpSession = new Session(connection);

// Give a name to the sender
var senderSubscriptionId = "rabbitmq.amqp.sender";

// Name of the topic you will be sending messages (Name of the Queue)
var topic = "amqp10-queue";

// Create the AMQP sender
var sender = new SenderLink(amqpSession, senderSubscriptionId, topic);

for (var i = 0; i < 10; i++)
{
// Create message
var message = new Message($"Received message {i}");

// Add a meesage id
message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() };

// Add some message properties
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;

// Send message
sender.Send(message);

Task.Delay(2000).Wait();
}

// Wait for a key to close the program
Console.Read();
11 changes: 11 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Console application to test AMQP 1.0

This is a basic .NetCore console application which uses *AMQP 1.0* .Net **AMQPNetLite** client library.

Usage:
```
dotnet run
```

It sends 10 messages and it waits for key-stroke to terminate which is very convenient
when we need to create AMQP 1.0 connections.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.4.5" />
</ItemGroup>

</Project>

0 comments on commit d0dc925

Please sign in to comment.