Skip to content

Commit

Permalink
[PLATFORM-2274]: Investigate possible DNS caching issues on our rabbi…
Browse files Browse the repository at this point in the history
…tmq libraries (#199)

* Resolving IPs and connect to one of them

* Reformat

* Fix credo

* Add open test with ip resolution mocked

* Remove test that cannot work on CI
  • Loading branch information
cottinisimone authored Oct 1, 2024
1 parent fbb7020 commit 8325dbb
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 76 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 23.1
elixir 1.11.2-otp-23
erlang 25.3.2.8
elixir 1.13.4-otp-25
113 changes: 62 additions & 51 deletions lib/amqp/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,30 @@ defmodule Amqpx.Connection do

import Amqpx.Core

alias Amqpx.{Connection, Helper}
alias Amqpx.{Connection, DNS, Helper}

defstruct [:pid]
@type t :: %Connection{pid: pid}

@default_params [
username: "guest",
password: "guest",
virtual_host: "/",
host: ~c"localhost",
port: 5672,
channel_max: 0,
frame_max: 0,
heartbeat: 10,
connection_timeout: 50_000,
ssl_options: :none,
client_properties: [],
socket_options: [],
auth_mechanisms: [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
]
]

@doc """
Opens a new connection without a name.
Expand All @@ -21,7 +40,7 @@ defmodule Amqpx.Connection do
end

@doc """
Opens an new Connection to an Amqpx broker.
Opens a new Connection to an Amqpx broker.
The connections created by this module are supervised under amqp_client's supervision tree.
Please note that connections do not get restarted automatically by the supervision tree in
Expand Down Expand Up @@ -104,9 +123,7 @@ defmodule Amqpx.Connection do
end

def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do
options
|> merge_options_to_default()
|> do_open(name)
do_open(options, @default_params, name)
end

def open(uri, options) when is_binary(uri) and is_list(options) do
Expand All @@ -133,31 +150,34 @@ defmodule Amqpx.Connection do
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri, name, options) when is_binary(uri) and is_list(options) do
case uri |> String.to_charlist() |> :amqp_uri.parse() do
{:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name)
error -> error
{:ok, amqp_params} ->
amqp_params = amqp_params_network(amqp_params)
do_open(options, amqp_params, name)

error ->
error
end
end

@doc false
@spec merge_options_to_amqp_params(tuple, keyword) :: tuple
def merge_options_to_amqp_params(amqp_params, options) do
options = normalize_ssl_options(options)
params = amqp_params_network(amqp_params)
@spec merge_options(keyword, keyword) :: tuple
def merge_options(params, default) do
default = normalize_ssl_options(default)

amqp_params_network(
username: keys_get(options, params, :username),
password: Helper.get_password(options, params),
virtual_host: keys_get(options, params, :virtual_host),
host: options |> keys_get(params, :host) |> to_charlist,
port: keys_get(options, params, :port),
channel_max: keys_get(options, params, :channel_max),
frame_max: keys_get(options, params, :frame_max),
heartbeat: keys_get(options, params, :heartbeat),
connection_timeout: keys_get(options, params, :connection_timeout),
ssl_options: keys_get(options, params, :ssl_options),
client_properties: keys_get(options, params, :client_properties),
socket_options: keys_get(options, params, :socket_options),
auth_mechanisms: keys_get(options, params, :auth_mechanisms)
username: keys_get(params, default, :username),
password: Helper.get_password(params, default),
virtual_host: keys_get(params, default, :virtual_host),
host: params |> keys_get(default, :host) |> to_charlist(),
port: keys_get(params, default, :port),
channel_max: keys_get(params, default, :channel_max),
frame_max: keys_get(params, default, :frame_max),
heartbeat: keys_get(params, default, :heartbeat),
connection_timeout: keys_get(params, default, :connection_timeout),
ssl_options: keys_get(params, default, :ssl_options),
client_properties: keys_get(params, default, :client_properties),
socket_options: keys_get(params, default, :socket_options),
auth_mechanisms: keys_get(params, default, :auth_mechanisms)
)
end

Expand All @@ -166,28 +186,6 @@ defmodule Amqpx.Connection do
Keyword.get(k1, key, Keyword.get(k2, key))
end

defp merge_options_to_default(options) do
amqp_params_network(
username: Keyword.get(options, :username, "guest"),
password: Helper.get_password(options, nil),
virtual_host: Keyword.get(options, :virtual_host, "/"),
host: options |> Keyword.get(:host, 'localhost') |> to_charlist,
port: Keyword.get(options, :port, :undefined),
channel_max: Keyword.get(options, :channel_max, 0),
frame_max: Keyword.get(options, :frame_max, 0),
heartbeat: Keyword.get(options, :heartbeat, 10),
connection_timeout: Keyword.get(options, :connection_timeout, 50_000),
ssl_options: Keyword.get(options, :ssl_options, :none),
client_properties: Keyword.get(options, :client_properties, []),
socket_options: Keyword.get(options, :socket_options, []),
auth_mechanisms:
Keyword.get(options, :auth_mechanisms, [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
])
)
end

@doc """
Closes an open Connection.
"""
Expand All @@ -199,11 +197,24 @@ defmodule Amqpx.Connection do
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
@spec do_open(Keyword.t(), Keyword.t(), String.t()) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
defp do_open(params, default_params, name) do
params
|> keys_get(default_params, :host)
|> to_charlist()
|> DNS.resolve_ips()
|> Enum.reduce_while(nil, fn ip, _ ->
amqp_params = params |> Keyword.put(:host, ip) |> merge_options(default_params)

case :amqp_connection.start(amqp_params, name) do
{:ok, pid} ->
{:halt, {:ok, %Connection{pid: pid}}}

error ->
{:cont, error}
end
end)
end

defp normalize_ssl_options(options) when is_list(options) do
Expand Down
20 changes: 20 additions & 0 deletions lib/amqp/dns.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Amqpx.DNS do
@moduledoc """
Module to resolve a DNS record into an A record.
"""

@doc """
Resolves the IP addresses of a given hostname. If the hostname
cannot be resolved, it returns the hostname itself.
"""
@spec resolve_ips(charlist) :: [charlist]
def resolve_ips(host) do
case :inet.gethostbyname(host) do
{:ok, {:hostent, _, _, _, _, ips}} ->
ips |> Enum.map(&:inet.ntoa/1) |> Enum.dedup()

_ ->
[host]
end
end
end
119 changes: 96 additions & 23 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,29 @@ defmodule ConnectionTest do
use ExUnit.Case

import Amqpx.Core
alias Amqpx.Connection
import Mock

alias Amqpx.{Connection, DNS}

@obfuscate_password false

test "open connection with host as binary" do
assert {:ok, conn} =
Connection.open(
username: "amqpx",
password: "amqpx",
host: "rabbit",
obfuscate_password: @obfuscate_password
)
@invalid_ip '192.168.1.1'
@valid_ip '192.168.1.2'

@open_options [
username: "amqpx",
password: "amqpx",
host: "rabbit",
obfuscate_password: @obfuscate_password
]

test "open connection with host as binary" do
assert {:ok, conn} = Connection.open(@open_options)
assert :ok = Connection.close(conn)
end

test "open connection with host as char list" do
assert {:ok, conn} =
Connection.open(
username: "amqpx",
password: "amqpx",
host: 'rabbit',
obfuscate_password: @obfuscate_password
)

assert {:ok, conn} = Connection.open(@open_options)
assert :ok = Connection.close(conn)
end

Expand All @@ -38,7 +36,7 @@ defmodule ConnectionTest do
test "open connection using both uri and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -48,7 +46,7 @@ defmodule ConnectionTest do
test "open connection with uri, name, and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672", "my-connection",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -59,13 +57,88 @@ defmodule ConnectionTest do
uri = "amqp://guest:amqpx@rabbit:5672"
{:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse()

record =
Connection.merge_options_to_amqp_params(amqp_params, username: "amqpx", obfuscate_password: @obfuscate_password)

params = [username: "amqpx", obfuscate_password: @obfuscate_password]
default = amqp_params_network(amqp_params)
record = Connection.merge_options(params, default)
params = amqp_params_network(record)

assert params[:username] == "amqpx"
assert params[:password] == "amqpx"
assert params[:host] == 'rabbit'
assert params[:host] == ~c"rabbit"
end

describe "connecting using dns name resolution" do
test "Connection.open fails if none of the ips are reachable" do
with_mock DNS, resolve_ips: fn _host -> [@invalid_ip] end do
start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, :c.pid(0, 250, 0)}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:error, :econnrefused} = Connection.open(@open_options)
end
end
end

test "Connection.open retry every ip until one succeed" do
with_mock DNS, resolve_ips: fn _host -> [@invalid_ip, @valid_ip] end do
pid = :c.pid(0, 250, 0)

start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, pid}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options)
end
end
end

test "Connection.open retry every ip until one succeed, reversed" do
with_mock DNS, resolve_ips: fn _host -> [@valid_ip, @invalid_ip] end do
pid = :c.pid(0, 250, 0)

start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, pid}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options)
end
end
end
end

describe "ip resolution" do
test "localhost is resolved as 127.0.0.1" do
assert [~c"127.0.0.1"] = DNS.resolve_ips(~c"localhost")
end

test "rabbit can be resolved into an ip" do
assert [ip] = DNS.resolve_ips(~c"rabbit")
assert {:ok, _} = :inet.parse_address(ip)
end

test "unknown host will not be resolved" do
assert [~c"nonexistent"] = DNS.resolve_ips(~c"nonexistent")
end
end
end

0 comments on commit 8325dbb

Please sign in to comment.