From 6679d68406e33f880102e3e2a578491c9c44b366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 27 Sep 2022 20:11:27 +0200 Subject: [PATCH 1/6] Make many gun_http2 functions return state or error tuple The following functions used to return a state, but now return {state, State} or {error, Reason}: * frame/5 * update_window/1,2 * maybe_ack_or_notify/2 * reset_stream/3 * push_promise_frame/7 * goaway/2 * maybe_send_data/6 * send_data/4 * send_data/6 * send_data_frame/4 --- src/gun_http2.erl | 172 ++++++++++++++++++++++++++-------------------- 1 file changed, 99 insertions(+), 73 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 7af64078..38f95686 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -203,7 +203,7 @@ parse(Data, State0=#http2_state{status=preface, http2_machine=HTTP2Machine}, case frame(State0#http2_state{status=connected}, Frame, CookieStore0, EvHandler, EvHandlerState0) of {Error={error, _}, CookieStore, EvHandlerState} -> {Error, CookieStore, EvHandlerState}; - {State, CookieStore, EvHandlerState} -> + {{state, State}, CookieStore, EvHandlerState} -> parse(Rest, State, CookieStore, EvHandler, EvHandlerState) end; more -> @@ -228,7 +228,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea case frame(State0, Frame, CookieStore0, EvHandler, EvHandlerState0) of {Error={error, _}, CookieStore, EvHandlerState} -> {Error, CookieStore, EvHandlerState}; - {State, CookieStore, EvHandlerState} -> + {{state, State}, CookieStore, EvHandlerState} -> parse(Rest, State, CookieStore, EvHandler, EvHandlerState) end; {ignore, Rest} -> @@ -239,8 +239,12 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0) end; {stream_error, StreamID, Reason, Human, Rest} -> - parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), - CookieStore0, EvHandler, EvHandlerState0); + case reset_stream(State0, StreamID, {stream_error, Reason, Human}) of + {state, State} -> + parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0); + Error={error, _} -> + Error + end; Error = {connection_error, _, _} -> {connection_error(State0, Error), CookieStore0, EvHandlerState0}; %% If we both received and sent a GOAWAY frame and there are no streams @@ -290,41 +294,42 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan {maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame), CookieStore, EvHandlerState}; {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> - {StateRet, CookieStoreRet, EvHandlerStateRet} = data_frame( - State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, - CookieStore, EvHandler, EvHandlerState), - case StateRet of - {state, State1} -> {State1, CookieStoreRet, EvHandlerStateRet}; - Error -> {Error, CookieStoreRet, EvHandlerStateRet} - end; + data_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, IsFin, Data, CookieStore, EvHandler, EvHandlerState); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> - headers_frame(State#http2_state{http2_machine=HTTP2Machine}, + {StateRet, CookieStoreRet, EvHandlerStateRet} = headers_frame( + State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, - CookieStore, EvHandler, EvHandlerState); + CookieStore, EvHandler, EvHandlerState), + {{state, StateRet}, CookieStoreRet, EvHandlerStateRet}; {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> {StateRet, EvHandlerStateRet} = trailers_frame( State#http2_state{http2_machine=HTTP2Machine}, StreamID, Trailers, EvHandler, EvHandlerState), - {StateRet, CookieStore, EvHandlerStateRet}; + {{state, StateRet}, CookieStore, EvHandlerStateRet}; {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> {StateRet, EvHandlerStateRet} = rst_stream_frame( State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason, EvHandler, EvHandlerState), - {StateRet, CookieStore, EvHandlerStateRet}; + {{state, StateRet}, CookieStore, EvHandlerStateRet}; {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> - {StateRet, EvHandlerStateRet} = push_promise_frame( + {StateOrError, EvHandlerStateRet} = push_promise_frame( State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState), - {StateRet, CookieStore, EvHandlerStateRet}; + {StateOrError, CookieStore, EvHandlerStateRet}; {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway), CookieStore, EvHandlerState}; {send, SendData, HTTP2Machine} -> - {StateRet, EvHandlerStateRet} = send_data( - maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame), - SendData, EvHandler, EvHandlerState), - {StateRet, CookieStore, EvHandlerStateRet}; + case maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame) of + {state, State1} -> + {StateOrError, EvHandlerStateRet} = send_data(State1, + SendData, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet}; + Error={error, _} -> + {Error, CookieStore, EvHandlerState} + end; {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> {reset_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}), @@ -352,14 +357,14 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, _ -> ok end, - State. + {state, State}. data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of Stream=#stream{tunnel=undefined} -> {State, EvHandlerState} = data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream), - {State, CookieStore0, EvHandlerState}; + {{state, State}, CookieStore0, EvHandlerState}; Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> % %% @todo What about IsFin? {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, @@ -377,9 +382,13 @@ tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) -> {{state, store_stream(State, Stream)}, EvHandlerState}; tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID}, State0, EvHandler, EvHandlerState0) -> - {State, EvHandlerState} = maybe_send_data(State0, StreamID, - IsFin, Data, EvHandler, EvHandlerState0), - tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); + case maybe_send_data(State0, StreamID, + IsFin, Data, EvHandler, EvHandlerState0) of + {{state, State}, EvHandlerState} -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); + ErrorResult={{error, _Reason}, _EvHandlerState} -> + ErrorResult + end; tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, State, EvHandler, EvHandlerState) -> tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}, @@ -412,16 +421,16 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, _ -> Flow0 - Dec end, State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}), - {State, EvHandlerState} = case byte_size(Data) of + {StateOrError, EvHandlerState} = case byte_size(Data) of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), - {State1, EvHandlerState1}; + {{state, State1}, EvHandlerState1}; 0 -> - {State1, EvHandlerState0}; + {{state, State1}, EvHandlerState0}; _ -> %% We do not send a stream WINDOW_UPDATE when the flow control kicks in %% (it'll be sent when the flow recovers) or for the last DATA frame. @@ -438,7 +447,12 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1), EvHandlerState1} end end, - {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. + case StateOrError of + {state, State} -> + {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState} + end. headers_frame(State0=#http2_state{opts=Opts}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, @@ -493,17 +507,17 @@ headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0}, close -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State1 = State0#http2_state{http2_machine=HTTP2Machine}, - State = reset_stream(State1, StreamID, {stream_error, cancel, + StateOrError = reset_stream(State1, StreamID, {stream_error, cancel, 'The sec-websocket-extensions header is invalid. (RFC6455 9.1, RFC7692 7)'}), - {State, EvHandlerState}; + {StateOrError, EvHandlerState}; Extensions -> case gun_ws:select_protocol(Headers, WsOpts) of close -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State1 = State0#http2_state{http2_machine=HTTP2Machine}, - State = reset_stream(State1, StreamID, {stream_error, cancel, + StateOrError = reset_stream(State1, StreamID, {stream_error, cancel, 'The sec-websocket-protocol header is invalid. (RFC6455 4.1)'}), - {State, EvHandlerState}; + {StateOrError, EvHandlerState}; Handler -> headers_frame_connect_websocket(State0, Stream, Headers, EvHandler, EvHandlerState, Extensions, Handler) @@ -712,12 +726,12 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, connected -> NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path}, - {create_stream(State, NewStream), EvHandlerState}; + {{state, create_stream(State, NewStream)}, EvHandlerState}; %% We cancel the push_promise immediately when we are shutting down. _ -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), - {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState} + {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState} end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> @@ -758,8 +772,8 @@ update_flow(State, _ReplyTo, StreamRef, Inc) -> if %% Flow is active again, update the stream's window. Flow0 =< 0, Flow > 0 -> - {state, update_window(store_stream(State, - Stream#stream{flow=Flow}), StreamID)}; + update_window(store_stream(State, + Stream#stream{flow=Flow}), StreamID); true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; @@ -772,10 +786,10 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of ok -> - State; + {state, State}; {ok, Increment, HTTP2Machine} -> Transport:send(Socket, cow_http2:window_update(Increment)), - State#http2_state{http2_machine=HTTP2Machine} + {state, State#http2_state{http2_machine=HTTP2Machine}} end. %% Update both the connection and the stream's window. @@ -794,7 +808,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, {<<>>, <<>>} -> ok; _ -> Transport:send(Socket, [Data1, Data2]) end, - State#http2_state{http2_machine=HTTP2Machine}. + {state, State#http2_state{http2_machine=HTTP2Machine}}. %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than @@ -812,9 +826,9 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), no_error, <<>>)), - State#http2_state{status=goaway}; + {state, State#http2_state{status=goaway}}; _ -> - State + {state, State} end. %% Cancel server-initiated streams that are above LastStreamID. @@ -960,9 +974,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, }, {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; nofin -> - {StateRet, EvHandlerStateRet} = maybe_send_data( + {StateOrError, EvHandlerStateRet} = maybe_send_data( State, StreamID, fin, Body, EvHandler, EvHandlerState), - {{state, StateRet}, CookieStore, EvHandlerStateRet} + {StateOrError, CookieStore, EvHandlerStateRet} end; %% Tunneled request. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, @@ -1037,9 +1051,8 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, error_stream_closed(State, StreamRef, ReplyTo), {[], EvHandlerState}; {ok, _, _} when Tunnel =:= undefined -> - {State1, EvHandlerStateRet} = maybe_send_data(State, - StreamID, IsFin, Data, EvHandler, EvHandlerState), - {{state, State1}, EvHandlerStateRet}; + maybe_send_data(State, + StreamID, IsFin, Data, EvHandler, EvHandlerState); {ok, _, _} -> #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel, {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, @@ -1074,53 +1087,66 @@ maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> - {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}; + {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData, EvHandler, EvHandlerState) end. send_data(State, [], _, EvHandlerState) -> - {State, EvHandlerState}; + {{state, State}, EvHandlerState}; send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) -> - {State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0), - send_data(State, Tail, EvHandler, EvHandlerState). + case send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0) of + {{state, State}, EvHandlerState} -> + send_data(State, Tail, EvHandler, EvHandlerState); + ErrorResult={{error, _}, _EvHandlerState} -> + ErrorResult + end. send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> - State = send_data_frame(State0, StreamID, IsFin, Data), - EvHandlerState = case IsFin of - nofin -> - EvHandlerState0; - fin -> - #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), - RequestEndEvent = #{ - stream_ref => stream_ref(State, StreamRef), - reply_to => ReplyTo - }, - EvHandler:request_end(RequestEndEvent, EvHandlerState0) - end, - {maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState}; + case send_data_frame(State0, StreamID, IsFin, Data) of + {state, State} -> + EvHandlerState = case IsFin of + nofin -> + EvHandlerState0; + fin -> + #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + RequestEndEvent = #{ + stream_ref => stream_ref(State, StreamRef), + reply_to => ReplyTo + }, + EvHandler:request_end(RequestEndEvent, EvHandlerState0) + end, + {{state, maybe_delete_stream(State, StreamID, local, IsFin)}, EvHandlerState}; + Error={error, _Reason} -> + {Error, EvHandlerState0} + end; + send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> - State = send_data_frame(State0, StreamID, nofin, Data), - send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState). + case send_data_frame(State0, StreamID, nofin, Data) of + {state, State} -> + send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState); + Error={error, _Reason} -> + {Error, EvHandlerState} + end. send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), - State; + {state, State}; %% @todo Uncomment this once sendfile is supported. %send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, % StreamID, IsFin, {sendfile, Offset, Bytes, Path}) -> % Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), % Transport:sendfile(Socket, Path, Offset, Bytes), -% State; +% {state, State}; %% The stream is terminated in cow_http2_machine:prepare_trailers. send_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> {ok, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - State#http2_state{http2_machine=HTTP2Machine}. + {state, State#http2_state{http2_machine=HTTP2Machine}}. reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, StreamID, StreamError={stream_error, Reason, _}) -> @@ -1128,9 +1154,9 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, - State; + {state, State}; error -> - State0 + {state, State0} end. connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, @@ -1260,7 +1286,7 @@ timeout(State, {cow_http2_machine, RealStreamRef, Name}, TRef) -> {state, store_stream(State, Stream#stream{ tunnel=Tunnel#tunnel{protocol_state=ProtoState}})}; {error, {connection_error, Reason, Human}} -> - {state, reset_stream(State, StreamID, {stream_error, Reason, Human})} + reset_stream(State, StreamID, {stream_error, Reason, Human}) end; %% We ignore timeout events for streams that no longer exist. error -> From 447c17c39faa07e90fb3e531f2a6ee53789c3d93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 10 Oct 2022 19:44:44 +0200 Subject: [PATCH 2/6] Address code review comments --- src/gun_http2.erl | 49 +++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 38f95686..266bf27a 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -235,7 +235,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea case ignored_frame(State0) of Error = {error, _} -> {Error, CookieStore0, EvHandlerState0}; - State -> + {state, State} -> parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0) end; {stream_error, StreamID, Reason, Human, Rest} -> @@ -297,21 +297,19 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, CookieStore, EvHandler, EvHandlerState); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> - {StateRet, CookieStoreRet, EvHandlerStateRet} = headers_frame( - State#http2_state{http2_machine=HTTP2Machine}, + headers_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, - CookieStore, EvHandler, EvHandlerState), - {{state, StateRet}, CookieStoreRet, EvHandlerStateRet}; + CookieStore, EvHandler, EvHandlerState); {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> - {StateRet, EvHandlerStateRet} = trailers_frame( + {StateOrError, EvHandlerStateRet} = trailers_frame( State#http2_state{http2_machine=HTTP2Machine}, StreamID, Trailers, EvHandler, EvHandlerState), - {{state, StateRet}, CookieStore, EvHandlerStateRet}; + {StateOrError, CookieStore, EvHandlerStateRet}; {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> - {StateRet, EvHandlerStateRet} = rst_stream_frame( + {StateOrError, EvHandlerStateRet} = rst_stream_frame( State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason, EvHandler, EvHandlerState), - {{state, StateRet}, CookieStore, EvHandlerStateRet}; + {StateOrError, CookieStore, EvHandlerStateRet}; {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> {StateOrError, EvHandlerStateRet} = push_promise_frame( State#http2_state{http2_machine=HTTP2Machine}, @@ -362,9 +360,9 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of Stream=#stream{tunnel=undefined} -> - {State, EvHandlerState} = data_frame1(State0, + {StateOrError, EvHandlerState} = data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream), - {{state, State}, CookieStore0, EvHandlerState}; + {StateOrError, CookieStore0, EvHandlerState}; Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> % %% @todo What about IsFin? {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, @@ -449,7 +447,8 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, end, case StateOrError of {state, State} -> - {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}; + {{state, maybe_delete_stream(State, StreamID, remote, IsFin)}, + EvHandlerState}; Error={error, _} -> {Error, EvHandlerState} end. @@ -465,7 +464,7 @@ headers_frame(State0=#http2_state{opts=Opts}, } = Stream, CookieStore = gun_cookies:set_cookie_header(scheme(State0), Authority, Path, Status, Headers, CookieStore0, Opts), - {State, EvHandlerState} = if + {StateOrError, EvHandlerState} = if Status >= 100, Status =< 199 -> headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin -> @@ -473,7 +472,7 @@ headers_frame(State0=#http2_state{opts=Opts}, true -> headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0) end, - {State, CookieStore, EvHandlerState}. + {StateOrError, CookieStore, EvHandlerState}. headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Status, Headers, EvHandler, EvHandlerState0) -> @@ -485,7 +484,7 @@ headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, status => Status, headers => Headers }, EvHandlerState0), - {State, EvHandlerState}. + {{state, State}, EvHandlerState}. headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0}, Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, tunnel=#tunnel{ @@ -603,8 +602,8 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_ end, {tunnel, ProtoState, EvHandlerState} = Proto:init( ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, - info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), + {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})}, EvHandlerState}. headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, @@ -635,8 +634,8 @@ headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=Re %% @todo Handle error result from Proto:init/4 {ok, connected_ws_only, ProtoState} = Proto:init( ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, - protocol=Proto, protocol_state=ProtoState}}), + {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + protocol=Proto, protocol_state=ProtoState}})}, EvHandlerState}. headers_frame_response(State=#http2_state{content_handlers=Handlers0}, @@ -662,9 +661,9 @@ headers_frame_response(State=#http2_state{content_handlers=Handlers0}, Status, Headers, Handlers0), EvHandlerState1} end, %% We disable the tunnel, if any, when receiving any non 2xx response. - {maybe_delete_stream(store_stream(State, + {{state, maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers, tunnel=undefined}), - StreamID, remote, IsFin), EvHandlerState}. + StreamID, remote, IsFin)}, EvHandlerState}. trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), @@ -677,7 +676,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), - {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. + {{state, maybe_delete_stream(State, StreamID, remote, fin)}, EvHandlerState}. rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> case take_stream(State0, StreamID) of @@ -690,9 +689,9 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> endpoint => remote, reason => Reason }, EvHandlerState0), - {State, EvHandlerState}; + {{state, State}, EvHandlerState}; error -> - {State0, EvHandlerState0} + {{state, State0}, EvHandlerState0} end. %% Pushed streams receive the same initial flow value as the parent stream. @@ -737,7 +736,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> - State#http2_state{http2_machine=HTTP2Machine}; + {state, State#http2_state{http2_machine=HTTP2Machine}}; {error, Error={connection_error, _, _}, HTTP2Machine} -> connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. From 6b2b6556089f317b7d9b18503000484c95ef53f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 10 Oct 2022 21:27:48 +0200 Subject: [PATCH 3/6] Delete stream regardless of error in data_frame1 --- src/gun_http2.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 266bf27a..688b445c 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -360,9 +360,9 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of Stream=#stream{tunnel=undefined} -> - {StateOrError, EvHandlerState} = data_frame1(State0, + {Commands, EvHandlerState} = data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream), - {StateOrError, CookieStore0, EvHandlerState}; + {Commands, CookieStore0, EvHandlerState}; Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> % %% @todo What about IsFin? {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, @@ -445,13 +445,13 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1), EvHandlerState1} end end, - case StateOrError of - {state, State} -> - {{state, maybe_delete_stream(State, StreamID, remote, IsFin)}, - EvHandlerState}; + {State, Errors} = case StateOrError of + {state, State2} -> + {State2, []}; Error={error, _} -> - {Error, EvHandlerState} - end. + {State1, [Error]} + end, + {[{state, maybe_delete_stream(State, StreamID, remote, IsFin)} | Errors], EvHandlerState}. headers_frame(State0=#http2_state{opts=Opts}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, From f160e0a1f5f00e605b6a94802e21f32aa4eeacea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 10 Oct 2022 21:33:14 +0200 Subject: [PATCH 4/6] Revert "Delete stream regardless of error in data_frame1" This reverts commit 6b2b6556089f317b7d9b18503000484c95ef53f2. --- src/gun_http2.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 688b445c..266bf27a 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -360,9 +360,9 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of Stream=#stream{tunnel=undefined} -> - {Commands, EvHandlerState} = data_frame1(State0, + {StateOrError, EvHandlerState} = data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream), - {Commands, CookieStore0, EvHandlerState}; + {StateOrError, CookieStore0, EvHandlerState}; Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> % %% @todo What about IsFin? {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, @@ -445,13 +445,13 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1), EvHandlerState1} end end, - {State, Errors} = case StateOrError of - {state, State2} -> - {State2, []}; + case StateOrError of + {state, State} -> + {{state, maybe_delete_stream(State, StreamID, remote, IsFin)}, + EvHandlerState}; Error={error, _} -> - {State1, [Error]} - end, - {[{state, maybe_delete_stream(State, StreamID, remote, IsFin)} | Errors], EvHandlerState}. + {Error, EvHandlerState} + end. headers_frame(State0=#http2_state{opts=Opts}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, From 3fa1f7579c79851da0bbef01d36fbd1a9b4fa77a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 10 Oct 2022 21:35:08 +0200 Subject: [PATCH 5/6] Add todo for returning new state and error in data_frame1 --- src/gun_http2.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 266bf27a..6d99bb81 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -450,6 +450,7 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {{state, maybe_delete_stream(State, StreamID, remote, IsFin)}, EvHandlerState}; Error={error, _} -> + %% @todo Delete stream and return new state and error commands. {Error, EvHandlerState} end. From 0b6915e3d9a5533dcaa49b35a4434ddae8073e69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 11 Oct 2022 15:03:58 +0200 Subject: [PATCH 6/6] Fix a return error tuple in parse/5 --- src/gun_http2.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 6d99bb81..46093e02 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -243,7 +243,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea {state, State} -> parse(Rest, State, CookieStore0, EvHandler, EvHandlerState0); Error={error, _} -> - Error + {Error, CookieStore0, EvHandlerState0} end; Error = {connection_error, _, _} -> {connection_error(State0, Error), CookieStore0, EvHandlerState0};