Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List amqp10 connections #5881

Merged
merged 11 commits into from
Sep 30, 2022
3 changes: 3 additions & 0 deletions deps/rabbitmq_amqp1_0/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ suites = [
deps = [
"//deps/amqp10_common:erlang_app",
],
runtime_deps = [
"//deps/amqp10_client:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ connection_info_local(Items) ->
list() ->
[ReaderPid
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(TcpPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
].
9 changes: 6 additions & 3 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,9 @@ info(Pid, InfoItems) ->
UnknownItems -> throw({bad_argument, UnknownItems})
end.

info_internal(pid, #v1{}) -> self();
info_internal(connection, #v1{connection = Val}) ->
Val;
info_internal(node, #v1{}) -> node();
info_internal(auth_mechanism, #v1{connection = #v1_connection{auth_mechanism = none}}) ->
none;
Expand All @@ -763,11 +766,11 @@ info_internal(frame_max, #v1{connection = #v1_connection{frame_max = Val}}) ->
info_internal(timeout, #v1{connection = #v1_connection{timeout_sec = Val}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = #user{username = none}}}) ->
'';
info_internal(username,
#v1{connection = #v1_connection{user = #user{username = Val}}}) ->
Val;
info_internal(user,
#v1{connection = #v1_connection{user = none}}) ->
'';
info_internal(state, #v1{connection_state = Val}) ->
Val;
info_internal(SockStat, S) when SockStat =:= recv_oct;
Expand Down
118 changes: 106 additions & 12 deletions deps/rabbitmq_amqp1_0/test/command_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_amqp1_0.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListAmqp10ConnectionsCommand').

Expand All @@ -24,26 +24,36 @@ groups() ->
[
{non_parallel_tests, [], [
merge_defaults,
validate
]}
validate,
when_no_connections,
when_one_connection
]}
].

init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps()).
Config.

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

init_per_group(Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [
{rmq_nodename_suffix, Suffix},
{amqp10_client_library, Group}
]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_group(_, Config) ->
Config.
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
Expand All @@ -67,3 +77,87 @@ validate(_Config) ->
ok = ?COMMAND:validate([atom_to_binary(K, utf8) || K <- ?INFO_ITEMS], #{}),
{validation_failure,{bad_info_key,[other]}} =
?COMMAND:validate([<<"other">>], #{}).

when_no_connections(_Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Opts = #{node => A, timeout => 2000, verbose => true},
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)).

when_one_connection(_Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(_Config, nodename),
Opts = #{node => A, timeout => 2000, verbose => true},

[Connection,Sender] = open_amqp10_connection(_Config),

[_] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
close_amqp10_connection(Connection, Sender).

open_amqp10_connection(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),

% create a configuration map
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},

% ct:pal("opening connectoin with ~p", [OpnConf]),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),

% wait for credit to be received
receive
{amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
exit(credited_timeout)
end,

OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),

flush("pre-receive"),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"test-receiver">>,
Address),

% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),

% wait for a delivery
receive
{amqp10_msg, Receiver, _InMsg} -> ct:pal("Received amqp 1.0 message : ~w~n", [_InMsg]), ok
after 2000 ->
exit(delivery_timeout)
end,



[Connection, Sender].

flush(Prefix) ->
receive
Msg ->
ct:pal("~s flushed: ~w~n", [Prefix, Msg]),
flush(Prefix)
after 1 ->
ok
end.

close_amqp10_connection(Connection, Sender) ->
flush("final"),
ct:pal("Closing connection ~w~n", [Connection]),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
ok.
44 changes: 44 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Amqp;
MarcialRosales marked this conversation as resolved.
Show resolved Hide resolved
using Amqp.Framing;

// See https://aka.ms/new-console-template for more information
Console.WriteLine("Sending 10 messages to amqp10-queue. Press a button to terminate");

// Create the AMQP connection string
var connectionString = $"amqp://guest:guest@localhost:5672/%2F";

// Create the AMQP connection
var connection = new Connection(new Address(connectionString));

// Create the AMQP session
var amqpSession = new Session(connection);

// Give a name to the sender
var senderSubscriptionId = "rabbitmq.amqp.sender";

// Name of the topic you will be sending messages (Name of the Queue)
var topic = "amqp10-queue";

// Create the AMQP sender
var sender = new SenderLink(amqpSession, senderSubscriptionId, topic);

for (var i = 0; i < 10; i++)
{
// Create message
var message = new Message($"Received message {i}");

// Add a meesage id
message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() };

// Add some message properties
message.ApplicationProperties = new ApplicationProperties();
message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;

// Send message
sender.Send(message);

Task.Delay(2000).Wait();
}

// Wait for a key to close the program
Console.Read();
11 changes: 11 additions & 0 deletions deps/rabbitmq_amqp1_0/test/system_SUITE_data/console/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Console application to test AMQP 1.0

This is a basic .NetCore console application which uses *AMQP 1.0* .Net **AMQPNetLite** client library.

Usage:
```
dotnet run
```

It sends 10 messages and it waits for key-stroke to terminate which is very convenient
when we need to create AMQP 1.0 connections.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.4.5" />
</ItemGroup>

</Project>