diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 243fd1c9..2b8b15b9 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -5,59 +5,59 @@ on: branches: - main jobs: - mix_test: - strategy: - fail-fast: false - matrix: - include: - - pair: - elixir: '1.7.4' - otp: '22.x' - - pair: - elixir: '1.14.1' - otp: '25.x' - lint: lint - - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v2 - - uses: actions/cache@v2 - with: - path: deps - key: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }} - restore-keys: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}- - - uses: actions/cache@v2 - with: - path: _build - key: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }} - restore-keys: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}- - - uses: erlef/setup-beam@v1 - with: - otp-version: ${{matrix.pair.otp}} - elixir-version: ${{matrix.pair.elixir}} - - - name: Install Dependencies - run: mix deps.get - - - run: mix format --check-formatted - if: ${{ matrix.lint }} - - - run: mix deps.unlock --check-unused - if: ${{ matrix.lint }} - - - run: mix deps.compile - - - run: mix compile --warnings-as-errors - if: ${{ matrix.lint }} - - - name: Run Credo - run: mix credo - if: ${{ matrix.lint }} - - - name: Run Tests - run: mix test - if: ${{ ! matrix.lint }} - - - name: Run Tests - run: mix test --warnings-as-errors - if: ${{ matrix.lint }} + mix_test: + strategy: + fail-fast: false + matrix: + include: + - pair: + elixir: "1.7.4" + otp: "22.x" + - pair: + elixir: "1.14.5" + otp: "25.x" + lint: lint + + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + - uses: actions/cache@v2 + with: + path: deps + key: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }} + restore-keys: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}- + - uses: actions/cache@v2 + with: + path: _build + key: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }} + restore-keys: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}- + - uses: erlef/setup-beam@v1 + with: + otp-version: ${{matrix.pair.otp}} + elixir-version: ${{matrix.pair.elixir}} + + - name: Install Dependencies + run: mix deps.get + + - run: mix format --check-formatted + if: ${{ matrix.lint }} + + - run: mix deps.unlock --check-unused + if: ${{ matrix.lint }} + + - run: mix deps.compile + + - run: mix compile --warnings-as-errors + if: ${{ matrix.lint }} + + - name: Run Credo + run: mix credo + if: ${{ matrix.lint }} + + - name: Run Tests + run: mix test + if: ${{ ! matrix.lint }} + + - name: Run Tests + run: mix test --warnings-as-errors + if: ${{ matrix.lint }} diff --git a/lib/finch.ex b/lib/finch.ex index 60c0003e..fadc5d08 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -6,6 +6,7 @@ defmodule Finch do |> Enum.fetch!(1) alias Finch.{PoolManager, Request, Response} + require Finch.Pool use Supervisor @@ -83,6 +84,18 @@ defmodule Finch do """ @type name() :: atom() + @type request_opt() :: {:pool_timeout, pos_integer()} | {:receive_timeout, pos_integer()} + + @typedoc """ + Options used by request functions. + """ + @type request_opts() :: [request_opt()] + + @typedoc """ + The reference used to identify a request sent using `async_request/3`. + """ + @opaque request_ref() :: Finch.Pool.request_ref() + @typedoc """ The stream function given to `stream/5`. """ @@ -263,14 +276,9 @@ defmodule Finch do ## Options - * `:pool_timeout` - This timeout is applied when we check out a connection from the pool. - Default value is `5_000`. - - * `:receive_timeout` - The maximum time to wait for a response before returning an error. - Default value is `15_000`. - + Shares options with `request/3`. """ - @spec stream(Request.t(), name(), acc, stream(acc), keyword) :: + @spec stream(Request.t(), name(), acc, stream(acc), request_opts()) :: {:ok, acc} | {:error, Exception.t()} when acc: term() def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do @@ -280,20 +288,10 @@ defmodule Finch do end defp __stream__(%Request{} = req, name, acc, fun, opts) when is_function(fun, 2) do - shp = build_shp(req) - {pool, pool_mod} = PoolManager.get_pool(name, shp) + {pool, pool_mod} = get_pool(req, name) pool_mod.request(pool, req, acc, fun, opts) end - defp build_shp(%Request{scheme: scheme, unix_socket: unix_socket}) - when is_binary(unix_socket) do - {scheme, {:local, unix_socket}, 0} - end - - defp build_shp(%Request{scheme: scheme, host: host, port: port}) do - {scheme, host, port} - end - @doc """ Sends an HTTP request and returns a `Finch.Response` struct. @@ -306,7 +304,7 @@ defmodule Finch do Default value is `15_000`. """ - @spec request(Request.t(), name(), keyword()) :: + @spec request(Request.t(), name(), request_opts()) :: {:ok, Response.t()} | {:error, Exception.t()} def request(req, name, opts \\ []) @@ -351,7 +349,7 @@ defmodule Finch do See `request/3` for more detailed information. """ - @spec request!(Request.t(), name(), keyword()) :: + @spec request!(Request.t(), name(), request_opts()) :: Response.t() def request!(%Request{} = req, name, opts \\ []) do case request(req, name, opts) do @@ -359,4 +357,86 @@ defmodule Finch do {:error, exception} -> raise exception end end + + @doc """ + Sends an HTTP request asynchronously, returning a request reference. + + If the request is sent using HTTP1, an extra process is spawned to + consume messages from the underlying socket. If you wish to maximize + request rate, a strategy using `request/3` or `stream/5` should be + used to avoid this overhead. + + ## Receiving the response + + Response information is sent to the calling process as it is received + in `{ref, response}` tuples. + + If the calling process exits before the request has completed, the + request will be canceled. + + Responses include: + + * `{:status, status}` - HTTP response status + * `{:headers, headers}` - HTTP response headers + * `{:data, data}` - section of the HTTP response body + * `{:error, exception}` - an error occurred during the request + * `:done` - request has completed successfully + + On a successful request, a single `:status` message will be followed + by a single `:headers` message, after which more than one `:data` + messages may be sent. If trailing headers are present, a final + `:headers` message may be sent. Any `:done` or `:error` message + indicates that the request has succeeded or failed and no further + messages are expected. + + ## Example + + iex> Stream.resource( + ...> fn -> + ...> Finch.build(:get, "https://httpbin.org/stream/5") + ...> |> Finch.async_request(MyFinch) + ...> end, + ...> fn ref -> + ...> receive do + ...> {^ref, :done} -> {:halt, ref} + ...> {^ref, response} -> {[response], ref} + ...> end + ...> end, + ...> fn _ref -> :ok end + ...> ) |> Enum.to_list() + [ + {:status, 200}, + {:headers, [...]}, + {:data, "..."}, + ... + :done + ] + + ## Options + + Shares options with `request/3`. + """ + @spec async_request(Request.t(), name(), request_opts()) :: request_ref() + def async_request(%Request{} = req, name, opts \\ []) do + {pool, pool_mod} = get_pool(req, name) + pool_mod.async_request(pool, req, opts) + end + + @doc """ + Cancels a request sent with `async_request/3`. + """ + @spec cancel_async_request(request_ref()) :: :ok + def cancel_async_request(request_ref) when Finch.Pool.is_request_ref(request_ref) do + {pool_mod, _cancel_ref} = request_ref + pool_mod.cancel_async_request(request_ref) + end + + defp get_pool(%Request{scheme: scheme, unix_socket: unix_socket}, name) + when is_binary(unix_socket) do + PoolManager.get_pool(name, {scheme, {:local, unix_socket}, 0}) + end + + defp get_pool(%Request{scheme: scheme, host: host, port: port}, name) do + PoolManager.get_pool(name, {scheme, host, port}) + end end diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 2927e9ad..d5e17b94 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -74,6 +74,48 @@ defmodule Finch.HTTP1.Pool do end end + @impl Finch.Pool + def async_request(pool, req, opts) do + owner = self() + + pid = + spawn_link(fn -> + monitor = Process.monitor(owner) + request_ref = {__MODULE__, self()} + + case request(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do + {:ok, _} -> send(owner, {request_ref, :done}) + {:error, error} -> send(owner, {request_ref, {:error, error}}) + end + end) + + {__MODULE__, pid} + end + + defp send_async_response(response, {owner, monitor, request_ref}) do + if process_down?(monitor) do + exit(:shutdown) + end + + send(owner, {request_ref, response}) + {owner, monitor, request_ref} + end + + defp process_down?(monitor) do + receive do + {:DOWN, ^monitor, _, _, _} -> true + after + 0 -> false + end + end + + @impl Finch.Pool + def cancel_async_request({_, pid} = _request_ref) do + Process.unlink(pid) + Process.exit(pid, :shutdown) + :ok + end + @impl NimblePool def init_pool({registry, shp, opts}) do # Register our pool with our module name as the key. This allows the caller diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 6e93b8ac..d2f53297 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -27,16 +27,16 @@ defmodule Finch.HTTP2.Pool do # Call the pool with the request. The pool will multiplex multiple requests # and stream the result set back to the calling process using `send` - @impl true + @impl Finch.Pool def request(pool, request, acc, fun, opts) do opts = Keyword.put_new(opts, :receive_timeout, @default_receive_timeout) timeout = opts[:receive_timeout] + request_ref = make_request_ref(pool) metadata = %{request: request} - start_time = Telemetry.start(:send, metadata) - with {:ok, ref} <- :gen_statem.call(pool, {:request, request, opts}) do + with :ok <- :gen_statem.call(pool, {:request, request_ref, request, opts}) do Telemetry.stop(:send, start_time, metadata) monitor = Process.monitor(pool) # If the timeout is an integer, we add a fail-safe "after" clause that fires @@ -48,7 +48,7 @@ defmodule Finch.HTTP2.Pool do start_time = Telemetry.start(:recv, metadata) try do - result = response_waiting_loop(acc, fun, ref, monitor, fail_safe_timeout) + result = response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout) case result do {:ok, acc, {status, headers}} -> @@ -65,8 +65,8 @@ defmodule Finch.HTTP2.Pool do kind, error -> Telemetry.exception(:recv, start_time, kind, error, __STACKTRACE__, metadata) - :gen_statem.call(pool, {:cancel, ref}) - clean_responses(ref) + :ok = :gen_statem.call(pool, {:cancel, request_ref}) + clean_responses(request_ref) Process.demonitor(monitor) :erlang.raise(kind, error, __STACKTRACE__) @@ -74,56 +74,84 @@ defmodule Finch.HTTP2.Pool do end end + @impl Finch.Pool + def async_request(pool, req, opts) do + opts = Keyword.put_new(opts, :receive_timeout, @default_receive_timeout) + request_ref = make_request_ref(pool) + + :ok = :gen_statem.cast(pool, {:async_request, self(), request_ref, req, opts}) + + request_ref + end + + @impl Finch.Pool + def cancel_async_request({_, {pool, _}} = request_ref) do + :ok = :gen_statem.call(pool, {:cancel, request_ref}) + clean_responses(request_ref) + end + + defp make_request_ref(pool) do + {__MODULE__, {pool, make_ref()}} + end + defp response_waiting_loop( acc, fun, - ref, + request_ref, monitor_ref, fail_safe_timeout, status \\ nil, headers \\ [] ) - defp response_waiting_loop(acc, fun, ref, monitor_ref, fail_safe_timeout, status, headers) do + defp response_waiting_loop( + acc, + fun, + request_ref, + monitor_ref, + fail_safe_timeout, + status, + headers + ) do receive do - {:status, ^ref, value} -> + {^request_ref, {:status, value}} -> response_waiting_loop( fun.({:status, value}, acc), fun, - ref, + request_ref, monitor_ref, fail_safe_timeout, value, headers ) - {:headers, ^ref, value} -> + {^request_ref, {:headers, value}} -> response_waiting_loop( fun.({:headers, value}, acc), fun, - ref, + request_ref, monitor_ref, fail_safe_timeout, status, headers ++ value ) - {:data, ^ref, value} -> + {^request_ref, {:data, value}} -> response_waiting_loop( fun.({:data, value}, acc), fun, - ref, + request_ref, monitor_ref, fail_safe_timeout, status, headers ) - {:done, ^ref} -> + {^request_ref, :done} -> Process.demonitor(monitor_ref) {:ok, acc, {status, headers}} - {:error, ^ref, error} -> + {^request_ref, {:error, error}} -> Process.demonitor(monitor_ref) {:error, error, {status, headers}} @@ -139,19 +167,11 @@ defmodule Finch.HTTP2.Pool do end end - defp clean_responses(ref) do + defp clean_responses(request_ref) do receive do - {kind, ^ref, _value} when kind in [:status, :headers, :data] -> - clean_responses(ref) - - {:done, ^ref} -> - :ok - - {:error, ^ref, _error} -> - :ok + {^request_ref, _} -> clean_responses(request_ref) after - 0 -> - :ok + 0 -> :ok end end @@ -169,6 +189,8 @@ defmodule Finch.HTTP2.Pool do host: host, port: port, requests: %{}, + refs: %{}, + requests_by_pid: %{}, backoff_base: 500, backoff_max: 10_000, connect_opts: pool_opts[:conn_opts] || [] @@ -188,8 +210,11 @@ defmodule Finch.HTTP2.Pool do # requests def disconnected(:enter, _, data) do :ok = - Enum.each(data.requests, fn {ref, request} -> - send(request.from_pid, {:error, ref, Error.exception(:connection_closed)}) + Enum.each(data.requests, fn {_ref, request} -> + send( + request.from_pid, + {request.request_ref, {:error, Error.exception(:connection_closed)}} + ) end) # It's possible that we're entering this state before we are alerted of the @@ -247,15 +272,21 @@ defmodule Finch.HTTP2.Pool do end # Immediately fail a request if we're disconnected - def disconnected({:call, from}, {:request, _, _}, _data) do + def disconnected({:call, from}, {:request, _, _, _}, _data) do {:keep_state_and_data, {:reply, from, {:error, Error.exception(:disconnected)}}} end # Ignore cancel requests if we are disconnected - def disconnected({:call, from}, {:cancel, _ref}, _data) do + def disconnected({:call, from}, {:cancel, _request_ref}, _data) do {:keep_state_and_data, {:reply, from, {:error, Error.exception(:disconnected)}}} end + # Immediately fail a request if we're disconnected + def disconnected(:cast, {:async_request, pid, request_ref, _, _}, _data) do + send(pid, {request_ref, {:error, Error.exception(:disconnected)}}) + :keep_state_and_data + end + # We cancel all request timeouts as soon as we enter the :disconnected state, but # some timeouts might fire while changing states, so we need to handle them here. # Since we replied to all pending requests when entering the :disconnected state, @@ -282,47 +313,25 @@ defmodule Finch.HTTP2.Pool do # Issue request to the upstream server. We store a ref to the request so we # know who to respond to when we've completed everything - def connected({:call, from}, {:request, req, opts}, data) do - request = RequestStream.new(req.body, from) - - with {:ok, data, ref} <- request(data, req), - data = put_in(data.requests[ref], request), - {:ok, data, actions} <- continue_request(data, ref) do - # Set a timeout to close the request after a given timeout - request_timeout = {{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil} - - {:keep_state, data, actions ++ [request_timeout]} - else - {:error, data, %HTTPError{reason: :closed_for_writing}} -> - actions = [{:reply, from, {:error, "read_only"}}] - - if HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) do - {:next_state, :connected_read_only, data, actions} - else - {:next_state, :disconnected, data, actions} - end + def connected({:call, {from_pid, _from_ref} = from}, {:request, request_ref, req, opts}, data) do + send_request(from, from_pid, request_ref, req, opts, data) + end - {:error, data, error} -> - actions = [{:reply, from, {:error, error}}] + def connected({:call, from}, {:cancel, request_ref}, data) do + data = cancel_request(data, request_ref) + {:keep_state, data, {:reply, from, :ok}} + end - if HTTP2.open?(data.conn) do - {:keep_state, data, actions} - else - {:next_state, :disconnected, data, actions} - end + def connected(:cast, {:async_request, pid, request_ref, req, opts}, data) do + if is_nil(data.requests_by_pid[pid]) do + Process.monitor(pid) end - end - def connected({:call, from}, {:cancel, ref}, data) do - conn = - case HTTP2.cancel_request(data.conn, ref) do - {:ok, conn} -> conn - {:error, conn, _error} -> conn - end + send_request(nil, pid, request_ref, req, opts, data) + end - data = put_in(data.conn, conn) - {_from, data} = pop_in(data.requests[ref]) - {:keep_state, data, {:reply, from, :ok}} + def connected(:info, {:DOWN, _, :process, pid, _}, data) do + {:keep_state, cancel_requests(data, pid)} end def connected(:info, message, data) do @@ -333,8 +342,8 @@ defmodule Finch.HTTP2.Pool do cond do HTTP2.open?(data.conn, :write) -> - {data, streaming_actions} = continue_requests(data) - {:keep_state, data, response_actions ++ streaming_actions} + data = continue_requests(data) + {:keep_state, data, response_actions} HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) -> {:next_state, :connected_read_only, data, response_actions} @@ -365,10 +374,10 @@ defmodule Finch.HTTP2.Pool do end def connected({:timeout, {:request_timeout, ref}}, _content, data) do - with {:pop, {request, data}} when not is_nil(request) <- {:pop, pop_in(data.requests[ref])}, + with {:pop, {request, data}} when not is_nil(request) <- {:pop, pop_request(data, ref)}, {:ok, conn} <- HTTP2.cancel_request(data.conn, ref) do data = put_in(data.conn, conn) - send(request.from_pid, {:error, ref, Error.exception(:request_timeout)}) + send(request.from_pid, {request.request_ref, {:error, Error.exception(:request_timeout)}}) {:keep_state, data} else {:error, conn, _error} -> @@ -398,31 +407,41 @@ defmodule Finch.HTTP2.Pool do def connected_read_only(event, content, data) def connected_read_only(:enter, _old_state, data) do - {actions, data} = - Enum.flat_map_reduce(data.requests, data, fn + data = + Enum.reduce(data.requests, data, fn # request is awaiting a response and should stay in state - {_ref, %{status: :done}}, data -> - {[], data} + {_ref, %{stream: %{status: :done}}}, data -> + data # request is still sending data and should be discarded - {ref, %{status: :streaming} = request}, data -> - {^request, data} = pop_in(data.requests[ref]) - {[{:reply, request.from, {:error, Error.exception(:read_only)}}], data} + {ref, %{stream: %{status: :streaming}} = request}, data -> + {^request, data} = pop_request(data, ref) + reply(request, {:error, Error.exception(:read_only)}) + data end) - {:keep_state, data, actions} + {:keep_state, data} end # If we're in a read only state than respond with an error immediately - def connected_read_only({:call, from}, {:request, _, _}, _) do + def connected_read_only({:call, from}, {:request, _, _, _}, _) do {:keep_state_and_data, {:reply, from, {:error, Error.exception(:read_only)}}} end - def connected_read_only({:call, from}, {:cancel, ref}, data) do - {_from, data} = pop_in(data.requests[ref]) + def connected_read_only({:call, from}, {:cancel, request_ref}, data) do + data = cancel_request(data, request_ref) {:keep_state, data, {:reply, from, :ok}} end + def connected_read_only(:cast, {:async_request, pid, request_ref, _, _}, _) do + send(pid, {request_ref, {:error, Error.exception(:read_only)}}) + :keep_state_and_data + end + + def connected_read_only(:info, {:DOWN, _, :process, pid, _}, data) do + {:keep_state, cancel_requests(data, pid)} + end + def connected_read_only(:info, message, data) do case HTTP2.stream(data.conn, message) do {:ok, conn, responses} -> @@ -468,11 +487,11 @@ defmodule Finch.HTTP2.Pool do # We might get a request timeout that fired in the moment when we received the # whole request, so we don't have the request in the state but we get the # timer event anyways. In those cases, we don't do anything. - {request, data} = pop_in(data.requests[ref]) + {request, data} = pop_request(data, ref) # Its possible that the request doesn't exist so we guard against that here. if request != nil do - send(request.from_pid, {:error, ref, Error.exception(:request_timeout)}) + send(request.from_pid, {request.request_ref, {:error, Error.exception(:request_timeout)}}) end # If we're out of requests then we should enter the disconnected state. @@ -484,30 +503,87 @@ defmodule Finch.HTTP2.Pool do end end + defp send_request(from, from_pid, request_ref, req, opts, data) do + request = %{ + stream: RequestStream.new(req.body), + from: from, + from_pid: from_pid, + request_ref: request_ref + } + + body = if req.body == nil, do: nil, else: :stream + + data + |> start_request(req.method, Finch.Request.request_path(req), req.headers, body) + |> stream_request(request, opts) + end + + defp start_request(data, method, path, headers, body) do + case HTTP2.request(data.conn, method, path, headers, body) do + {:ok, conn, ref} -> {:ok, put_in(data.conn, conn), ref} + {:error, conn, reason} -> {:error, put_in(data.conn, conn), reason} + end + end + + defp stream_request({:ok, data, ref}, request, opts) do + data = put_request(data, ref, request) + + case continue_request(data, ref, request) do + {:ok, data} -> + # Set a timeout to close the request after a given timeout + request_timeout = {{:timeout, {:request_timeout, ref}}, opts[:receive_timeout], nil} + + {:keep_state, data, [request_timeout]} + + error -> + stream_request(error, request, opts) + end + end + + defp stream_request({:error, data, %HTTPError{reason: :closed_for_writing}}, request, _opts) do + reply(request, {:error, Error.exception(:read_only)}) + + if HTTP2.open?(data.conn, :read) && Enum.any?(data.requests) do + {:next_state, :connected_read_only, data} + else + {:next_state, :disconnected, data} + end + end + + defp stream_request({:error, data, error}, request, _opts) do + reply(request, {:error, error}) + + if HTTP2.open?(data.conn) do + {:keep_state, data} + else + {:next_state, :disconnected, data} + end + end + defp handle_responses(data, responses) do Enum.reduce(responses, {data, _actions = []}, fn response, {data, actions} -> handle_response(data, response, actions) end) end - defp handle_response(data, {kind, ref, _value} = response, actions) + defp handle_response(data, {kind, ref, value}, actions) when kind in [:status, :headers, :data] do if request = data.requests[ref] do - send(request.from_pid, response) + send(request.from_pid, {request.request_ref, {kind, value}}) end {data, actions} end - defp handle_response(data, {:done, ref} = response, actions) do - {request, data} = pop_in(data.requests[ref]) - if request, do: send(request.from_pid, response) + defp handle_response(data, {:done, ref}, actions) do + {request, data} = pop_request(data, ref) + if request, do: send(request.from_pid, {request.request_ref, :done}) {data, [cancel_request_timeout_action(ref) | actions]} end - defp handle_response(data, {:error, ref, _error} = response, actions) do - {request, data} = pop_in(data.requests[ref]) - if request, do: send(request.from_pid, response) + defp handle_response(data, {:error, ref, error}, actions) do + {request, data} = pop_request(data, ref) + if request, do: send(request.from_pid, {request.request_ref, {:error, error}}) {data, [cancel_request_timeout_action(ref) | actions]} end @@ -535,17 +611,6 @@ defmodule Finch.HTTP2.Pool do :rand.uniform(max_sleep) end - # a wrapper around Mint.HTTP2.request/5 - # wrapping allows us to more easily encapsulate the conn within `data` - defp request(data, req) do - body = if req.body == nil, do: nil, else: :stream - - case HTTP2.request(data.conn, req.method, Finch.Request.request_path(req), req.headers, body) do - {:ok, conn, ref} -> {:ok, put_in(data.conn, conn), ref} - {:error, conn, reason} -> {:error, put_in(data.conn, conn), reason} - end - end - # this is also a wrapper (Mint.HTTP2.stream_request_body/3) defp stream_request_body(data, ref, body) do case HTTP2.stream_request_body(data.conn, ref, body) do @@ -556,7 +621,7 @@ defmodule Finch.HTTP2.Pool do defp stream_chunks(data, ref, body) do with {:ok, data} <- stream_request_body(data, ref, body) do - if data.requests[ref].status == :done do + if data.requests[ref].stream.status == :done do stream_request_body(data, ref, :eof) else {:ok, data} @@ -565,42 +630,45 @@ defmodule Finch.HTTP2.Pool do end defp continue_requests(data) do - Enum.reduce(data.requests, {data, []}, fn {ref, request}, {data, actions} -> - with true <- request.status == :streaming, + Enum.reduce(data.requests, data, fn {ref, request}, data -> + with true <- request.stream.status == :streaming, true <- HTTP2.open?(data.conn, :write), - {:ok, data, new_actions} <- continue_request(data, ref) do - {data, new_actions ++ actions} + {:ok, data} <- continue_request(data, ref, request) do + data else false -> - {data, actions} + data {:error, data, %HTTPError{reason: :closed_for_writing}} -> - {data, [{:reply, request.from, {:error, "read_only"}} | actions]} + reply(request, {:error, Error.exception(:read_only)}) + data {:error, data, reason} -> - {data, [{:reply, request.from, {:error, reason}} | actions]} + reply(request, {:error, reason}) + data end end) end - defp continue_request(data, ref) do - request = data.requests[ref] - reply_action = {:reply, request.from, {:ok, ref}} - - with :streaming <- request.status, + defp continue_request(data, ref, request) do + with :streaming <- request.stream.status, window = smallest_window(data.conn, ref), - {request, chunks} = RequestStream.next_chunk(request, window), + {stream, chunks} = RequestStream.next_chunk(request.stream, window), + request = %{request | stream: stream}, data = put_in(data.requests[ref], request), {:ok, data} <- stream_chunks(data, ref, chunks) do - actions = if request.status == :done, do: [reply_action], else: [] + if request.stream.status == :done && request.from do + reply(request, :ok) + end - {:ok, data, actions} + {:ok, data} else :done -> - {:ok, data, [reply_action]} + if request.from, do: reply(request, :ok) + {:ok, data} {:error, data, reason} -> - {_from, data} = pop_in(data.requests[ref]) + {_from, data} = pop_request(data, ref) {:error, data, reason} end @@ -612,4 +680,84 @@ defmodule Finch.HTTP2.Pool do HTTP2.get_window_size(conn, {:request, ref}) ) end + + defp cancel_requests(data, pid) do + if request_refs = data.requests_by_pid[pid] do + Enum.reduce(request_refs, data, fn request_ref, data -> + cancel_request(data, request_ref) + end) + else + data + end + end + + defp cancel_request(data, request_ref) do + # If the Mint ref isn't present, it was removed because the request + # already completed and there's nothing to cancel. + if ref = data.refs[request_ref] do + conn = + case HTTP2.cancel_request(data.conn, ref) do + {:ok, conn} -> conn + {:error, conn, _error} -> conn + end + + data = put_in(data.conn, conn) + {_from, data} = pop_request(data, ref) + data + else + data + end + end + + defp put_request(data, ref, request) do + data + |> put_in([:requests, ref], request) + |> put_in([:refs, request.request_ref], ref) + |> put_pid(request.from_pid, request.request_ref) + end + + defp pop_request(data, ref) do + case pop_in(data.requests[ref]) do + {nil, data} -> + {nil, data} + + {request, data} -> + {_ref, data} = + data + |> pop_pid(request.from_pid, request.request_ref) + |> pop_in([:refs, request.request_ref]) + + {request, data} + end + end + + defp put_pid(data, pid, request_ref) do + update_in(data.requests_by_pid, fn requests_by_pid -> + Map.update(requests_by_pid, pid, MapSet.new([request_ref]), &MapSet.put(&1, request_ref)) + end) + end + + defp pop_pid(data, pid, request_ref) do + update_in(data.requests_by_pid, fn requests_by_pid -> + requests = + requests_by_pid + |> Map.get(pid, MapSet.new()) + |> MapSet.delete(request_ref) + + if Enum.empty?(requests) do + Map.delete(requests_by_pid, pid) + else + Map.put(requests_by_pid, pid, requests) + end + end) + end + + defp reply(%{from: nil, from_pid: pid, request_ref: request_ref}, reply) do + send(pid, {request_ref, reply}) + :ok + end + + defp reply(%{from: from}, reply) do + :gen_statem.reply(from, reply) + end end diff --git a/lib/finch/http2/request_stream.ex b/lib/finch/http2/request_stream.ex index e4d57e94..8fc4d8bc 100644 --- a/lib/finch/http2/request_stream.ex +++ b/lib/finch/http2/request_stream.ex @@ -1,9 +1,9 @@ defmodule Finch.HTTP2.RequestStream do @moduledoc false - defstruct [:body, :from, :from_pid, :status, :buffer, :continuation] + defstruct [:body, :status, :buffer, :continuation] - def new(body, {from_pid, _from_ref} = from) do + def new(body) do enumerable = case body do {:stream, stream} -> Stream.map(stream, &with_byte_size/1) @@ -15,8 +15,6 @@ defmodule Finch.HTTP2.RequestStream do %__MODULE__{ body: body, - from: from, - from_pid: from_pid, status: if(body == nil, do: :done, else: :streaming), buffer: <<>>, continuation: &Enumerable.reduce(enumerable, &1, reducer) diff --git a/lib/finch/pool.ex b/lib/finch/pool.ex index e1e0c100..e9839ffa 100644 --- a/lib/finch/pool.ex +++ b/lib/finch/pool.ex @@ -1,7 +1,16 @@ defmodule Finch.Pool do @moduledoc false # Defines a behaviour that both http1 and http2 pools need to implement. + + @type request_ref :: {pool_mod :: module(), cancel_ref :: term()} + @callback request(pid(), Finch.Request.t(), acc, Finch.stream(acc), list()) :: {:ok, acc} | {:error, term()} when acc: term() + + @callback async_request(pid(), Finch.Request.t(), list()) :: request_ref() + + @callback cancel_async_request(request_ref()) :: :ok + + defguard is_request_ref(ref) when tuple_size(ref) == 2 and is_atom(elem(ref, 0)) end diff --git a/lib/finch/telemetry.ex b/lib/finch/telemetry.ex index 3f8a8db4..227ec4ba 100644 --- a/lib/finch/telemetry.ex +++ b/lib/finch/telemetry.ex @@ -55,7 +55,7 @@ defmodule Finch.Telemetry do ### Queue Start - `[:finch, :queue, :start]` - Executed before checking out a connection from the pool. + `[:finch, :queue, :start]` - Executed before checking out an HTTP1 connection from the pool. #### Measurements @@ -68,7 +68,7 @@ defmodule Finch.Telemetry do ### Queue Stop - `[:finch, :queue, :stop]` - Executed after a connection is retrieved from the pool. + `[:finch, :queue, :stop]` - Executed after an HTTP1 connection is retrieved from the pool. #### Measurements @@ -82,7 +82,7 @@ defmodule Finch.Telemetry do ### Queue Exception - `[:finch, :queue, :exception]` - Executed if checking out a connection throws an exception. + `[:finch, :queue, :exception]` - Executed if checking out an HTTP1 connection throws an exception. #### Measurements @@ -199,7 +199,7 @@ defmodule Finch.Telemetry do ### Reused Connection - `[:finch, :reused_connection]` - Executed if an existing connection is reused. There are no measurements provided with this event. + `[:finch, :reused_connection]` - Executed if an existing HTTP1 connection is reused. There are no measurements provided with this event. #### Metadata @@ -209,7 +209,7 @@ defmodule Finch.Telemetry do ### Conn Max Idle Time Exceeded - `[:finch, :conn_max_idle_time_exceeded]` - Executed if a connection was discarded because the `conn_max_idle_time` had been reached. + `[:finch, :conn_max_idle_time_exceeded]` - Executed if an HTTP1 connection was discarded because the `conn_max_idle_time` had been reached. #### Measurements @@ -223,7 +223,7 @@ defmodule Finch.Telemetry do ### Pool Max Idle Time Exceeded - `[:finch, :pool_max_idle_time_exceeded]` - Executed if a pool was terminated because the `pool_max_idle_time` has been reached. There are no measurements provided with this event. + `[:finch, :pool_max_idle_time_exceeded]` - Executed if an HTTP1 pool was terminated because the `pool_max_idle_time` has been reached. There are no measurements provided with this event. #### Metadata @@ -233,7 +233,7 @@ defmodule Finch.Telemetry do ### Max Idle Time Exceeded (Deprecated) - `[:finch, :max_idle_time_exceeded]` - Executed if a connection was discarded because the `max_idle_time` had been reached. + `[:finch, :max_idle_time_exceeded]` - Executed if an HTTP1 connection was discarded because the `max_idle_time` had been reached. *Deprecated:* use `:conn_max_idle_time_exceeded` event instead. diff --git a/test/finch/http1/integration_proxy_test.exs b/test/finch/http1/integration_proxy_test.exs index 57a3980a..fbfeb96b 100644 --- a/test/finch/http1/integration_proxy_test.exs +++ b/test/finch/http1/integration_proxy_test.exs @@ -4,7 +4,7 @@ defmodule Finch.HTTP1.IntegrationProxyTest do alias Finch.HTTP1Server setup_all do - port = 4002 + port = 4004 # Not quite a proper forward proxy server, but good enough {:ok, _} = HTTP1Server.start(port) diff --git a/test/finch/http1/pool_test.exs b/test/finch/http1/pool_test.exs index 70bc226a..563d9555 100644 --- a/test/finch/http1/pool_test.exs +++ b/test/finch/http1/pool_test.exs @@ -1,6 +1,17 @@ defmodule Finch.HTTP1.PoolTest do use FinchCase, async: true + alias Finch.HTTP1Server + + setup_all do + port = 4005 + url = "http://localhost:#{port}" + + start_supervised!({HTTP1Server, port: port}) + + {:ok, url: url} + end + @tag capture_log: true test "should terminate pool after idle timeout", %{bypass: bypass, finch_name: finch_name} do test_name = to_string(finch_name) @@ -47,4 +58,85 @@ defmodule Finch.HTTP1.PoolTest do :telemetry.detach(test_name) end + + describe "async_request" do + @describetag bypass: false + + setup %{finch_name: finch_name} do + start_supervised!({Finch, name: finch_name, pools: %{default: [protocol: :http1]}}) + :ok + end + + test "sends responses to the caller", %{finch_name: finch_name, url: url} do + request_ref = + Finch.build(:get, url <> "/stream/5/5") + |> Finch.async_request(finch_name) + + assert_receive {^request_ref, {:status, 200}}, 500 + assert_receive {^request_ref, {:headers, headers}} when is_list(headers) + for _ <- 1..5, do: assert_receive({^request_ref, {:data, _}}) + assert_receive {^request_ref, :done} + end + + test "sends errors to the caller", %{finch_name: finch_name, url: url} do + request_ref = + Finch.build(:get, url <> "/wait/100") + |> Finch.async_request(finch_name, receive_timeout: 10) + + assert_receive {^request_ref, {:error, %{reason: :timeout}}}, 500 + end + + test "canceled with cancel_async_request/1", %{ + finch_name: finch_name, + url: url + } do + ref = + Finch.build(:get, url <> "/stream/1/50") + |> Finch.async_request(finch_name) + + assert_receive {^ref, {:status, 200}}, 500 + Finch.HTTP1.Pool.cancel_async_request(ref) + refute_receive {^ref, {:data, _}} + end + + test "canceled if calling process exits normally", %{finch_name: finch_name, url: url} do + outer = self() + + spawn(fn -> + ref = + Finch.build(:get, url <> "/stream/5/500") + |> Finch.async_request(finch_name) + + # allow process to exit normally after sending + send(outer, ref) + end) + + assert_receive {Finch.HTTP1.Pool, pid} when is_pid(pid) + + ref = Process.monitor(pid) + assert_receive {:DOWN, ^ref, _, _, _}, 500 + end + + test "canceled if calling process exits abnormally", %{finch_name: finch_name, url: url} do + outer = self() + + caller = + spawn(fn -> + ref = + Finch.build(:get, url <> "/stream/5/500") + |> Finch.async_request(finch_name) + + send(outer, ref) + + # ensure process stays alive until explicitly exited + Process.sleep(:infinity) + end) + + assert_receive {Finch.HTTP1.Pool, pid} when is_pid(pid) + + ref = Process.monitor(pid) + Process.exit(caller, :shutdown) + assert_receive {:DOWN, ^ref, _, _, _}, 500 + end + end end diff --git a/test/finch/telemetry_test.exs b/test/finch/http1/telemetry_test.exs similarity index 97% rename from test/finch/telemetry_test.exs rename to test/finch/http1/telemetry_test.exs index 9432c408..6c260941 100644 --- a/test/finch/telemetry_test.exs +++ b/test/finch/http1/telemetry_test.exs @@ -1,4 +1,4 @@ -defmodule Finch.TelemetryTest do +defmodule Finch.HTTP1.TelemetryTest do use FinchCase, async: false @moduletag :capture_log @@ -8,7 +8,9 @@ defmodule Finch.TelemetryTest do Plug.Conn.send_resp(conn, 200, "OK") end) - start_supervised!({Finch, name: finch_name, pools: %{default: [conn_max_idle_time: 10]}}) + start_supervised!( + {Finch, name: finch_name, pools: %{default: [protocol: :http1, conn_max_idle_time: 10]}} + ) :ok end @@ -37,6 +39,8 @@ defmodule Finch.TelemetryTest do assert_receive {:telemetry_event, [:finch, :recv, :stop], %{headers: headers}} assert {"x-foo-response", "bar-response"} in headers + + :telemetry.detach(to_string(finch_name)) end test "reports response status code", %{bypass: bypass, finch_name: finch_name} do @@ -55,6 +59,8 @@ defmodule Finch.TelemetryTest do assert {:ok, %{status: 201}} = Finch.request(request, finch_name) assert_receive {:telemetry_event, [:finch, :recv, :stop], %{status: 201}} + + :telemetry.detach(to_string(finch_name)) end test "reports reused connections", %{bypass: bypass, finch_name: finch_name} do diff --git a/test/finch/http2/integration_test.exs b/test/finch/http2/integration_test.exs index 4ba922e1..0d31541f 100644 --- a/test/finch/http2/integration_test.exs +++ b/test/finch/http2/integration_test.exs @@ -149,9 +149,45 @@ defmodule Finch.HTTP2.IntegrationTest do ) assert catch_throw( - Finch.stream(Finch.build(:get, url), TestFinch, :ok, fn {:status, _}, :ok -> - throw(:error) - end) + Finch.stream( + Finch.build(:get, url <> "/stream/1/500"), + TestFinch, + :ok, + fn {:status, _}, :ok -> + throw(:error) + end + ) + ) == :error + + refute_receive _ + end + + test "cancel completed streaming response", %{url: url} do + start_supervised!( + {Finch, + name: TestFinch, + pools: %{ + default: [ + protocol: :http2, + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + }} + ) + + assert catch_throw( + Finch.stream( + Finch.build(:get, url), + TestFinch, + :ok, + fn + {:data, _}, :ok -> throw(:error) + _, :ok -> :ok + end + ) ) == :error refute_receive _ diff --git a/test/finch/http2/pool_test.exs b/test/finch/http2/pool_test.exs index 7b680d05..8fa56555 100644 --- a/test/finch/http2/pool_test.exs +++ b/test/finch/http2/pool_test.exs @@ -4,6 +4,7 @@ defmodule Finch.HTTP2.PoolTest do import Mint.HTTP2.Frame alias Finch.HTTP2.Pool + alias Finch.HTTP2Server alias Finch.MockHTTP2Server defmacrop assert_recv_frames(frames) when is_list(frames) do @@ -35,235 +36,447 @@ defmodule Finch.HTTP2.PoolTest do ) end - test "request/response", %{request: req} do - us = self() + describe "requests" do + test "request/response", %{request: req} do + us = self() - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) + + spawn(fn -> + {:ok, resp} = request(pool, req, []) + send(us, {:resp, {:ok, resp}}) end) - spawn(fn -> - {:ok, resp} = request(pool, req, []) - send(us, {:resp, {:ok, resp}}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id)]) + hbf = server_encode_headers([{":status", "200"}]) - hbf = server_encode_headers([{":status", "200"}]) + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), + data(stream_id: stream_id, data: "hello to you", flags: set_flags(:data, [:end_stream])) + ]) - server_send_frames([ - headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), - data(stream_id: stream_id, data: "hello to you", flags: set_flags(:data, [:end_stream])) - ]) + assert_receive {:resp, {:ok, {200, [], "hello to you"}}} + end - assert_receive {:resp, {:ok, {200, [], "hello to you"}}} - end + test "errors such as :max_header_list_size_reached are returned to the caller", %{ + request: req + } do + server_settings = [max_header_list_size: 5] - test "errors such as :max_header_list_size_reached are returned to the caller", %{request: req} do - server_settings = [max_header_list_size: 5] + {:ok, pool} = + start_server_and_connect_with([server_settings: server_settings], fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with([server_settings: server_settings], fn port -> - start_pool(port) - end) + assert {:error, error} = request(pool, %{req | headers: [{"foo", "bar"}]}, []) + assert %{reason: {:max_header_list_size_exceeded, _, _}} = error + end - assert {:error, error} = request(pool, %{req | headers: [{"foo", "bar"}]}, []) - assert %{reason: {:max_header_list_size_exceeded, _, _}} = error - end + test "if server sends GOAWAY and then replies, we get the replies but are closed for writing", + %{request: req} do + us = self() - test "if server sends GOAWAY and then replies, we get the replies but are closed for writing", - %{request: req} do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + result = request(pool, req, []) + send(us, {:resp, result}) end) - spawn(fn -> - result = request(pool, req, []) - send(us, {:resp, result}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id)]) + hbf = server_encode_headers([{":status", "200"}]) - hbf = server_encode_headers([{":status", "200"}]) + # Force the connection to enter read only mode + server_send_frames([ + goaway(last_stream_id: stream_id, error_code: :no_error, debug_data: "all good") + ]) - # Force the connection to enter read only mode - server_send_frames([ - goaway(last_stream_id: stream_id, error_code: :no_error, debug_data: "all good") - ]) + :timer.sleep(10) - :timer.sleep(10) + # We can't send any more requests since the connection is closed for writing. + assert {:error, %Finch.Error{reason: :read_only}} = request(pool, req, []) - # We can't send any more requests since the connection is closed for writing. - assert {:error, %Finch.Error{reason: :read_only}} = request(pool, req, []) + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), + data(stream_id: stream_id, data: "hello", flags: set_flags(:data, [:end_stream])) + ]) - server_send_frames([ - headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), - data(stream_id: stream_id, data: "hello", flags: set_flags(:data, [:end_stream])) - ]) + assert_receive {:resp, {:ok, {200, [], "hello"}}} - assert_receive {:resp, {:ok, {200, [], "hello"}}} + # If the server now closes the socket, we actually shut down. + :ok = :ssl.close(server_socket()) - # If the server now closes the socket, we actually shut down. - :ok = :ssl.close(server_socket()) + Process.sleep(50) - Process.sleep(50) + # If we try to make a request now that the server shut down, we get an error. + assert {:error, %Finch.Error{reason: :disconnected}} = request(pool, req, []) + end - # If we try to make a request now that the server shut down, we get an error. - assert {:error, %Finch.Error{reason: :disconnected}} = request(pool, req, []) - end + test "if server disconnects while there are waiting clients, we notify those clients", %{ + request: req + } do + us = self() - test "if server disconnects while there are waiting clients, we notify those clients", %{ - request: req - } do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + result = request(pool, req, []) + send(us, {:resp, result}) end) - spawn(fn -> - result = request(pool, req, []) - send(us, {:resp, result}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id)]) + hbf = server_encode_headers([{":status", "200"}]) - hbf = server_encode_headers([{":status", "200"}]) + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])) + ]) - server_send_frames([ - headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])) - ]) + :ok = :ssl.close(server_socket()) - :ok = :ssl.close(server_socket()) + assert_receive {:resp, {:error, %Finch.Error{reason: :connection_closed}}} + end - assert_receive {:resp, {:error, %Finch.Error{reason: :connection_closed}}} - end + test "if connections reaches max concurrent streams, we return an error", %{request: req} do + server_settings = [max_concurrent_streams: 1] - test "if connections reaches max concurrent streams, we return an error", %{request: req} do - server_settings = [max_concurrent_streams: 1] + {:ok, pool} = + start_server_and_connect_with([server_settings: server_settings], fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with([server_settings: server_settings], fn port -> - start_pool(port) + spawn(fn -> + request(pool, req, []) end) - spawn(fn -> - request(pool, req, []) - end) + assert_recv_frames([headers(stream_id: _stream_id)]) - assert_recv_frames([headers(stream_id: _stream_id)]) + assert {:error, %Mint.HTTPError{reason: :too_many_concurrent_requests}} = + request(pool, req, []) + end - assert {:error, %Mint.HTTPError{reason: :too_many_concurrent_requests}} = - request(pool, req, []) - end + test "request timeout with timeout of 0", %{request: req} do + us = self() - test "request timeout with timeout of 0", %{request: req} do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + resp = request(pool, req, receive_timeout: 0) + send(us, {:resp, resp}) end) - spawn(fn -> - resp = request(pool, req, receive_timeout: 0) - send(us, {:resp, resp}) - end) + assert_recv_frames([headers(stream_id: stream_id), rst_stream(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id), rst_stream(stream_id: stream_id)]) + assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} + end - assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} - end + test "request timeout with timeout > 0", %{request: req} do + us = self() - test "request timeout with timeout > 0", %{request: req} do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + resp = request(pool, req, receive_timeout: 50) + send(us, {:resp, resp}) end) - spawn(fn -> - resp = request(pool, req, receive_timeout: 50) - send(us, {:resp, resp}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id)]) + hbf = server_encode_headers([{":status", "200"}]) - hbf = server_encode_headers([{":status", "200"}]) + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])) + ]) - server_send_frames([ - headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])) - ]) + assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} + end - assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} - end + test "request timeout with timeout > 0 that fires after request is done", %{request: req} do + us = self() - test "request timeout with timeout > 0 that fires after request is done", %{request: req} do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + resp = request(pool, req, receive_timeout: 50) + send(us, {:resp, resp}) end) - spawn(fn -> - resp = request(pool, req, receive_timeout: 50) - send(us, {:resp, resp}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) - assert_recv_frames([headers(stream_id: stream_id)]) + server_send_frames([ + headers( + stream_id: stream_id, + hbf: server_encode_headers([{":status", "200"}]), + flags: set_flags(:headers, [:end_headers, :end_stream]) + ) + ]) - server_send_frames([ - headers( - stream_id: stream_id, - hbf: server_encode_headers([{":status", "200"}]), - flags: set_flags(:headers, [:end_headers, :end_stream]) - ) - ]) + assert_receive {:resp, {:ok, _}} - assert_receive {:resp, {:ok, _}} + assert_recv_frames([rst_stream(stream_id: ^stream_id, error_code: :no_error)]) - assert_recv_frames([rst_stream(stream_id: ^stream_id, error_code: :no_error)]) + refute_receive _any, 200 + end - refute_receive _any, 200 - end + test "request timeout with timeout > 0 where :done arrives after timeout", %{request: req} do + us = self() - test "request timeout with timeout > 0 where :done arrives after timeout", %{request: req} do - us = self() + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) - {:ok, pool} = - start_server_and_connect_with(fn port -> - start_pool(port) + spawn(fn -> + resp = request(pool, req, receive_timeout: 10) + send(us, {:resp, resp}) end) - spawn(fn -> - resp = request(pool, req, receive_timeout: 10) - send(us, {:resp, resp}) - end) + assert_recv_frames([headers(stream_id: stream_id)]) + + # We sleep enough so that the timeout fires, then we send a response. + Process.sleep(30) + + server_send_frames([ + headers( + stream_id: stream_id, + hbf: server_encode_headers([{":status", "200"}]), + flags: set_flags(:headers, [:end_headers, :end_stream]) + ) + ]) + + # When there's a timeout, we cancel the request. + assert_recv_frames([rst_stream(stream_id: ^stream_id, error_code: :cancel)]) + + assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} + end + end + + describe "async requests" do + test "sends responses to the caller", %{test: finch_name} do + start_finch!(finch_name) + {:ok, url} = start_server!() + + request_ref = + Finch.build(:get, url) + |> Finch.async_request(finch_name) + + assert_receive {^request_ref, {:status, 200}}, 300 + assert_receive {^request_ref, {:headers, headers}} when is_list(headers) + assert_receive {^request_ref, {:data, "Hello world!"}} + assert_receive {^request_ref, :done} + end + + test "sends errors to the caller", %{test: finch_name} do + start_finch!(finch_name) + {:ok, url} = start_server!() + + request_ref = + Finch.build(:get, url <> "/wait/100") + |> Finch.async_request(finch_name, receive_timeout: 10) + + assert_receive {^request_ref, {:error, %{reason: :request_timeout}}}, 300 + end + + test "canceled with cancel_async_request/1", %{test: finch_name} do + start_finch!(finch_name) + {:ok, url} = start_server!() + + ref = + Finch.build(:get, url <> "/stream/1/50") + |> Finch.async_request(finch_name) + + assert_receive {^ref, {:status, 200}} + Finch.HTTP2.Pool.cancel_async_request(ref) + refute_receive {^ref, {:data, _}} + end + + test "canceled if calling process exits normally", %{test: finch_name} do + start_finch!(finch_name) + {:ok, url} = start_server!() + + outer = self() + + caller = + spawn(fn -> + ref = + Finch.build(:get, url <> "/stream/5/500") + |> Finch.async_request(finch_name) + + # allow process to exit normally after sending + send(outer, ref) + end) + + assert_receive {Finch.HTTP2.Pool, {pool, _}} = ref - assert_recv_frames([headers(stream_id: stream_id)]) + assert {_, %{refs: %{^ref => _}}} = :sys.get_state(pool) - # We sleep enough so that the timeout fires, then we send a response. - Process.sleep(30) + Process.sleep(100) + refute Process.alive?(caller) - server_send_frames([ - headers( - stream_id: stream_id, - hbf: server_encode_headers([{":status", "200"}]), - flags: set_flags(:headers, [:end_headers, :end_stream]) + assert {_, %{refs: refs}} = :sys.get_state(pool) + assert refs == %{} + end + + test "canceled if calling process exits abnormally", %{test: finch_name} do + start_finch!(finch_name) + {:ok, url} = start_server!() + + outer = self() + + caller = + spawn(fn -> + ref = + Finch.build(:get, url <> "/stream/5/500") + |> Finch.async_request(finch_name) + + send(outer, ref) + + # ensure process stays alive until explicitly exited + Process.sleep(:infinity) + end) + + assert_receive {Finch.HTTP2.Pool, {pool, _}} = ref + + assert {_, %{refs: %{^ref => _}}} = :sys.get_state(pool) + + Process.exit(caller, :shutdown) + Process.sleep(100) + refute Process.alive?(caller) + + assert {_, %{refs: refs}} = :sys.get_state(pool) + assert refs == %{} + end + + test "if server sends GOAWAY and then replies, we get the replies but are closed for writing", + %{request: req} do + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) + + ref = Pool.async_request(pool, req, []) + + assert_recv_frames([headers(stream_id: stream_id)]) + + hbf = server_encode_headers([{":status", "200"}]) + + # Force the connection to enter read only mode + server_send_frames([ + goaway(last_stream_id: stream_id, error_code: :no_error, debug_data: "all good") + ]) + + :timer.sleep(10) + + # We can't send any more requests since the connection is closed for writing. + ref2 = Pool.async_request(pool, req, []) + assert_receive {^ref2, {:error, %Finch.Error{reason: :read_only}}} + + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])), + data(stream_id: stream_id, data: "hello", flags: set_flags(:data, [:end_stream])) + ]) + + assert_receive {^ref, {:status, 200}} + assert_receive {^ref, {:headers, []}} + assert_receive {^ref, {:data, "hello"}} + + # If the server now closes the socket, we actually shut down. + :ok = :ssl.close(server_socket()) + + Process.sleep(50) + + # If we try to make a request now that the server shut down, we get an error. + ref3 = Pool.async_request(pool, req, []) + assert_receive {^ref3, {:error, %Finch.Error{reason: :disconnected}}} + end + + test "if server disconnects while there are waiting clients, we notify those clients", %{ + request: req + } do + {:ok, pool} = + start_server_and_connect_with(fn port -> + start_pool(port) + end) + + ref = Pool.async_request(pool, req, []) + + assert_recv_frames([headers(stream_id: stream_id)]) + + hbf = server_encode_headers([{":status", "200"}]) + + server_send_frames([ + headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, [:end_headers])) + ]) + + assert_receive {^ref, {:status, 200}} + assert_receive {^ref, {:headers, []}} + + :ok = :ssl.close(server_socket()) + + assert_receive {^ref, {:error, %Finch.Error{reason: :connection_closed}}} + end + + test "errors such as :max_header_list_size_reached are returned to the caller", %{ + request: req + } do + server_settings = [max_header_list_size: 5] + + {:ok, pool} = + start_server_and_connect_with([server_settings: server_settings], fn port -> + start_pool(port) + end) + + ref = Pool.async_request(pool, %{req | headers: [{"foo", "bar"}]}, []) + + assert_receive {^ref, {:error, %{reason: {:max_header_list_size_exceeded, _, _}}}} + end + + defp start_finch!(finch_name) do + start_supervised!( + {Finch, + name: finch_name, + pools: %{ + default: [ + protocol: :http2, + count: 5, + conn_opts: [ + transport_opts: [ + verify: :verify_none + ] + ] + ] + }} ) - ]) + end - # When there's a timeout, we cancel the request. - assert_recv_frames([rst_stream(stream_id: ^stream_id, error_code: :cancel)]) + defp start_server! do + port = 4006 + url = "https://localhost:#{port}" - assert_receive {:resp, {:error, %Finch.Error{reason: :request_timeout}}} + start_supervised!({HTTP2Server, port: port}) + + {:ok, url} + end end @pdict_key {__MODULE__, :http2_test_server} diff --git a/test/finch/http2/telemetry_test.exs b/test/finch/http2/telemetry_test.exs new file mode 100644 index 00000000..61909a03 --- /dev/null +++ b/test/finch/http2/telemetry_test.exs @@ -0,0 +1,251 @@ +defmodule Finch.HTTP2.TelemetryTest do + use FinchCase, async: false + + @moduletag :capture_log + + setup %{bypass: bypass, finch_name: finch_name} do + Bypass.expect(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + start_supervised!( + {Finch, name: finch_name, pools: %{default: [protocol: :http2, conn_max_idle_time: 10]}} + ) + + :ok + end + + test "reports request and response headers", %{bypass: bypass, finch_name: finch_name} do + self = self() + + :telemetry.attach_many( + to_string(finch_name), + [[:finch, :send, :start], [:finch, :recv, :stop]], + fn name, _, metadata, _ -> send(self, {:telemetry_event, name, metadata}) end, + nil + ) + + Bypass.expect(bypass, "GET", "/", fn conn -> + conn + |> Plug.Conn.put_resp_header("x-foo-response", "bar-response") + |> Plug.Conn.send_resp(200, "OK") + end) + + request = Finch.build(:get, endpoint(bypass), [{"x-foo-request", "bar-request"}]) + assert {:ok, %{status: 200}} = Finch.request(request, finch_name) + + assert_receive {:telemetry_event, [:finch, :send, :start], + %{request: %{headers: [{"x-foo-request", "bar-request"}]}}} + + assert_receive {:telemetry_event, [:finch, :recv, :stop], %{headers: headers}} + assert {"x-foo-response", "bar-response"} in headers + + :telemetry.detach(to_string(finch_name)) + end + + test "reports response status code", %{bypass: bypass, finch_name: finch_name} do + self = self() + + :telemetry.attach( + to_string(finch_name), + [:finch, :recv, :stop], + fn name, _, metadata, _ -> send(self, {:telemetry_event, name, metadata}) end, + nil + ) + + Bypass.expect(bypass, "GET", "/", fn conn -> Plug.Conn.send_resp(conn, 201, "OK") end) + + request = Finch.build(:get, endpoint(bypass)) + assert {:ok, %{status: 201}} = Finch.request(request, finch_name) + + assert_receive {:telemetry_event, [:finch, :recv, :stop], %{status: 201}} + + :telemetry.detach(to_string(finch_name)) + end + + test "reports request spans", %{bypass: bypass, finch_name: finch_name} do + parent = self() + ref = make_ref() + + handler = fn event, measurements, meta, _config -> + case event do + [:finch, :request, :start] -> + assert is_integer(measurements.system_time) + assert meta.name == finch_name + assert %Finch.Request{} = meta.request + + send(parent, {ref, :start}) + + [:finch, :request, :stop] -> + assert is_integer(measurements.duration) + assert meta.name == finch_name + assert %Finch.Request{} = meta.request + + assert {:ok, %Finch.Response{body: "OK", status: 200}} = meta.result + + send(parent, {ref, :stop}) + + [:finch, :request, :exception] -> + assert is_integer(measurements.duration) + assert meta.name == finch_name + assert %Finch.Request{} = meta.request + assert meta.kind == :exit + assert {:timeout, _} = meta.reason + assert meta.stacktrace != nil + + send(parent, {ref, :exception}) + + _ -> + flunk("Unknown event") + end + end + + :telemetry.attach_many( + to_string(finch_name), + [ + [:finch, :request, :start], + [:finch, :request, :stop], + [:finch, :request, :exception] + ], + handler, + nil + ) + + assert {:ok, %{status: 200}} = + Finch.build(:get, endpoint(bypass)) |> Finch.request(finch_name) + + assert_receive {^ref, :start} + assert_receive {^ref, :stop} + + Bypass.down(bypass) + + :telemetry.detach(to_string(finch_name)) + end + + test "reports connection spans", %{bypass: bypass, finch_name: finch_name} do + parent = self() + ref = make_ref() + + handler = fn event, measurements, meta, _config -> + case event do + [:finch, :connect, :start] -> + assert is_integer(measurements.system_time) + assert is_atom(meta.scheme) + assert is_integer(meta.port) + assert is_binary(meta.host) + send(parent, {ref, :start}) + + [:finch, :connect, :stop] -> + assert is_integer(measurements.duration) + assert is_atom(meta.scheme) + assert is_integer(meta.port) + assert is_binary(meta.host) + send(parent, {ref, :stop}) + + _ -> + flunk("Unknown event") + end + end + + :telemetry.attach_many( + to_string(finch_name), + [ + [:finch, :connect, :start], + [:finch, :connect, :stop] + ], + handler, + nil + ) + + assert {:ok, %{status: 200}} = + Finch.build(:get, endpoint(bypass)) |> Finch.request(finch_name) + + assert_receive {^ref, :start} + assert_receive {^ref, :stop} + + :telemetry.detach(to_string(finch_name)) + end + + test "reports send spans", %{bypass: bypass, finch_name: finch_name} do + parent = self() + ref = make_ref() + + handler = fn event, measurements, meta, _config -> + case event do + [:finch, :send, :start] -> + assert is_integer(measurements.system_time) + assert %Finch.Request{} = meta.request + send(parent, {ref, :start}) + + [:finch, :send, :stop] -> + assert is_integer(measurements.duration) + assert %Finch.Request{} = meta.request + send(parent, {ref, :stop}) + + _ -> + flunk("Unknown event") + end + end + + :telemetry.attach_many( + to_string(finch_name), + [ + [:finch, :send, :start], + [:finch, :send, :stop], + [:finch, :send, :exception] + ], + handler, + nil + ) + + assert {:ok, %{status: 200}} = + Finch.build(:get, endpoint(bypass)) |> Finch.request(finch_name) + + assert_receive {^ref, :start} + assert_receive {^ref, :stop} + + :telemetry.detach(to_string(finch_name)) + end + + test "reports recv spans", %{bypass: bypass, finch_name: finch_name} do + parent = self() + ref = make_ref() + + handler = fn event, measurements, meta, _config -> + case event do + [:finch, :recv, :start] -> + assert is_integer(measurements.system_time) + assert %Finch.Request{} = meta.request + send(parent, {ref, :start}) + + [:finch, :recv, :stop] -> + assert is_integer(measurements.duration) + assert %Finch.Request{} = meta.request + assert is_integer(meta.status) + assert is_list(meta.headers) + send(parent, {ref, :stop}) + + _ -> + flunk("Unknown event") + end + end + + :telemetry.attach_many( + to_string(finch_name), + [ + [:finch, :recv, :start], + [:finch, :recv, :stop] + ], + handler, + nil + ) + + assert {:ok, %{status: 200}} = + Finch.build(:get, endpoint(bypass)) |> Finch.request(finch_name) + + assert_receive {^ref, :start} + assert_receive {^ref, :stop} + + :telemetry.detach(to_string(finch_name)) + end +end diff --git a/test/finch_test.exs b/test/finch_test.exs index c43cbbc4..cf02b19f 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -1,6 +1,5 @@ defmodule FinchTest do use FinchCase, async: true - doctest Finch import ExUnit.CaptureIO @@ -725,6 +724,50 @@ defmodule FinchTest do end end + describe "async_request/3 with HTTP/1" do + test "sends response messages to calling process", %{bypass: bypass, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + request_ref = + Finch.build(:get, endpoint(bypass)) + |> Finch.async_request(finch_name) + + assert_receive {^request_ref, {:status, 200}} + assert_receive {^request_ref, {:headers, headers}} when is_list(headers) + assert_receive {^request_ref, {:data, "OK"}} + assert_receive {^request_ref, :done} + end + + test "sends chunked response messages to calling process", %{ + bypass: bypass, + finch_name: finch_name + } do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect(bypass, fn conn -> + conn = Plug.Conn.send_chunked(conn, 200) + + Enum.reduce(1..5, conn, fn _, conn -> + {:ok, conn} = Plug.Conn.chunk(conn, "chunk-data") + conn + end) + end) + + request_ref = + Finch.build(:get, endpoint(bypass)) + |> Finch.async_request(finch_name) + + assert_receive {^request_ref, {:status, 200}} + assert_receive {^request_ref, {:headers, headers}} when is_list(headers) + for _ <- 1..5, do: assert_receive({^request_ref, {:data, "chunk-data"}}) + assert_receive {^request_ref, :done} + end + end + defp get_pools(name, shp) do Registry.lookup(name, shp) end diff --git a/test/support/finch_case.ex b/test/support/finch_case.ex index aa93c1e3..d6ba2be6 100644 --- a/test/support/finch_case.ex +++ b/test/support/finch_case.ex @@ -8,8 +8,14 @@ defmodule FinchCase do end end - setup %{test: finch_name} do - {:ok, bypass: Bypass.open(), finch_name: finch_name} + setup context do + bypass = + case context do + %{bypass: false} -> [] + %{} -> [bypass: Bypass.open()] + end + + {:ok, bypass ++ [finch_name: context.test]} end @doc "Returns the url for a Bypass instance" diff --git a/test/support/http1_server.ex b/test/support/http1_server.ex index 22298ccd..640e99ac 100644 --- a/test/support/http1_server.ex +++ b/test/support/http1_server.ex @@ -1,23 +1,23 @@ defmodule Finch.HTTP1Server do @moduledoc false - def start(port) do - children = [ - Plug.Adapters.Cowboy.child_spec( - scheme: :http, - plug: Finch.HTTP1Server.PlugRouter, - options: [ - port: port, - otp_app: :finch, - protocol_options: [ - idle_timeout: 3_000, - request_timeout: 10_000 - ] + def child_spec(opts) do + Plug.Adapters.Cowboy.child_spec( + scheme: :http, + plug: Finch.HTTP1Server.PlugRouter, + options: [ + port: Keyword.fetch!(opts, :port), + otp_app: :finch, + protocol_options: [ + idle_timeout: 3_000, + request_timeout: 10_000 ] - ) - ] + ] + ) + end - Supervisor.start_link(children, strategy: :one_for_one) + def start(port) do + Supervisor.start_link([child_spec(port: port)], strategy: :one_for_one) end end @@ -37,4 +37,22 @@ defmodule Finch.HTTP1Server.PlugRouter do |> send_resp(200, "Hello #{name}!") |> halt() end + + get "/wait/:delay" do + delay = conn.params["delay"] |> String.to_integer() + Process.sleep(delay) + send_resp(conn, 200, "ok") + end + + get "/stream/:num/:delay" do + num = conn.params["num"] |> String.to_integer() + delay = conn.params["delay"] |> String.to_integer() + conn = send_chunked(conn, 200) + + Enum.reduce(1..num, conn, fn i, conn -> + Process.sleep(delay) + {:ok, conn} = chunk(conn, "chunk-#{i}\n") + conn + end) + end end diff --git a/test/support/http2_server.ex b/test/support/http2_server.ex index 602d0c87..f8d01792 100644 --- a/test/support/http2_server.ex +++ b/test/support/http2_server.ex @@ -3,29 +3,29 @@ defmodule Finch.HTTP2Server do @fixtures_dir Path.expand("../fixtures", __DIR__) - def start(port) do - children = [ - Plug.Adapters.Cowboy.child_spec( - scheme: :https, - plug: Finch.HTTP2Server.PlugRouter, - options: [ - port: port, - cipher_suite: :strong, - certfile: Path.join([@fixtures_dir, "selfsigned.pem"]), - keyfile: Path.join([@fixtures_dir, "selfsigned_key.pem"]), - otp_app: :finch, - protocol_options: [ - idle_timeout: 3_000, - inactivity_timeout: 5_000, - max_keepalive: 1_000, - request_timeout: 10_000, - shutdown_timeout: 10_000 - ] + def child_spec(opts) do + Plug.Adapters.Cowboy.child_spec( + scheme: :https, + plug: Finch.HTTP2Server.PlugRouter, + options: [ + port: Keyword.fetch!(opts, :port), + cipher_suite: :strong, + certfile: Path.join([@fixtures_dir, "selfsigned.pem"]), + keyfile: Path.join([@fixtures_dir, "selfsigned_key.pem"]), + otp_app: :finch, + protocol_options: [ + idle_timeout: 3_000, + inactivity_timeout: 5_000, + max_keepalive: 1_000, + request_timeout: 10_000, + shutdown_timeout: 10_000 ] - ) - ] + ] + ) + end - Supervisor.start_link(children, strategy: :one_for_one) + def start(port) do + Supervisor.start_link([child_spec(port: port)], strategy: :one_for_one) end end @@ -60,4 +60,16 @@ defmodule Finch.HTTP2Server.PlugRouter do Process.sleep(delay) send_resp(conn, 200, "ok") end + + get "/stream/:num/:delay" do + num = conn.params["num"] |> String.to_integer() + delay = conn.params["delay"] |> String.to_integer() + conn = send_chunked(conn, 200) + + Enum.reduce(1..num, conn, fn i, conn -> + Process.sleep(delay) + {:ok, conn} = chunk(conn, "chunk-#{i}\n") + conn + end) + end end