Skip to content

Commit

Permalink
Handle send errors
Browse files Browse the repository at this point in the history
To get faster reaction times on failing socket-send of echos/requests
we now handle the response from the send call where applicable.
  • Loading branch information
bjosv authored and zuiderkwast committed Apr 26, 2021
1 parent 30a971a commit 8b89aa3
Show file tree
Hide file tree
Showing 9 changed files with 647 additions and 341 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{deps, [
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.10.0"}}
{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}}
]}.
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}.
64 changes: 36 additions & 28 deletions src/gun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1226,15 +1226,23 @@ connected(internal, {connected, Socket, NewProtocol},
State0=#state{owner=Owner, opts=Opts, transport=Transport}) ->
{Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
{StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts),
State1 = State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState},
%% @todo Don't send gun_up and gun_down if Protocol:init/4 or active/1 failes here.
Owner ! {gun_up, self(), Protocol:name()},
case active(State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}) of
{ok, State} ->
case Protocol:has_keepalive() of
true -> {next_state, StateName, keepalive_timeout(State)};
false -> {next_state, StateName, State}
end;
Disconnect ->
Disconnect
case StateName of
Error={error, _} ->
disconnect(State1, Error);
StateName ->
case active(State1) of
{ok, State2} ->
State = case Protocol:has_keepalive() of
true -> keepalive_timeout(State2);
false -> State2
end,
{next_state, StateName, State};
Disconnect ->
Disconnect
end
end;
%% Public HTTP interface.
%%
Expand All @@ -1244,32 +1252,31 @@ connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
{Commands, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
Headers, InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
Expand All @@ -1284,11 +1291,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},
opts => WsOpts
}, EvHandlerState0),
%% @todo Can fail if HTTP/1.0.
{ProtoState2, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
{Commands, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
event_handler_state=EvHandlerState}};
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
%% @todo Maybe better standardize the protocol callbacks argument orders.
connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
Expand Down Expand Up @@ -1365,10 +1372,10 @@ closing(Type, Event, State) ->
handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
{Commands, EvHandlerState} = Protocol:data(ProtoState,
dereference_stream_ref(StreamRef, State),
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected(info, {timeout, TRef, Name}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:timeout(ProtoState, Name, TRef),
Expand Down Expand Up @@ -1412,21 +1419,21 @@ handle_common_connected_no_input(info, {handle_continue, StreamRef, Msg}, _,
event_handler_state=EvHandlerState}));
%% Timeouts.
handle_common_connected_no_input(info, keepalive, _,
State=#state{protocol=Protocol, protocol_state=ProtoState0,
State0=#state{protocol=Protocol, protocol_state=ProtoState0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
{keep_state, keepalive_timeout(State#state{
protocol_state=ProtoState, event_handler_state=EvHandlerState})};
{Commands, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
commands(Commands, keepalive_timeout(State0#state{
event_handler_state=EvHandlerState}));
handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _,
State0=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
maybe_active(commands(Commands, State0));
handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
{Commands, EvHandlerState} = Protocol:cancel(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
Intermediaries = [I || I=#{protocol := http} <- Intermediaries0],
Expand Down Expand Up @@ -1665,6 +1672,7 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{
#{tunnel_transport := _} -> ProtoOpts0;
_ -> ProtoOpts0#{tunnel_transport => tcp}
end,
%% @todo Handle error result from Protocol:init/4
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
ProtocolChangedEvent = case ProtoOpts of
#{stream_ref := StreamRef} ->
Expand Down
133 changes: 75 additions & 58 deletions src/gun_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -546,45 +546,47 @@ close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(State, Tail, Reason).

%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
{State, EvHandlerState};
keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
{[], EvHandlerState};
%% We can only keep-alive by sending an empty line in-between streams.
keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
Transport:send(Socket, <<"\r\n">>),
{State, EvHandlerState};
keepalive(State, _, EvHandlerState) ->
{State, EvHandlerState}.
keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
case Transport:send(Socket, <<"\r\n">>) of
ok -> {[], EvHandlerState};
Error={error, _} -> {Error, EvHandlerState}
end;
keepalive(_State, _, EvHandlerState) ->
{[], EvHandlerState}.

headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
{[], CookieStore, EvHandlerState};
headers(State0=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State0,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow),
CookieStore, EvHandlerState}.
State = new_stream(State0#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow),
{{state, State}, CookieStore, EvHandlerState}.

request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow),
{{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.

initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
Expand Down Expand Up @@ -631,6 +633,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
%% @todo Handle send errors.
Transport:send(Socket, [
cow_http:request(Method, Path, Version, Headers),
[Body || Body =/= undefined]]),
Expand Down Expand Up @@ -689,41 +692,52 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
DataLength = iolist_size(Data),
case Out of
body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
case Data of
DataToSend = case Data of
<<>> ->
Transport:send(Socket, cow_http_te:last_chunk());
cow_http_te:last_chunk();
_ ->
Transport:send(Socket, [
[
cow_http_te:chunk(Data),
cow_http_te:last_chunk()
])
]
end,
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{State#http_state{out=head}, EvHandlerState};
case Transport:send(Socket, DataToSend) of
ok ->
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent,
EvHandlerState0),
{{state, State#http_state{out=head}}, EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState0}
end;
body_chunked when Version =:= 'HTTP/1.1' ->
Transport:send(Socket, cow_http_te:chunk(Data)),
{State, EvHandlerState0};
case Transport:send(Socket, cow_http_te:chunk(Data)) of
ok -> {[], EvHandlerState0};
Error={error, _} -> {Error, EvHandlerState0}
end;
{body, Length} when DataLength =< Length ->
Transport:send(Socket, Data),
Length2 = Length - DataLength,
if
Length2 =:= 0, IsFin =:= fin ->
case Transport:send(Socket, Data) of
ok when Length2 =:= 0, IsFin =:= fin ->
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{State#http_state{out=head}, EvHandlerState};
Length2 > 0, IsFin =:= nofin ->
{State#http_state{out={body, Length2}}, EvHandlerState0}
{{state, State#http_state{out=head}}, EvHandlerState};
ok when Length2 > 0, IsFin =:= nofin ->
{{state, State#http_state{out={body, Length2}}}, EvHandlerState0};
Error={error, _} ->
{Error, EvHandlerState0}
end;
body_chunked -> %% HTTP/1.0
Transport:send(Socket, Data),
{State, EvHandlerState0}
case Transport:send(Socket, Data) of
ok -> {[], EvHandlerState0};
Error={error, _} -> {Error, EvHandlerState0}
end
end;
_ ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
Expand All @@ -733,12 +747,12 @@ connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, EvHandlerState};
{[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
{State, EvHandlerState};
{[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0) ->
Expand Down Expand Up @@ -775,19 +789,22 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
RequestEndEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
<<"CONNECT">>, Authority, <<>>, InitialFlow),
EvHandlerState}.
case Transport:send(Socket, cow_http:request(<<"CONNECT">>,
Authority, Version, Headers)) of
ok ->
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
RequestEndEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
end.

%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
Expand All @@ -800,7 +817,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
endpoint => local,
reason => cancel
}, EvHandlerState0),
{State, EvHandlerState};
{{state, State}, EvHandlerState};
false ->
{error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0}
end.
Expand Down Expand Up @@ -830,12 +847,12 @@ down(#http_state{streams=Streams}) ->
error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
{state, State}.

error_stream_not_found(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
{state, State}.

%% Headers information retrieval.

Expand Down Expand Up @@ -926,12 +943,12 @@ ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerSt
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{State, CookieStore, EvHandlerState};
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
Expand Down Expand Up @@ -959,9 +976,9 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
{{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow),
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.

ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
Expand Down
Loading

0 comments on commit 8b89aa3

Please sign in to comment.