Skip to content

Commit

Permalink
Handle send errors in http2
Browse files Browse the repository at this point in the history
To get faster reaction times on failing socket-send of echos/requests
we now handle the response from the send call where applicable.

To avoid a larger change in the statemachine a send error triggers an
event and is handled like an remote socket error/close.

Fixes #227 and touches #224
  • Loading branch information
bjosv committed Sep 17, 2020
1 parent 921c471 commit 9b66e83
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 12 deletions.
33 changes: 21 additions & 12 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ init(_ReplyTo, Socket, Transport, Opts0) ->
State = #http2_state{socket=Socket,
transport=Transport, opts=Opts, content_handlers=Handlers,
http2_machine=HTTP2Machine},
Transport:send(Socket, Preface),
send_packet(Transport, Socket, Preface),
{connected, State}.

switch_transport(Transport, Socket, State) ->
Expand Down Expand Up @@ -289,8 +289,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl

maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
case Frame of
{settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
{ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque));
{settings, _} -> send_packet(Transport, Socket, cow_http2:settings_ack());
{ping, Opaque} -> send_packet(Transport, Socket, cow_http2:ping_ack(Opaque));
_ -> ok
end,
State.
Expand Down Expand Up @@ -476,7 +476,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
ok ->
State;
{ok, Increment, HTTP2Machine} ->
Transport:send(Socket, cow_http2:window_update(Increment)),
send_packet(Transport, Socket, cow_http2:window_update(Increment)),
State#http2_state{http2_machine=HTTP2Machine}
end.

Expand All @@ -494,7 +494,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
end,
case {Data1, Data2} of
{<<>>, <<>>} -> ok;
_ -> Transport:send(Socket, [Data1, Data2])
_ -> send_packet(Transport, Socket, [Data1, Data2])
end,
State#http2_state{http2_machine=HTTP2Machine}.

Expand Down Expand Up @@ -567,7 +567,7 @@ close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
ok.

keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
Transport:send(Socket, cow_http2:ping(0)),
send_packet(Transport, Socket, cow_http2:ping(0)),
{State, EvHandlerState}.

headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Expand All @@ -589,7 +589,7 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
send_packet(Transport, Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
Expand Down Expand Up @@ -621,7 +621,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
end,
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
send_packet(Transport, Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
Expand Down Expand Up @@ -722,7 +722,7 @@ send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) ->

send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
StreamID, IsFin, {data, Data}) ->
Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
send_packet(Transport, Socket, cow_http2:data(StreamID, IsFin, Data)),
State;
%% @todo Uncomment this once sendfile is supported.
%send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
Expand All @@ -735,12 +735,12 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) ->
{ok, HeaderBlock, HTTP2Machine}
= cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
send_packet(Transport, Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
State#http2_state{http2_machine=HTTP2Machine}.

reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
StreamID, StreamError={stream_error, Reason, _}) ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
send_packet(Transport, Socket, cow_http2:rst_stream(StreamID, Reason)),
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
ReplyTo ! {gun_error, self(), StreamRef, StreamError},
Expand All @@ -754,7 +754,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
send_packet(Transport, Socket, cow_http2:rst_stream(StreamID, cancel)),
EvHandlerState = EvHandler:cancel(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
Expand Down Expand Up @@ -868,3 +868,12 @@ delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) -
streams=maps:remove(StreamID, Streams),
stream_refs=maps:remove(StreamRef, Refs)
}.

send_packet(Transport, Socket, Packet) ->
case Transport:send(Socket, Packet) of
ok ->
ok;
{error, Reason} ->
{_, _, Error} = Transport:messages(),
self() ! {Error, Socket, Reason}
end.
130 changes: 130 additions & 0 deletions test/connection_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
%% Copyright (c) 2017-2020, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(connection_SUITE).
-compile(export_all).
-compile(nowarn_export_all).

-import(ct_helper, [doc/1]).
-import(gun_test, [http2_handshake/2]).

suite() ->
[{timetrap, 180000}].

all() ->
[{group, gun}].

groups() ->
[{gun, [parallel], ct_helper:all(?MODULE)}].

init_per_suite(Config) ->
case os:type() of
{_, linux} -> Config;
_ -> {skip, "linux only due to socket juggling"}
end.

end_per_suite(_) -> ok.

%% Tests.

http2_send_request_fail(_) ->
doc("Handle send failures of requests in http2."),
{ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
{ok, {_, Port}} = inet:sockname(ListenSocket),
%% Socket buffers needs to be smaller than local_window/ConnWindow
{ok, Pid} = gun:open("localhost", Port, #{protocols => [http2],
tcp_opts => [{send_timeout, 250},
{send_timeout_close, true},
{sndbuf, 2048},
{nodelay, true}]
}),
{ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000),
inet:setopts(ClientSocket, [{recbuf, 512}]),
http2_handshake(ClientSocket, gen_tcp),
{ok, http2} = gun:await_up(Pid),
post_loop(Pid, 1000), %% Fill buffer
receive
{gun_error, Pid, _, {closed, {error, _}}} ->
gun:close(Pid);
Msg ->
error({fail, Msg})
after 5000 ->
error(timeout)
end.

http2_send_ping_fail(_) ->
doc("Handle send failures of ping in http2."),
{ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
{ok, {_, Port}} = inet:sockname(ListenSocket),
{ok, Pid} = gun:open("localhost", Port, #{protocols => [http2],
http2_opts => #{keepalive => 1},
tcp_opts => [{send_timeout, 250},
{send_timeout_close, true},
{sndbuf, 256},
{nodelay, true}]
}),
{ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000),
inet:setopts(ClientSocket, [{recbuf, 256}]),
http2_handshake(ClientSocket, gen_tcp),
{ok, http2} = gun:await_up(Pid),
receive
{gun_down, Pid, http2, {error, _}, []} ->
gun:close(Pid);
Msg ->
error({fail, Msg})
after 5000 ->
error(timeout)
end.

http2_send_ping_ack_fail(_) ->
doc("Handle send failures of ping ack in http2."),
{ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
{ok, {_, Port}} = inet:sockname(ListenSocket),
{ok, Pid} = gun:open("localhost", Port, #{protocols => [http2],
http2_opts => #{keepalive => infinity},
tcp_opts => [{send_timeout, 250},
{send_timeout_close, true},
{sndbuf, 256},
{nodelay, true}]
}),
{ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000),
inet:setopts(ClientSocket, [{recbuf, 256}]),
http2_handshake(ClientSocket, gen_tcp),
{ok, http2} = gun:await_up(Pid),
ping_loop(ClientSocket, 1800), %% Send pings triggering ping acks
receive
{gun_down, Pid, http2, {error, _}, []} ->
gun:close(Pid);
Msg ->
error({fail, Msg})
after 5000 ->
error(timeout)
end.

%% Helpers

post_loop(_Pid, 0) ->
ok;
post_loop(Pid, Loops) ->
Body = <<0:1000>>,
gun:post(Pid, "/organizations/ninenines",
[{<<"content-type">>, "application/octet-stream"}],
Body),
post_loop(Pid, Loops - 1).

ping_loop(_Socket, 0) ->
ok;
ping_loop(Socket, Loops) ->
gun_tcp:send(Socket, cow_http2:ping(0)),
ping_loop(Socket, Loops - 1).

0 comments on commit 9b66e83

Please sign in to comment.