Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle send errors #243

Closed
wants to merge 8 commits into from
30 changes: 19 additions & 11 deletions src/gun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1237,17 +1237,25 @@ connected_ws_only(Type, Event, State) ->
connected(internal, {connected, Socket, NewProtocol},
State0=#state{owner=Owner, opts=Opts, transport=Transport}) ->
{Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
%% @todo Handle error result from Protocol:init/4
{ok, StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts),
Owner ! {gun_up, self(), Protocol:name()},
case active(State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}) of
{ok, State} ->
case Protocol:has_keepalive() of
true -> {next_state, StateName, keepalive_timeout(State)};
false -> {next_state, StateName, State}
end;
Disconnect ->
Disconnect
case Protocol:init(Owner, Socket, Transport, ProtoOpts) of
Error={error, _} ->
%% @todo Don't send gun_up and gun_down if Protocol:init/4 failes here.
Owner ! {gun_up, self(), Protocol:name()},
disconnect(State0, Error);
{ok, StateName, ProtoState} ->
%% @todo Don't send gun_up and gun_down if active/1 failes here.
Owner ! {gun_up, self(), Protocol:name()},
State1 = State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState},
case active(State1) of
{ok, State2} ->
State = case Protocol:has_keepalive() of
true -> keepalive_timeout(State2);
false -> State2
end,
{next_state, StateName, State};
Disconnect ->
Disconnect
end
end;
%% Public HTTP interface.
%%
Expand Down
136 changes: 84 additions & 52 deletions src/gun_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,10 @@ keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState
{[], EvHandlerState};
essen marked this conversation as resolved.
Show resolved Hide resolved
%% We can only keep-alive by sending an empty line in-between streams.
keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
Transport:send(Socket, <<"\r\n">>),
{[], EvHandlerState};
case Transport:send(Socket, <<"\r\n">>) of
ok -> {[], EvHandlerState};
Error={error, _} -> {Error, EvHandlerState}
end;
keepalive(_State, _, EvHandlerState) ->
{[], EvHandlerState}.
essen marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -563,13 +565,18 @@ headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerSt
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
Error={error, _} ->
Error
end,
{Command, CookieStore, EvHandlerState}.

request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
Expand All @@ -579,13 +586,18 @@ request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandle
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
Error={error, _} ->
Error
end,
{Command, CookieStore, EvHandlerState}.

initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
initial_flow(InitialFlow, _) -> InitialFlow.
Expand Down Expand Up @@ -632,7 +644,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
SendResult = Transport:send(Socket, [
cow_http:request(Method, Path, Version, Headers),
[Body || Body =/= undefined]]),
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
Expand All @@ -646,7 +658,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
_ ->
EvHandlerState2
end,
{Authority, Conn, Out, CookieStore, EvHandlerState}.
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState}.

host_header(TransportName, Host0, Port) ->
Host = case Host0 of
Expand Down Expand Up @@ -692,41 +704,52 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
DataLength = iolist_size(Data),
case Out of
body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
if
DataToSend = if
DataLength =:= 0 ->
Transport:send(Socket, cow_http_te:last_chunk());
cow_http_te:last_chunk();
true ->
Transport:send(Socket, [
[
cow_http_te:chunk(Data),
cow_http_te:last_chunk()
])
]
end,
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{{state, State#http_state{out=head}}, EvHandlerState};
case Transport:send(Socket, DataToSend) of
ok ->
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent,
EvHandlerState0),
{{state, State#http_state{out=head}}, EvHandlerState};
essen marked this conversation as resolved.
Show resolved Hide resolved
Error={error, _} ->
{Error, EvHandlerState0}
end;
body_chunked when Version =:= 'HTTP/1.1' ->
Transport:send(Socket, cow_http_te:chunk(Data)),
{[], EvHandlerState0};
case Transport:send(Socket, cow_http_te:chunk(Data)) of
ok -> {[], EvHandlerState0};
Error={error, _} -> {Error, EvHandlerState0}
end;
{body, Length} when DataLength =< Length ->
Transport:send(Socket, Data),
Length2 = Length - DataLength,
if
Length2 =:= 0, IsFin =:= fin ->
case Transport:send(Socket, Data) of
ok when Length2 =:= 0, IsFin =:= fin ->
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
{{state, State#http_state{out=head}}, EvHandlerState};
Length2 > 0, IsFin =:= nofin ->
{{state, State#http_state{out={body, Length2}}}, EvHandlerState0}
ok when Length2 > 0, IsFin =:= nofin ->
{{state, State#http_state{out={body, Length2}}}, EvHandlerState0};
essen marked this conversation as resolved.
Show resolved Hide resolved
Error={error, _} ->
{Error, EvHandlerState0}
end;
body_chunked -> %% HTTP/1.0
Transport:send(Socket, Data),
{[], EvHandlerState0}
case Transport:send(Socket, Data) of
ok -> {[], EvHandlerState0};
essen marked this conversation as resolved.
Show resolved Hide resolved
Error={error, _} -> {Error, EvHandlerState0}
end
end;
_ ->
error_stream_not_found(State, StreamRef, ReplyTo),
Expand Down Expand Up @@ -779,19 +802,22 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
RequestEndEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
<<"CONNECT">>, Authority, <<>>, InitialFlow)},
EvHandlerState}.
case Transport:send(Socket, cow_http:request(<<"CONNECT">>,
Authority, Version, Headers)) of
ok ->
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
RequestEndEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
end.

%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
Expand Down Expand Up @@ -960,14 +986,20 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
{<<"sec-websocket-key">>, Key}
|Headers2
],
{Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
{{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
Command = case SendResult of
ok ->
InitialFlow = maps:get(flow, WsOpts, infinity),
{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key,
extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)};
Error={error, _} ->
Error
end,
{Command, CookieStore, EvHandlerState}.

ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
%% @todo check upgrade, connection
Expand Down
Loading