From 9b66e83b0d3015944c5518e8f4c07ed1c93873e1 Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Wed, 16 Sep 2020 09:02:07 +0200 Subject: [PATCH] Handle send errors in http2 To get faster reaction times on failing socket-send of echos/requests we now handle the response from the send call where applicable. To avoid a larger change in the statemachine a send error triggers an event and is handled like an remote socket error/close. Fixes #227 and touches #224 --- src/gun_http2.erl | 33 ++++++---- test/connection_SUITE.erl | 130 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 12 deletions(-) create mode 100644 test/connection_SUITE.erl diff --git a/src/gun_http2.erl b/src/gun_http2.erl index fef10969..7a21f128 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -145,7 +145,7 @@ init(_ReplyTo, Socket, Transport, Opts0) -> State = #http2_state{socket=Socket, transport=Transport, opts=Opts, content_handlers=Handlers, http2_machine=HTTP2Machine}, - Transport:send(Socket, Preface), + send_packet(Transport, Socket, Preface), {connected, State}. switch_transport(Transport, Socket, State) -> @@ -289,8 +289,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + {settings, _} -> send_packet(Transport, Socket, cow_http2:settings_ack()); + {ping, Opaque} -> send_packet(Transport, Socket, cow_http2:ping_ack(Opaque)); _ -> ok end, State. @@ -476,7 +476,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, ok -> State; {ok, Increment, HTTP2Machine} -> - Transport:send(Socket, cow_http2:window_update(Increment)), + send_packet(Transport, Socket, cow_http2:window_update(Increment)), State#http2_state{http2_machine=HTTP2Machine} end. @@ -494,7 +494,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, end, case {Data1, Data2} of {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) + _ -> send_packet(Transport, Socket, [Data1, Data2]) end, State#http2_state{http2_machine=HTTP2Machine}. @@ -567,7 +567,7 @@ close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> - Transport:send(Socket, cow_http2:ping(0)), + send_packet(Transport, Socket, cow_http2:ping(0)), {State, EvHandlerState}. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, @@ -589,7 +589,7 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + send_packet(Transport, Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, @@ -621,7 +621,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, end, {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + send_packet(Transport, Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, @@ -722,7 +722,7 @@ send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), + send_packet(Transport, Socket, cow_http2:data(StreamID, IsFin, Data)), State; %% @todo Uncomment this once sendfile is supported. %send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, @@ -735,12 +735,12 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> {ok, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), - Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), + send_packet(Transport, Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), State#http2_state{http2_machine=HTTP2Machine}. reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, StreamID, StreamError={stream_error, Reason, _}) -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + send_packet(Transport, Socket, cow_http2:rst_stream(StreamID, Reason)), case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> ReplyTo ! {gun_error, self(), StreamRef, StreamError}, @@ -754,7 +754,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), - Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), + send_packet(Transport, Socket, cow_http2:rst_stream(StreamID, cancel)), EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -868,3 +868,12 @@ delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) - streams=maps:remove(StreamID, Streams), stream_refs=maps:remove(StreamRef, Refs) }. + +send_packet(Transport, Socket, Packet) -> + case Transport:send(Socket, Packet) of + ok -> + ok; + {error, Reason} -> + {_, _, Error} = Transport:messages(), + self() ! {Error, Socket, Reason} + end. diff --git a/test/connection_SUITE.erl b/test/connection_SUITE.erl new file mode 100644 index 00000000..9c5c8be0 --- /dev/null +++ b/test/connection_SUITE.erl @@ -0,0 +1,130 @@ +%% Copyright (c) 2017-2020, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(connection_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [doc/1]). +-import(gun_test, [http2_handshake/2]). + +suite() -> + [{timetrap, 180000}]. + +all() -> + [{group, gun}]. + +groups() -> + [{gun, [parallel], ct_helper:all(?MODULE)}]. + +init_per_suite(Config) -> + case os:type() of + {_, linux} -> Config; + _ -> {skip, "linux only due to socket juggling"} + end. + +end_per_suite(_) -> ok. + +%% Tests. + +http2_send_request_fail(_) -> + doc("Handle send failures of requests in http2."), + {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, {_, Port}} = inet:sockname(ListenSocket), + %% Socket buffers needs to be smaller than local_window/ConnWindow + {ok, Pid} = gun:open("localhost", Port, #{protocols => [http2], + tcp_opts => [{send_timeout, 250}, + {send_timeout_close, true}, + {sndbuf, 2048}, + {nodelay, true}] + }), + {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000), + inet:setopts(ClientSocket, [{recbuf, 512}]), + http2_handshake(ClientSocket, gen_tcp), + {ok, http2} = gun:await_up(Pid), + post_loop(Pid, 1000), %% Fill buffer + receive + {gun_error, Pid, _, {closed, {error, _}}} -> + gun:close(Pid); + Msg -> + error({fail, Msg}) + after 5000 -> + error(timeout) + end. + +http2_send_ping_fail(_) -> + doc("Handle send failures of ping in http2."), + {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, {_, Port}} = inet:sockname(ListenSocket), + {ok, Pid} = gun:open("localhost", Port, #{protocols => [http2], + http2_opts => #{keepalive => 1}, + tcp_opts => [{send_timeout, 250}, + {send_timeout_close, true}, + {sndbuf, 256}, + {nodelay, true}] + }), + {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000), + inet:setopts(ClientSocket, [{recbuf, 256}]), + http2_handshake(ClientSocket, gen_tcp), + {ok, http2} = gun:await_up(Pid), + receive + {gun_down, Pid, http2, {error, _}, []} -> + gun:close(Pid); + Msg -> + error({fail, Msg}) + after 5000 -> + error(timeout) + end. + +http2_send_ping_ack_fail(_) -> + doc("Handle send failures of ping ack in http2."), + {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, {_, Port}} = inet:sockname(ListenSocket), + {ok, Pid} = gun:open("localhost", Port, #{protocols => [http2], + http2_opts => #{keepalive => infinity}, + tcp_opts => [{send_timeout, 250}, + {send_timeout_close, true}, + {sndbuf, 256}, + {nodelay, true}] + }), + {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 5000), + inet:setopts(ClientSocket, [{recbuf, 256}]), + http2_handshake(ClientSocket, gen_tcp), + {ok, http2} = gun:await_up(Pid), + ping_loop(ClientSocket, 1800), %% Send pings triggering ping acks + receive + {gun_down, Pid, http2, {error, _}, []} -> + gun:close(Pid); + Msg -> + error({fail, Msg}) + after 5000 -> + error(timeout) + end. + +%% Helpers + +post_loop(_Pid, 0) -> + ok; +post_loop(Pid, Loops) -> + Body = <<0:1000>>, + gun:post(Pid, "/organizations/ninenines", + [{<<"content-type">>, "application/octet-stream"}], + Body), + post_loop(Pid, Loops - 1). + +ping_loop(_Socket, 0) -> + ok; +ping_loop(Socket, Loops) -> + gun_tcp:send(Socket, cow_http2:ping(0)), + ping_loop(Socket, Loops - 1).