Add support for async requests #228

Merged: 34 commits, merged on Jun 13, 2023. Changes shown from 27 commits.

Commits
9daf971
ci: fix elixir.yml indentation
zachallaun May 30, 2023
8adbf5d
ci: update elixir version and add otp 26
zachallaun May 30, 2023
8aee995
wip: overly simple first pass at `Finch.async_request/3`
zachallaun May 30, 2023
785616c
wip: `Finch.cancel_async_request/2` (test failing)
zachallaun May 31, 2023
bb3beaa
wip: push async api to the adapters
zachallaun Jun 2, 2023
186385e
test: more reliable tests & test async cancel with http1 pool
zachallaun Jun 3, 2023
fc97489
fix: exit async req process if caller exits
zachallaun Jun 3, 2023
5cbba4e
test: add tests for success and error async requests to http1 pool test
zachallaun Jun 3, 2023
4f7e31e
refactor: use request_ref in existing http2 pool messages
zachallaun Jun 3, 2023
00e1752
refactor: use request_ref to cancel http2 requests
zachallaun Jun 3, 2023
f29b841
wip: implement async_request & cancelation for http2
zachallaun Jun 4, 2023
69d37d6
refactor: unify sync and async http2 request flow
zachallaun Jun 4, 2023
0b65bc8
refactor: clean up http2 request handling
zachallaun Jun 4, 2023
e43a005
fix: return consistent errors in http2 pool
zachallaun Jun 4, 2023
6632115
test(http2): add async request tests for error conditions
zachallaun Jun 6, 2023
8968a2e
test: only start finch and server when needed
zachallaun Jun 6, 2023
4d63e0b
fix(http2): cancel async requests if calling pid exits
zachallaun Jun 6, 2023
a1f81d2
test(http1): improve flaky pool tests
zachallaun Jun 6, 2023
7af3595
docs: improve docs and typespecs for `async_request/3`
zachallaun Jun 6, 2023
42d3fbe
test: no doctests on main module
zachallaun Jun 7, 2023
6e75444
test: http2 telemetry + always detach telemetry after test
zachallaun Jun 7, 2023
3142972
fix: don't send `{ref, :ok}` to async requests callers
zachallaun Jun 7, 2023
cf3e822
refactor: clean up http2 pool request sending
zachallaun Jun 7, 2023
c12aeec
fix: cancel http2 request when in connected_read_only
zachallaun Jun 8, 2023
efdb914
refactor: don't store extra state in request stream struct
zachallaun Jun 8, 2023
1289f7d
docs: note which telemetry events are http1-only
zachallaun Jun 8, 2023
d4f86b5
refactor: rename internal helper
zachallaun Jun 8, 2023
8bae1ce
chore: split out request_opt type from request_opts
zachallaun Jun 11, 2023
9ae43ef
chore: spawn linked async req process in http1 pool
zachallaun Jun 11, 2023
f89106b
refactor: simplify request_ref structure
zachallaun Jun 11, 2023
83b538a
test: test async cancelation when caller exits normally or abnormally
zachallaun Jun 11, 2023
f53e8e4
docs: additional documentation for async requests
zachallaun Jun 11, 2023
4d259b6
test: less finicky http1 pool tests
zachallaun Jun 11, 2023
aa2ced7
remove otp 26 from test matrix due to unexpected changes in ssl optio…
sneako Jun 13, 2023
115 changes: 59 additions & 56 deletions .github/workflows/elixir.yml
@@ -5,59 +5,62 @@ on:
branches:
- main
jobs:
mix_test:
strategy:
fail-fast: false
matrix:
include:
- pair:
elixir: '1.7.4'
otp: '22.x'
- pair:
elixir: '1.14.1'
otp: '25.x'
lint: lint

runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: deps
key: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }}
restore-keys: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-
- uses: actions/cache@v2
with:
path: _build
key: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }}
restore-keys: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-
- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.pair.otp}}
elixir-version: ${{matrix.pair.elixir}}

- name: Install Dependencies
run: mix deps.get

- run: mix format --check-formatted
if: ${{ matrix.lint }}

- run: mix deps.unlock --check-unused
if: ${{ matrix.lint }}

- run: mix deps.compile

- run: mix compile --warnings-as-errors
if: ${{ matrix.lint }}

- name: Run Credo
run: mix credo
if: ${{ matrix.lint }}

- name: Run Tests
run: mix test
if: ${{ ! matrix.lint }}

- name: Run Tests
run: mix test --warnings-as-errors
if: ${{ matrix.lint }}
mix_test:
strategy:
fail-fast: false
matrix:
include:
- pair:
elixir: '1.7.4'
otp: '22.x'
- pair:
elixir: '1.14.5'
otp: '25.x'
lint: lint
- pair:
elixir: '1.14.5'
otp: '26'

runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: deps
key: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }}
restore-keys: deps-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-
- uses: actions/cache@v2
with:
path: _build
key: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-${{ hashFiles('**/mix.lock') }}
restore-keys: build-${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-
- uses: erlef/setup-beam@v1
with:
otp-version: ${{matrix.pair.otp}}
elixir-version: ${{matrix.pair.elixir}}

- name: Install Dependencies
run: mix deps.get

- run: mix format --check-formatted
if: ${{ matrix.lint }}

- run: mix deps.unlock --check-unused
if: ${{ matrix.lint }}

- run: mix deps.compile

- run: mix compile --warnings-as-errors
if: ${{ matrix.lint }}

- name: Run Credo
run: mix credo
if: ${{ matrix.lint }}

- name: Run Tests
run: mix test
if: ${{ ! matrix.lint }}

- name: Run Tests
run: mix test --warnings-as-errors
if: ${{ matrix.lint }}
111 changes: 91 additions & 20 deletions lib/finch.ex
@@ -6,6 +6,7 @@ defmodule Finch do
|> Enum.fetch!(1)

alias Finch.{PoolManager, Request, Response}
require Finch.Pool

use Supervisor

@@ -83,6 +84,19 @@ defmodule Finch do
"""
@type name() :: atom()

@typedoc """
Options used by request functions.
"""
@type request_opts() :: [
{:pool_timeout, pos_integer()}
| {:receive_timeout, pos_integer()}
]

@typedoc """
The reference used to identify a request sent using `async_request/3`.
"""
@opaque request_ref() :: Finch.Pool.request_ref()

@typedoc """
The stream function given to `stream/5`.
"""
@@ -263,14 +277,9 @@ defmodule Finch do

## Options

* `:pool_timeout` - This timeout is applied when we check out a connection from the pool.
Default value is `5_000`.

* `:receive_timeout` - The maximum time to wait for a response before returning an error.
Default value is `15_000`.

Shares the same options as `request/3`.
"""
@spec stream(Request.t(), name(), acc, stream(acc), keyword) ::
@spec stream(Request.t(), name(), acc, stream(acc), request_opts()) ::
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
@@ -280,20 +289,10 @@
end

defp __stream__(%Request{} = req, name, acc, fun, opts) when is_function(fun, 2) do
shp = build_shp(req)
{pool, pool_mod} = PoolManager.get_pool(name, shp)
{pool, pool_mod} = get_pool(req, name)
pool_mod.request(pool, req, acc, fun, opts)
end

defp build_shp(%Request{scheme: scheme, unix_socket: unix_socket})
when is_binary(unix_socket) do
{scheme, {:local, unix_socket}, 0}
end

defp build_shp(%Request{scheme: scheme, host: host, port: port}) do
{scheme, host, port}
end

@doc """
Sends an HTTP request and returns a `Finch.Response` struct.

Expand All @@ -306,7 +305,7 @@ defmodule Finch do
Default value is `15_000`.

"""
@spec request(Request.t(), name(), keyword()) ::
@spec request(Request.t(), name(), request_opts()) ::
{:ok, Response.t()}
| {:error, Exception.t()}
def request(req, name, opts \\ [])
@@ -351,12 +350,84 @@

See `request/3` for more detailed information.
"""
@spec request!(Request.t(), name(), keyword()) ::
@spec request!(Request.t(), name(), request_opts()) ::
Response.t()
def request!(%Request{} = req, name, opts \\ []) do
case request(req, name, opts) do
{:ok, resp} -> resp
{:error, exception} -> raise exception
end
end

@doc """
Sends an HTTP request asynchronously, returning a request reference.

## Responses

Response information is sent to the calling process as it is received
in `{ref, response}` tuples. Responses include:

* `{:status, status}` - HTTP response status
* `{:headers, headers}` - HTTP response headers
* `{:data, data}` - section of the HTTP response body
* `{:error, exception}` - an error occurred during the request
* `:done` - request has completed successfully

On a successful request, a single `:status` message will be followed
by a single `:headers` message, after which one or more `:data`
messages may be sent. If trailing headers are present, it is possible
that a final `:headers` message may be sent. A `:done` or an `:error`
message indicates that the request has succeeded or failed and no
further messages are expected.

## Example

iex> Stream.resource(
...> fn ->
...> Finch.build(:get, "https://httpbin.org/stream/5")
...> |> Finch.async_request(MyFinch)
...> end,
...> fn ref ->
...> receive do
...> {^ref, :done} -> {:halt, ref}
...> {^ref, response} -> {[response], ref}
...> end
...> end,
...> fn _ref -> :ok end
...> ) |> Enum.to_list()
[
{:status, 200},
{:headers, [...]},
{:data, "..."},
...
:done
]

## Options

Shares the same options as `request/3`.
"""
@spec async_request(Request.t(), name(), request_opts()) :: request_ref()
def async_request(%Request{} = req, name, opts \\ []) do
{pool, pool_mod} = get_pool(req, name)
pool_mod.async_request(pool, req, opts)
end

@doc """
Cancels a request sent with `async_request/3`.
"""
@spec cancel_async_request(request_ref()) :: :ok
def cancel_async_request(request_ref) when Finch.Pool.is_request_ref(request_ref) do
{_ref, _pool, pool_mod, _state} = request_ref
pool_mod.cancel_async_request(request_ref)
end

defp get_pool(%Request{scheme: scheme, unix_socket: unix_socket}, name)
when is_binary(unix_socket) do
PoolManager.get_pool(name, {scheme, {:local, unix_socket}, 0})
end

defp get_pool(%Request{scheme: scheme, host: host, port: port}, name) do
PoolManager.get_pool(name, {scheme, host, port})
end
end
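
A minimal consumer sketch to complement the Stream.resource/3 example in the docs above: a plain receive loop that accumulates the response. It assumes a running Finch instance named MyFinch and relies only on the message shapes documented for async_request/3; the 15_000 timeout and the accumulator shape are illustrative, not part of the library:

    ref =
      Finch.build(:get, "https://httpbin.org/stream/5")
      |> Finch.async_request(MyFinch)

    collect = fn collect, acc ->
      receive do
        {^ref, {:status, status}} -> collect.(collect, %{acc | status: status})
        {^ref, {:headers, headers}} -> collect.(collect, %{acc | headers: acc.headers ++ headers})
        {^ref, {:data, data}} -> collect.(collect, %{acc | body: acc.body <> data})
        {^ref, :done} -> {:ok, acc}
        {^ref, {:error, exception}} -> {:error, exception}
      after
        # Give up and cancel the in-flight request if nothing arrives in time.
        15_000 ->
          Finch.cancel_async_request(ref)
          {:error, :timeout}
      end
    end

    collect.(collect, %{status: nil, headers: [], body: ""})
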
50 changes: 50 additions & 0 deletions lib/finch/http1/pool.ex
@@ -74,6 +74,56 @@ defmodule Finch.HTTP1.Pool do
end
end

@impl Finch.Pool
def async_request(pool, req, opts) do
owner = self()

pid =
spawn(fn ->
Contributor:

Maybe we should link?

Suggested change: replace `spawn(fn ->` with `spawn_link(fn ->`.

This will make sure any unexpected crashes will propagate up.

Contributor:

If we link, there may be no need to monitor, as this process will crash. Alternatively we link, trap exits, and monitor.

Contributor Author:

Yep, that would definitely simplify things. I was thinking that we wouldn’t want internal crashes to take down the caller, only the caller to take down the request, but the request crashing already means the caller isn’t going to receive messages it’s probably expecting, so it’s probably better to just crash.
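
A minimal sketch of the combination being discussed (link the request process so unexpected crashes propagate to the caller, and monitor the caller so an orphaned request can be shut down). The names mirror the pool implementation above; the streaming body is elided and illustrative only:

    owner = self()

    pid =
      # Linked: if the request process crashes, the caller is taken down with it
      # rather than waiting forever on messages that will never arrive.
      spawn_link(fn ->
        # Monitored: if the caller exits first, the request process notices the
        # :DOWN message between chunks and exits with :shutdown.
        monitor = Process.monitor(owner)

        receive do
          {:DOWN, ^monitor, :process, ^owner, _reason} -> exit(:shutdown)
        after
          0 -> :ok
        end

        # ... perform the request, re-checking the monitor between chunks ...
      end)
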

Contributor Author (@zachallaun), Jun 11, 2023:

@josevalim See 9ae43ef -- do you have any insight into why the spawn_link process doesn't immediately terminate when the caller does? Specifically, this test was failing when I stripped out the monitoring code. I also tried adding Process.flag(:trap_exit, true) inside the anon fn and then changing process_down?(monitor) to a should_exit?() that looks for an {:EXIT, _, _} message, but didn't seem to receive one.

Contributor:

In this code:

        spawn(fn ->
          ref =
            Finch.build(:get, url <> "/stream/5/500")
            |> Finch.async_request(finch_name)

          send(outer, ref)
        end)

the spawned process immediately exits with reason :normal after send/2, which does not lead to any broken link. So by the time you send it an exit signal, it is already gone. Adding a Process.sleep(:infinity) should fix it.

It is up to you to decide if we still need monitoring. If you start something to stream asynchronously and you never consume it, what does it mean?
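
For illustration, the amended spawn from that test might look like this (url, outer, and finch_name come from the surrounding test, which is not shown here); the only change is the Process.sleep(:infinity) keeping the spawned caller alive:

        spawn(fn ->
          ref =
            Finch.build(:get, url <> "/stream/5/500")
            |> Finch.async_request(finch_name)

          send(outer, ref)

          # Keep the caller alive so that, when the test later sends it an exit
          # signal, the link is still in place and the request process goes
          # down with it.
          Process.sleep(:infinity)
        end)
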

Contributor Author:

the spawned process immediately exits with reason :normal after send/2, which does not lead to any broken link. So by the time you send it an exit signal, it is already gone. Adding a Process.sleep(:infinity) should fix it.

Oh, duh! Thank you.

I’ll fix that and see how it behaves without monitoring and decide from there.

Contributor Author:

Split the test to explicitly test both normal and abnormal caller exits. Decided to keep the monitoring around because I think canceling the request if there is no caller to receive it is the right thing to do. I'll document this behavior in async_request/3, so that folks know that the caller must stick around for the request to continue. I can imagine some potentially useful patterns around this behavior, e.g.

Task.async(fn ->
  some_streamed_request()
  |> Enum.find(fn
    {:data, data} -> some_chunk_im_looking_for?(data)
    _ -> false
  end)
end)

monitor = Process.monitor(owner)
request_ref = receive_next_within!(10)

case request(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do
{:ok, _} -> send(owner, {request_ref, :done})
{:error, error} -> send(owner, {request_ref, {:error, error}})
end
end)

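    # The request_ref bundles the pool, this module, and the spawned pid so
    # that Finch.cancel_async_request/1 can dispatch back to this pool module
    # and terminate the right process.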
request_ref = {make_ref(), pool, __MODULE__, pid}
send(pid, request_ref)
end

defp receive_next_within!(timeout) do
receive do
value -> value
after
timeout -> raise "no message received within #{timeout}"
end
end

defp send_async_response(response, {owner, monitor, request_ref}) do
if process_down?(monitor) do
exit(:shutdown)
end

send(owner, {request_ref, response})
{owner, monitor, request_ref}
end

defp process_down?(monitor) do
receive do
{:DOWN, ^monitor, _, _, _} -> true
after
0 -> false
end
end

@impl Finch.Pool
def cancel_async_request({_, _, _, pid}) do
Process.exit(pid, :shutdown)
:ok
end

@impl NimblePool
def init_pool({registry, shp, opts}) do
# Register our pool with our module name as the key. This allows the caller