diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 17265286..47f670fa 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -40,7 +40,6 @@ %% Flow control. flow :: integer() | infinity, - flow_window = 0 :: non_neg_integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() @@ -87,6 +86,10 @@ do_check_options([{keepalive, infinity}|Opts]) -> do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); %% @todo Add all http2_machine options. +do_check_options([{initial_connection_window_size, _}|Opts]) -> + do_check_options(Opts); +do_check_options([{initial_stream_window_size, _}|Opts]) -> + do_check_options(Opts); do_check_options([{max_frame_size_received, _}|Opts]) -> do_check_options(Opts); do_check_options([Opt|_]) -> @@ -94,7 +97,13 @@ do_check_options([Opt|_]) -> name() -> http2. -init(Owner, Socket, Transport, Opts) -> +init(Owner, Socket, Transport, Opts0) -> + %% We have different defaults than the protocol in order + %% to optimize for performance when receiving responses. + Opts = Opts0#{ + initial_connection_window_size => maps:get(initial_connection_window_size, Opts0, 8000000), + initial_stream_window_size => maps:get(initial_stream_window_size, Opts0, 8000000) + }, {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), %% @todo Better validate the preface being received. @@ -209,62 +218,47 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> end, State. -lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, DataLen) -> - Transport:send(Socket, cow_http2:window_update(DataLen)), - HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), - State#http2_state{http2_machine=HTTP2Machine1}. +lingering_data_frame(State, _DataLen) -> + %% We only update the connection's window when receiving + %% a lingering data frame. + update_window(State). -data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, - EvHandler, EvHandlerState0) -> +data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, - flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), + handler_state=Handlers0} = get_stream_by_id(State0, StreamID), {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - Dec end, - Size = byte_size(Data), - FlowWindow = if - IsFin =:= nofin, Flow =< 0 -> - FlowWindow0 + Size; - true -> - FlowWindow0 - end, - {HTTP2Machine, EvHandlerState} = case Size of + State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}), + {State, EvHandlerState} = case byte_size(Data) of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine0, EvHandlerState1}; + {State1, EvHandlerState1}; 0 -> - {HTTP2Machine0, EvHandlerState0}; + {State1, EvHandlerState0}; _ -> - Transport:send(Socket, cow_http2:window_update(Size)), - HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), %% We do not send a stream WINDOW_UPDATE when the flow control kicks in %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of nofin when Flow =< 0 -> - {HTTP2Machine1, EvHandlerState0}; + {update_window(State1), EvHandlerState0}; nofin -> - Transport:send(Socket, cow_http2:window_update(StreamID, Size)), - {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), - EvHandlerState0}; + {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), - {HTTP2Machine1, EvHandlerState1} + {update_window(State1), EvHandlerState1} end end, - {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), - EvHandlerState}. + {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, @@ -378,21 +372,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. -update_flow(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> +update_flow(State, _ReplyTo, StreamRef, Inc) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> + Stream=#stream{id=StreamID, flow=Flow0} -> Flow = case Flow0 of infinity -> infinity; _ -> Flow0 + Inc end, if - %% Flow is active again, update the window. + %% Flow is active again, update the stream's window. Flow0 =< 0, Flow > 0 -> - Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), - HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), - {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{flow=Flow, flow_window=0})}; + {state, update_window(store_stream(State, + Stream#stream{flow=Flow}), StreamID)}; true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; @@ -400,6 +391,35 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. +%% Only update the connection's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow}, http2_machine=HTTP2Machine0}) -> + case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> + State; + {ok, Increment, HTTP2Machine} -> + Transport:send(Socket, cow_http2:window_update(Increment)), + State#http2_state{http2_machine=HTTP2Machine} + end. + +%% Update both the connection and the stream's window. +update_window(State=#http2_state{socket=Socket, transport=Transport, + opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow}, + http2_machine=HTTP2Machine0}, StreamID) -> + {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of + ok -> {<<>>, HTTP2Machine0}; + {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} + end, + {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamWindow, HTTP2Machine2) of + ok -> {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end, + case {Data1, Data2} of + {<<>>, <<>>} -> ok; + _ -> Transport:send(Socket, [Data1, Data2]) + end, + State#http2_state{http2_machine=HTTP2Machine}. + %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. diff --git a/test/flow_SUITE.erl b/test/flow_SUITE.erl index 937af26a..5076bbc0 100644 --- a/test/flow_SUITE.erl +++ b/test/flow_SUITE.erl @@ -71,6 +71,8 @@ default_flow_http2(_) -> flow => 1, %% We set the max frame size to the same as the initial %% window size in order to reduce the number of data messages. + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, max_frame_size_received => 65535 }, protocols => [http2] @@ -86,8 +88,11 @@ default_flow_http2(_) -> %% Then we confirm that we can override it per request. StreamRef2 = gun:get(ConnPid, "/", [], #{flow => 2}), {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2), - %% We set the flow to 2 therefore we will receive *3* data messages - %% and then nothing because two windows have been fully consumed. + %% We set the flow to 2 but due to the ensure_window algorithm + %% we end up receiving *5* data messages before flow control kicks in, + %% equivalent to 3 SSE events. + {data, nofin, _} = gun:await(ConnPid, StreamRef2), + {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), {data, nofin, _} = gun:await(ConnPid, StreamRef2), @@ -132,7 +137,11 @@ flow_http2(_) -> {ok, ConnPid} = gun:open("localhost", Port, #{ %% We set the max frame size to the same as the initial %% window size in order to reduce the number of data messages. - http2_opts => #{max_frame_size_received => 65535}, + http2_opts => #{ + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, + max_frame_size_received => 65535 + }, protocols => [http2] }), {ok, http2} = gun:await_up(ConnPid), @@ -145,14 +154,16 @@ flow_http2(_) -> %% We consumed all the window available. 65535 = byte_size(D1) + byte_size(D2), {error, timeout} = gun:await(ConnPid, StreamRef, 3000), - %% We then update the flow and get *3* more data messages but no more. + %% We then update the flow and get *5* more data messages but no more. gun:update_flow(ConnPid, StreamRef, 2), {data, nofin, D3} = gun:await(ConnPid, StreamRef), {data, nofin, D4} = gun:await(ConnPid, StreamRef), {data, nofin, D5} = gun:await(ConnPid, StreamRef), + {data, nofin, D6} = gun:await(ConnPid, StreamRef), + {data, nofin, D7} = gun:await(ConnPid, StreamRef), %% We consumed all the window available again. - %% D3 is the end of the truncated D2, D4 is full and D5 truncated. - 65535 = byte_size(D3) + byte_size(D4) + byte_size(D5), + %% D3 is the end of the truncated D2, D4, D5 and D6 are full and D7 truncated. + 131070 = byte_size(D3) + byte_size(D4) + byte_size(D5) + byte_size(D6) + byte_size(D7), {error, timeout} = gun:await(ConnPid, StreamRef, 1000), gun:close(ConnPid) after @@ -302,6 +313,8 @@ sse_flow_http2(_) -> %% window size in order to reduce the number of data messages. http2_opts => #{ content_handlers => [gun_sse_h, gun_data_h], + initial_connection_window_size => 65535, + initial_stream_window_size => 65535, max_frame_size_received => 65535 }, protocols => [http2] @@ -314,10 +327,12 @@ sse_flow_http2(_) -> %% the second event was fully received. {sse, _} = gun:await(ConnPid, StreamRef), {error, timeout} = gun:await(ConnPid, StreamRef, 3000), - %% We then update the flow and get 2 more event messages but no more. + %% We then update the flow and get 3 more event messages but no more. + %% We get an extra message because of the ensure_window algorithm. gun:update_flow(ConnPid, StreamRef, 2), {sse, _} = gun:await(ConnPid, StreamRef), {sse, _} = gun:await(ConnPid, StreamRef), + {sse, _} = gun:await(ConnPid, StreamRef), {error, timeout} = gun:await(ConnPid, StreamRef, 1000), gun:close(ConnPid) after diff --git a/test/gun_test.erl b/test/gun_test.erl index e74fcd01..a2cbf6dc 100644 --- a/test/gun_test.erl +++ b/test/gun_test.erl @@ -79,6 +79,8 @@ http2_handshake(Socket, Transport) -> %% Receive the SETTINGS from the preface. {ok, <>} = Transport:recv(Socket, 3, 5000), {ok, <<4:8, 0:40, _:Len/binary>>} = Transport:recv(Socket, 6 + Len, 5000), + %% Receive the WINDOW_UPDATE sent with the preface. + {ok, <<4:24, 8:8, 0:40, _:32>>} = Transport:recv(Socket, 13, 5000), %% Send the SETTINGS ack. ok = Transport:send(Socket, cow_http2:settings_ack()), %% Receive the SETTINGS ack. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index 0db9dd0c..14e9c0c6 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -98,9 +98,17 @@ lingering_data_counts_toward_connection_window(_) -> %% Skip RST_STREAM. {ok, << 4:24, 3:8, 1:40, _:32 >>} = gen_tcp:recv(Socket, 13, 1000), %% Received a WINDOW_UPDATE frame after we got RST_STREAM. - {ok, << 4:24, 8:8, 0:40, 1000:32 >>} = gen_tcp:recv(Socket, 13, 1000) + {ok, << 4:24, 8:8, 0:40, Increment:32 >>} = gen_tcp:recv(Socket, 13, 1000), + true = Increment > 0 end), - {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}), + {ok, ConnPid} = gun:open("localhost", Port, #{ + protocols => [http2], + http2_opts => #{ + %% We don't set 65535 because we still want to have an initial WINDOW_UPDATE. + initial_connection_window_size => 65536, + initial_stream_window_size => 65535 + } + }), {ok, http2} = gun:await_up(ConnPid), timer:sleep(100), %% Give enough time for the handshake to fully complete. %% Step 1.