diff --git a/Makefile b/Makefile index 1609a376..371d2ff9 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ CT_OPTS += -ct_hooks gun_ct_hook [] # -boot start_sasl LOCAL_DEPS = public_key ssl DEPS = cowlib -dep_cowlib = git https://github.com/ninenines/cowlib 2.13.0 +dep_cowlib = git https://github.com/Nordix/cowlib respect-remote-concurrency-limit DOC_DEPS = asciideck diff --git a/rebar.config b/rebar.config index e242d757..9f9b2ac5 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.13.0"}} +{cowlib,".*",{git,"https://github.com/Nordix/cowlib","respect-remote-concurrency-limit"}} ]}. {erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index bfd2d31c..c789e5cf 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -990,7 +990,44 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, {[], CookieStore0, EvHandlerState0} end. -request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, +request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) + when is_reference(StreamRef) -> + + case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of + true -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {stream_error, too_many_streams, + 'Maximum concurrency limit has been reached.'}}, + {[], CookieStore, EvHandlerState}; + false -> + request1(State, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, CookieStore, + EvHandler, EvHandlerState) + end; +%% Tunneled request. +request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, + Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + origin_host := OriginHost, origin_port := OriginPort}}} -> + {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef, + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, + InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + {ResCommands, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {ResCommands, CookieStore, EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {[], CookieStore0, EvHandlerState0}; + error -> + error_stream_not_found(State, StreamRef, ReplyTo), + {[], CookieStore0, EvHandlerState0} + end. + +request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) when is_reference(StreamRef) -> @@ -1005,7 +1042,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, RequestEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo, - function => ?FUNCTION_NAME, + function => request, method => Method, authority => Authority, path => Path, @@ -1039,27 +1076,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, end; Error={error, _} -> {Error, CookieStore, EvHandlerState1} - end; -%% Tunneled request. -request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ - origin_host := OriginHost, origin_port := OriginPort}}} -> - {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef, - ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {ResCommands, EvHandlerState} = tunnel_commands(Commands, - Stream, State, EvHandler, EvHandlerState1), - {ResCommands, CookieStore, EvHandlerState}; - #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, - {[], CookieStore0, EvHandlerState0}; - error -> - error_stream_not_found(State, StreamRef, ReplyTo), - {[], CookieStore0, EvHandlerState0} end. initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index 79ae347e..93103576 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -377,6 +377,31 @@ lingering_data_counts_toward_connection_window(_) -> timer:sleep(300), gun:close(ConnPid). +respect_max_concurrent_streams(_) -> + doc("The SETTINGS_MAX_CONCURRENT_STREAMS setting can be used to " + "restrict the number of concurrent streams. (RFC7540 5.1.2, RFC7540 6.5.2)"), + Ref = make_ref(), + Routes = [{'_', [{"/delayed", delayed_hello_h, 500}]}], + ProtoOpts = #{ + env => #{dispatch => cowboy_router:compile(Routes)}, + tcp => #{protocols => [http2]}, + max_concurrent_streams => 1 + }, + [{ref, _}, {port, Port}] = gun_test:init_cowboy_tcp(Ref, ProtoOpts, []), + try + {ok, ConnPid} = gun:open("localhost", Port, #{protocols => [http2]}), + {ok, http2} = gun:await_up(ConnPid), + StreamRef1 = gun:get(ConnPid, "/delayed"), + StreamRef2 = gun:get(ConnPid, "/delayed"), + {error, {stream_error, Reason}} = gun:await(ConnPid, StreamRef2), + {stream_error, too_many_streams, _Human} = Reason, + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + gun:close(ConnPid) + after + cowboy:stop_listener(Ref) + end. + headers_priority_flag(_) -> doc("HEADERS frames may include a PRIORITY flag indicating " "that stream dependency information is attached. (RFC7540 6.2)"),