Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make many gun_http2 functions return state or error tuple #301

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 99 additions & 73 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ parse(Data, State0=#http2_state{status=preface, http2_machine=HTTP2Machine},
case frame(State0#http2_state{status=connected}, Frame, CookieStore0, EvHandler, EvHandlerState0) of
{Error={error, _}, CookieStore, EvHandlerState} ->
{Error, CookieStore, EvHandlerState};
{State, CookieStore, EvHandlerState} ->
{{state, State}, CookieStore, EvHandlerState} ->
parse(Rest, State, CookieStore, EvHandler, EvHandlerState)
end;
more ->
Expand All @@ -228,7 +228,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea
case frame(State0, Frame, CookieStore0, EvHandler, EvHandlerState0) of
{Error={error, _}, CookieStore, EvHandlerState} ->
{Error, CookieStore, EvHandlerState};
{State, CookieStore, EvHandlerState} ->
{{state, State}, CookieStore, EvHandlerState} ->
parse(Rest, State, CookieStore, EvHandler, EvHandlerState)
end;
{ignore, Rest} ->
Expand All @@ -239,8 +239,12 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea
parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0)
end;
{stream_error, StreamID, Reason, Human, Rest} ->
parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
CookieStore0, EvHandler, EvHandlerState0);
case reset_stream(State0, StreamID, {stream_error, Reason, Human}) of
{state, State} ->
parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0);
Error={error, _} ->
Error
end;
Error = {connection_error, _, _} ->
{connection_error(State0, Error), CookieStore0, EvHandlerState0};
%% If we both received and sent a GOAWAY frame and there are no streams
Expand Down Expand Up @@ -290,41 +294,42 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan
{maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),
CookieStore, EvHandlerState};
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
{StateRet, CookieStoreRet, EvHandlerStateRet} = data_frame(
State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data,
CookieStore, EvHandler, EvHandlerState),
case StateRet of
{state, State1} -> {State1, CookieStoreRet, EvHandlerStateRet};
Error -> {Error, CookieStoreRet, EvHandlerStateRet}
end;
data_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, IsFin, Data, CookieStore, EvHandler, EvHandlerState);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#http2_state{http2_machine=HTTP2Machine},
{StateRet, CookieStoreRet, EvHandlerStateRet} = headers_frame(
State#http2_state{http2_machine=HTTP2Machine},
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
StreamID, IsFin, Headers, PseudoHeaders, BodyLen,
CookieStore, EvHandler, EvHandlerState);
CookieStore, EvHandler, EvHandlerState),
{{state, StateRet}, CookieStoreRet, EvHandlerStateRet};
{ok, {trailers, StreamID, Trailers}, HTTP2Machine} ->
{StateRet, EvHandlerStateRet} = trailers_frame(
State#http2_state{http2_machine=HTTP2Machine},
StreamID, Trailers, EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet};
{{state, StateRet}, CookieStore, EvHandlerStateRet};
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
{StateRet, EvHandlerStateRet} = rst_stream_frame(
State#http2_state{http2_machine=HTTP2Machine},
StreamID, Reason, EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet};
{{state, StateRet}, CookieStore, EvHandlerStateRet};
{ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} ->
{StateRet, EvHandlerStateRet} = push_promise_frame(
{StateOrError, EvHandlerStateRet} = push_promise_frame(
State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet};
{StateOrError, CookieStore, EvHandlerStateRet};
{ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
{goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway),
CookieStore, EvHandlerState};
{send, SendData, HTTP2Machine} ->
{StateRet, EvHandlerStateRet} = send_data(
maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),
SendData, EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet};
case maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame) of
{state, State1} ->
{StateOrError, EvHandlerStateRet} = send_data(State1,
SendData, EvHandler, EvHandlerState),
{StateOrError, CookieStore, EvHandlerStateRet};
Error={error, _} ->
{Error, CookieStore, EvHandlerState}
end;
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
{reset_stream(State#http2_state{http2_machine=HTTP2Machine},
StreamID, {stream_error, Reason, Human}),
Expand Down Expand Up @@ -352,14 +357,14 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,
_ ->
ok
end,
State.
{state, State}.

data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_id(State0, StreamID) of
Stream=#stream{tunnel=undefined} ->
{State, EvHandlerState} = data_frame1(State0,
StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream),
{State, CookieStore0, EvHandlerState};
{{state, State}, CookieStore0, EvHandlerState};
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
% %% @todo What about IsFin?
{Commands, CookieStore, EvHandlerState1} = Proto:handle(Data,
Expand All @@ -377,9 +382,13 @@ tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
{{state, store_stream(State, Stream)}, EvHandlerState};
tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
State0, EvHandler, EvHandlerState0) ->
{State, EvHandlerState} = maybe_send_data(State0, StreamID,
IsFin, Data, EvHandler, EvHandlerState0),
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
case maybe_send_data(State0, StreamID,
IsFin, Data, EvHandler, EvHandlerState0) of
{{state, State}, EvHandlerState} ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
ErrorResult={{error, _Reason}, _EvHandlerState} ->
ErrorResult
end;
tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
Expand Down Expand Up @@ -412,16 +421,16 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
_ -> Flow0 - Dec
end,
State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
{State, EvHandlerState} = case byte_size(Data) of
{StateOrError, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{State1, EvHandlerState1};
{{state, State1}, EvHandlerState1};
0 ->
{State1, EvHandlerState0};
{{state, State1}, EvHandlerState0};
_ ->
%% We do not send a stream WINDOW_UPDATE when the flow control kicks in
%% (it'll be sent when the flow recovers) or for the last DATA frame.
Expand All @@ -438,7 +447,12 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{update_window(State1), EvHandlerState1}
end
end,
{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.
case StateOrError of
{state, State} ->
{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState}
end.
essen marked this conversation as resolved.
Show resolved Hide resolved

headers_frame(State0=#http2_state{opts=Opts},
StreamID, IsFin, Headers, #{status := Status}, _BodyLen,
Expand Down Expand Up @@ -493,17 +507,17 @@ headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0},
close ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
State1 = State0#http2_state{http2_machine=HTTP2Machine},
State = reset_stream(State1, StreamID, {stream_error, cancel,
StateOrError = reset_stream(State1, StreamID, {stream_error, cancel,
'The sec-websocket-extensions header is invalid. (RFC6455 9.1, RFC7692 7)'}),
{State, EvHandlerState};
{StateOrError, EvHandlerState};
Extensions ->
case gun_ws:select_protocol(Headers, WsOpts) of
close ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
State1 = State0#http2_state{http2_machine=HTTP2Machine},
State = reset_stream(State1, StreamID, {stream_error, cancel,
StateOrError = reset_stream(State1, StreamID, {stream_error, cancel,
'The sec-websocket-protocol header is invalid. (RFC6455 4.1)'}),
{State, EvHandlerState};
{StateOrError, EvHandlerState};
Handler ->
headers_frame_connect_websocket(State0, Stream, Headers,
EvHandler, EvHandlerState, Extensions, Handler)
Expand Down Expand Up @@ -712,12 +726,12 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
connected ->
NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path},
{create_stream(State, NewStream), EvHandlerState};
{{state, create_stream(State, NewStream)}, EvHandlerState};
%% We cancel the push_promise immediately when we are shutting down.
_ ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
{State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}
{{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState}
end.

ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
Expand Down Expand Up @@ -758,8 +772,8 @@ update_flow(State, _ReplyTo, StreamRef, Inc) ->
if
%% Flow is active again, update the stream's window.
Flow0 =< 0, Flow > 0 ->
{state, update_window(store_stream(State,
Stream#stream{flow=Flow}), StreamID)};
update_window(store_stream(State,
Stream#stream{flow=Flow}), StreamID);
true ->
{state, store_stream(State, Stream#stream{flow=Flow})}
end;
Expand All @@ -772,10 +786,10 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
ok ->
State;
{state, State};
{ok, Increment, HTTP2Machine} ->
Transport:send(Socket, cow_http2:window_update(Increment)),
State#http2_state{http2_machine=HTTP2Machine}
{state, State#http2_state{http2_machine=HTTP2Machine}}
end.

%% Update both the connection and the stream's window.
Expand All @@ -794,7 +808,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
{<<>>, <<>>} -> ok;
_ -> Transport:send(Socket, [Data1, Data2])
end,
State#http2_state{http2_machine=HTTP2Machine}.
{state, State#http2_state{http2_machine=HTTP2Machine}}.

%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
Expand All @@ -812,9 +826,9 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
no_error, <<>>)),
State#http2_state{status=goaway};
{state, State#http2_state{status=goaway}};
_ ->
State
{state, State}
end.

%% Cancel server-initiated streams that are above LastStreamID.
Expand Down Expand Up @@ -960,9 +974,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
},
{{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
nofin ->
{StateRet, EvHandlerStateRet} = maybe_send_data(
{StateOrError, EvHandlerStateRet} = maybe_send_data(
State, StreamID, fin, Body, EvHandler, EvHandlerState),
{{state, StateRet}, CookieStore, EvHandlerStateRet}
{StateOrError, CookieStore, EvHandlerStateRet}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Expand Down Expand Up @@ -1037,9 +1051,8 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
error_stream_closed(State, StreamRef, ReplyTo),
{[], EvHandlerState};
{ok, _, _} when Tunnel =:= undefined ->
{State1, EvHandlerStateRet} = maybe_send_data(State,
StreamID, IsFin, Data, EvHandler, EvHandlerState),
{{state, State1}, EvHandlerStateRet};
maybe_send_data(State,
StreamID, IsFin, Data, EvHandler, EvHandlerState);
{ok, _, _} ->
#tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel,
{Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
Expand Down Expand Up @@ -1074,63 +1087,76 @@ maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin
end,
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
{ok, HTTP2Machine} ->
{State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState};
{{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState};
{send, SendData, HTTP2Machine} ->
send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData,
EvHandler, EvHandlerState)
end.

send_data(State, [], _, EvHandlerState) ->
{State, EvHandlerState};
{{state, State}, EvHandlerState};
send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) ->
{State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0),
send_data(State, Tail, EvHandler, EvHandlerState).
case send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0) of
{{state, State}, EvHandlerState} ->
send_data(State, Tail, EvHandler, EvHandlerState);
ErrorResult={{error, _}, _EvHandlerState} ->
ErrorResult
end.

send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) ->
State = send_data_frame(State0, StreamID, IsFin, Data),
EvHandlerState = case IsFin of
nofin ->
EvHandlerState0;
fin ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState0)
end,
{maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState};
case send_data_frame(State0, StreamID, IsFin, Data) of
{state, State} ->
EvHandlerState = case IsFin of
nofin ->
EvHandlerState0;
fin ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
RequestEndEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState0)
end,
{{state, maybe_delete_stream(State, StreamID, local, IsFin)}, EvHandlerState};
Error={error, _Reason} ->
{Error, EvHandlerState0}
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
end;

send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) ->
State = send_data_frame(State0, StreamID, nofin, Data),
send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState).
case send_data_frame(State0, StreamID, nofin, Data) of
{state, State} ->
send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState);
Error={error, _Reason} ->
{Error, EvHandlerState}
end.

send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
StreamID, IsFin, {data, Data}) ->
Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
State;
{state, State};
%% @todo Uncomment this once sendfile is supported.
%send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
% StreamID, IsFin, {sendfile, Offset, Bytes, Path}) ->
% Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
% Transport:sendfile(Socket, Path, Offset, Bytes),
% State;
% {state, State};
%% The stream is terminated in cow_http2_machine:prepare_trailers.
send_data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) ->
{ok, HeaderBlock, HTTP2Machine}
= cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
State#http2_state{http2_machine=HTTP2Machine}.
{state, State#http2_state{http2_machine=HTTP2Machine}}.

reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
StreamID, StreamError={stream_error, Reason, _}) ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
State;
{state, State};
error ->
State0
{state, State0}
end.

connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Expand Down Expand Up @@ -1260,7 +1286,7 @@ timeout(State, {cow_http2_machine, RealStreamRef, Name}, TRef) ->
{state, store_stream(State, Stream#stream{
tunnel=Tunnel#tunnel{protocol_state=ProtoState}})};
{error, {connection_error, Reason, Human}} ->
{state, reset_stream(State, StreamID, {stream_error, Reason, Human})}
reset_stream(State, StreamID, {stream_error, Reason, Human})
end;
%% We ignore timeout events for streams that no longer exist.
error ->
Expand Down