Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S2S DNS discovery fix #4278

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions big_tests/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
%% used to test s2s features
{fed, [{node, fed1@localhost},
{domain, <<"fed1">>},
{secondary_domain, <<"fed2">>},
{host_type, <<"fed1">>},
{vars, "fed1"},
{incoming_s2s_port, 5299},
Expand Down
61 changes: 58 additions & 3 deletions big_tests/tests/s2s_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-include_lib("exml/include/exml.hrl").
-include_lib("exml/include/exml_stream.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/inet.hrl").

%% Module aliases
-define(dh, distributed_helper).
Expand Down Expand Up @@ -67,8 +68,8 @@ essentials() ->
[simple_message].

all_tests() ->
[connections_info, nonexistent_user, unknown_domain, malformed_jid,
dialback_with_wrong_key | essentials()].
[connections_info, dns_discovery, dns_discovery_ip_fail, nonexistent_user,
unknown_domain, malformed_jid, dialback_with_wrong_key | essentials()].

negative() ->
[timeout_waiting_for_message].
Expand Down Expand Up @@ -96,6 +97,7 @@ users() ->
%%%===================================================================

init_per_suite(Config) ->
mongoose_helper:inject_module(?MODULE, reload),
Config1 = s2s_helper:init_s2s(escalus:init_per_suite(Config)),
escalus:create_users(Config1, escalus:get_users(users())).

Expand All @@ -114,9 +116,40 @@ init_per_group(GroupName, Config) ->
end_per_group(_GroupName, _Config) ->
ok.

init_per_testcase(dns_discovery = CaseName, Config) ->
meck_inet_res("_xmpp-server._tcp.fed2"),
ok = rpc(mim(), meck, new, [inet, [no_link, unstick, passthrough]]),
ok = rpc(mim(), meck, expect,
[inet, getaddr,
fun ("fed2", inet) ->
{ok, {127, 0, 0, 1}};
(Address, Family) ->
meck:passthrough([Address, Family])
end]),
Config1 = escalus_users:update_userspec(Config, alice2, server, <<"fed2">>),
escalus:init_per_testcase(CaseName, Config1);
init_per_testcase(dns_discovery_ip_fail = CaseName, Config) ->
meck_inet_res("_xmpp-server._tcp.fed3"),
ok = rpc(mim(), meck, new, [inet, [no_link, unstick, passthrough]]),
escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

meck_inet_res(Domain) ->
ok = rpc(mim(), meck, new, [inet_res, [no_link, unstick, passthrough]]),
ok = rpc(mim(), meck, expect,
[inet_res, getbyname,
fun (Domain1, srv, _Timeout) when Domain1 == Domain ->
{ok, {hostent, Domain, [], srv, 1,
[{30, 10, 5299, "localhost"}]}};
(Name, Type, Timeout) ->
meck:passthrough([Name, Type, Timeout])
end]).

end_per_testcase(CaseName, Config) when CaseName =:= dns_discovery;
CaseName =:= dns_discovery_ip_fail ->
rpc(mim(), meck, unload, []),
escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

Expand Down Expand Up @@ -164,9 +197,31 @@ connections_info(Config) ->
[_ | _] = get_s2s_connections(?dh:mim(), FedDomain, out),
ok.

dns_discovery(Config) ->
simple_message(Config),
%% Ensure that the mocked DNS discovery for connecting to the other server
History = rpc(mim(), meck, history, [inet_res]),
?assertEqual(length(History), 2),
?assertEqual(s2s_helper:has_xmpp_server(History, "fed2"), true),
ok.


dns_discovery_ip_fail(Config) ->
escalus:fresh_story(Config, [{alice, 1}], fun(Alice1) ->

escalus:send(Alice1, escalus_stanza:chat_to(
<<"alice2@fed3">>,
<<"Hello, second Alice!">>)),

Stanza = escalus:wait_for_stanza(Alice1, 10000),
escalus:assert(is_error, [<<"cancel">>, <<"remote-server-not-found">>], Stanza),
History = rpc(mim(), meck, history, [inet]),
?assertEqual(s2s_helper:has_inet_errors(History, "fed3"), true)
end).

get_s2s_connections(RPCSpec, Domain, Type) ->
AllS2SConnections = ?dh:rpc(RPCSpec, mongoose_s2s_info, get_connections, [Type]),
DomainS2SConnections =
DomainS2SConnections =
[Connection || Connection <- AllS2SConnections,
Type =/= in orelse [Domain] =:= maps:get(domains, Connection),
Type =/= out orelse Domain =:= maps:get(server, Connection)],
Expand Down
23 changes: 23 additions & 0 deletions big_tests/tests/s2s_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
-export([init_s2s/1]).
-export([end_s2s/1]).
-export([configure_s2s/2]).
-export([has_inet_errors/2]).
-export([has_xmpp_server/2]).

-import(distributed_helper, [rpc_spec/1, rpc/4]).
-import(domain_helper, [host_type/1]).
Expand Down Expand Up @@ -82,3 +84,24 @@ restart_s2s(#{} = Spec, S2SListener) ->
{_, Pid, _, _} <- ChildrenIn],

mongoose_helper:restart_listener(Spec, S2SListener).

has_inet_errors(History, Server) ->
Inet = lists:any(
fun({_, {inet, getaddr, [Server1, inet]}, {error, nxdomain}})
when Server1 == Server -> true;
(_) -> false
end, History),
Inet6 = lists:any(
fun({_, {inet, getaddr, [Server1, inet6]}, {error, nxdomain}})
when Server1 == Server -> true;
(_) -> false
end, History),
Inet andalso Inet6.

has_xmpp_server(History, Server) ->
lists:all(
fun({_, _, {ok, {hostent, "_xmpp-server._tcp." ++ Server1, _, srv, _, _}}})
when Server1 == Server -> true;
(_) -> false
end, History).

2 changes: 1 addition & 1 deletion rel/fed1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

%% This node is for s2s testing.
%% "localhost" host should NOT be defined.
{hosts, "\"fed1\""}.
{hosts, "\"fed1\", \"fed2\""}.
{default_server_domain, "\"fed1\""}.
{cluster_name, "fed"}.

Expand Down
78 changes: 51 additions & 27 deletions src/ejabberd_s2s_out.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@
| wait_before_retry.

%% FSM handler return value
-type fsm_return() :: {'stop', Reason :: 'normal', state()}
| {'next_state', statename(), state()}
| {'next_state', statename(), state(), Timeout :: integer()}.
-type fsm_return() :: {stop, Reason :: normal, state()}
| {next_state, statename(), state()}
| {next_state, statename(), state(), Timeout :: integer()}.

-type addr() :: #{ip_tuple := inet:ip_address(),
-type dns_name() :: string().

-type addr() :: #{address := inet:ip_address() | dns_name(),
port := inet:port_number(),
type := inet | inet6}.

Expand Down Expand Up @@ -171,11 +173,11 @@
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
-spec start(ejabberd_s2s:fromto(), _) -> {'error', _} | {'ok', 'undefined' | pid()} | {'ok', 'undefined' | pid(), _}.
-spec start(ejabberd_s2s:fromto(), _) -> {error, _} | {ok, undefined | pid()} | {ok, undefined | pid(), _}.
start(FromTo, Type) ->
supervisor:start_child(ejabberd_s2s_out_sup, [FromTo, Type]).

-spec start_link(ejabberd_s2s:fromto(), _) -> 'ignore' | {'error', _} | {'ok', pid()}.
-spec start_link(ejabberd_s2s:fromto(), _) -> ignore | {error, _} | {ok, pid()}.
start_link(FromTo, Type) ->
p1_fsm:start_link(ejabberd_s2s_out, [FromTo, Type],
fsm_limit_opts() ++ ?FSMOPTS).
Expand All @@ -197,7 +199,7 @@
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
-spec init(list()) -> {'ok', 'open_socket', state()}.
-spec init(list()) -> {ok, open_socket, state()}.
init([{From, Server} = FromTo, Type]) ->
process_flag(trap_exit, true),
?LOG_DEBUG(#{what => s2s_out_started,
Expand Down Expand Up @@ -254,7 +256,7 @@
AddrList = get_addr_list(HostType, StateData#state.server),
case lists:foldl(fun(_, {ok, Socket}) ->
{ok, Socket};
(#{ip_tuple := Addr, port := Port, type := Type}, _) ->
(#{address := Addr, port := Port, type := Type}, _) ->
open_socket2(HostType, Type, Addr, Port)
end, ?SOCKET_DEFAULT_RESULT, AddrList) of
{ok, Socket} ->
Expand Down Expand Up @@ -282,7 +284,7 @@
{next_state, open_socket, StateData}.

-spec open_socket2(mongooseim:host_type(), inet | inet6, inet:ip_address(), inet:port_number()) ->
{'error', _} | {'ok', _}.
{error, _} | {ok, _}.
open_socket2(HostType, Type, Addr, Port) ->
?LOG_DEBUG(#{what => s2s_out_connecting,
address => Addr, port => Port}),
Expand Down Expand Up @@ -712,12 +714,12 @@
%%% Internal functions
%%%----------------------------------------------------------------------

-spec send_text(state(), binary()) -> 'ok'.
-spec send_text(state(), binary()) -> ok.
send_text(StateData, Text) ->
mongoose_transport:send_text(StateData#state.socket, Text).


-spec send_element(state(), exml:element()|mongoose_acc:t()) -> 'ok'.
-spec send_element(state(), exml:element()|mongoose_acc:t()) -> ok.
send_element(StateData, #xmlel{} = El) ->
mongoose_transport:send_element(StateData#state.socket, El).

Expand All @@ -727,7 +729,7 @@
Acc.


-spec send_queue(state(), Q :: element_queue()) -> 'ok'.
-spec send_queue(state(), Q :: element_queue()) -> ok.
send_queue(StateData, Q) ->
case queue:out(Q) of
{{value, {Acc, El}}, Q1} ->
Expand All @@ -739,7 +741,7 @@


%% @doc Bounce a single message (xmlel)
-spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> 'ok'.
-spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> ok.
bounce_element(Acc, El, Error) ->
case mongoose_acc:stanza_type(Acc) of
<<"error">> -> ok;
Expand All @@ -752,7 +754,7 @@
end.


-spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> 'ok'.
-spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> ok.
bounce_queue(Q, Error) ->
case queue:out(Q) of
{{value, {Acc, El}}, Q1} ->
Expand All @@ -768,7 +770,7 @@
mongoose_bin:gen_from_crypto().


-spec cancel_timer(reference()) -> 'ok'.
-spec cancel_timer(reference()) -> ok.
cancel_timer(Timer) ->
erlang:cancel_timer(Timer),
receive
Expand All @@ -779,7 +781,7 @@
end.


-spec bounce_messages(exml:element()) -> 'ok'.
-spec bounce_messages(exml:element()) -> ok.
bounce_messages(Error) ->
receive
{send_element, Acc, El} ->
Expand Down Expand Up @@ -840,36 +842,58 @@
ASCIIAddr -> do_lookup_services(HostType, ASCIIAddr)
end.

-spec do_lookup_services(mongooseim:host_type(),jid:lserver()) -> [addr()].
-spec do_lookup_services(mongooseim:host_type(), jid:lserver()) -> [addr()].
do_lookup_services(HostType, Server) ->
Res = srv_lookup(HostType, Server),
case Res of
{error, Reason} ->
?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
reason => Reason, server => Server}),
[];
{ok, #hostent{h_addr_list = AddrList, h_addrtype = Type}} ->
{AddrList, Type} ->
%% Probabilities are not exactly proportional to weights
%% for simplicity (higher weights are overvalued)
case (catch lists:map(fun calc_addr_index/1, AddrList)) of
{'EXIT', _Reason} ->
[];
IndexedAddrs ->
Addrs = [#{ip_tuple => Addr, port => Port, type => Type}
Addrs = [#{address => Addr, port => Port, type => Type}
|| {_Index, Addr, Port} <- lists:keysort(1, IndexedAddrs)],
?LOG_DEBUG(#{what => s2s_srv_lookup_success,
addresses => Addrs, server => Server}),
Addrs
end
end.


-spec srv_lookup(mongooseim:host_type(), jid:lserver()) ->
{'error', atom()} | {'ok', inet:hostent()}.
{error, atom()} | {list(), inet | inet6}.
srv_lookup(HostType, Server) ->
#{timeout := TimeoutSec, retries := Retries} = mongoose_config:get_opt([{s2s, HostType}, dns]),
srv_lookup(Server, timer:seconds(TimeoutSec), Retries).
case srv_lookup(Server, timer:seconds(TimeoutSec), Retries) of
{error, Reason} ->
{error, Reason};
{ok, #hostent{h_addr_list = AddrList}} ->
case get_inet_protocol(Server) of
{error, Reason} ->
{error, Reason};
Type ->
{AddrList, Type}
end
end.

-spec get_inet_protocol(jid:lserver()) -> {error, atom()} | inet | inet6.
get_inet_protocol(Server) ->
case inet:getaddr(binary_to_list(Server), inet) of
{ok, _IPv4Addr} ->
inet;
{error, _} ->
case inet:getaddr(binary_to_list(Server), inet6) of
{ok, _IPv6Addr} ->
inet6;

Check warning on line 892 in src/ejabberd_s2s_out.erl

View check run for this annotation

Codecov / codecov/patch

src/ejabberd_s2s_out.erl#L892

Added line #L892 was not covered by tests
{error, Reason} ->
{error, Reason}
end
end.

%% @doc XXX - this behaviour is suboptimal in the case that the domain
%% has a "_xmpp-server._tcp." but not a "_jabber._tcp." record and
Expand All @@ -878,7 +902,7 @@
-spec srv_lookup(jid:server(),
Timeout :: non_neg_integer(),
Retries :: pos_integer()
) -> {'error', atom()} | {'ok', inet:hostent()}.
) -> {error, atom()} | {ok, inet:hostent()}.
srv_lookup(_Server, _Timeout, Retries) when Retries < 1 ->
{error, timeout};
srv_lookup(Server, Timeout, Retries) ->
Expand All @@ -901,7 +925,7 @@
lookup_addrs(HostType, Server) ->
Port = outgoing_s2s_port(HostType),
lists:foldl(fun(Type, []) ->
[#{ip_tuple => Addr, port => Port, type => Type}
[#{address => Addr, port => Port, type => Type}
|| Addr <- lookup_addrs_for_type(Server, Type)];
(_Type, Addrs) ->
Addrs
Expand Down Expand Up @@ -963,7 +987,7 @@

%% @doc Calculate timeout depending on which state we are in:
%% Can return integer > 0 | infinity
-spec get_timeout_interval(statename()) -> 'infinity' | non_neg_integer().
-spec get_timeout_interval(statename()) -> infinity | non_neg_integer().
get_timeout_interval(StateName) ->
case StateName of
%% Validation implies dialback: Networking can take longer:
Expand Down Expand Up @@ -1021,7 +1045,7 @@
Pids).


-spec fsm_limit_opts() -> [{'max_queue', integer()}].
-spec fsm_limit_opts() -> [{max_queue, integer()}].
fsm_limit_opts() ->
case mongoose_config:lookup_opt(max_fsm_queue) of
{ok, N} ->
Expand All @@ -1045,7 +1069,7 @@
{ok, #{ip_address := IPAddress} = M} ->
{ok, IPTuple} = inet:parse_address(IPAddress),
Port = get_predefined_port(HostType, M),
[#{ip_tuple => IPTuple, port => Port, type => addr_type(IPTuple)}];
[#{address => IPTuple, port => Port, type => addr_type(IPTuple)}];
{error, not_found} ->
[]
end.
Expand Down