-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for async requests #228
Conversation
There's some inconsistency between the `Finch.request/3` type spec and the `Finch.Pool.request/5` callback -- the public API is that `:error` tuples are always `{:error, Exception.t()}`, but the callback is looser (`term()`). In a couple of cases, the HTTP/2 pool was returning the string `"read_only"`.
Looking forward to this! If you don't mind me reqing this thread, here's a proof of concept of using this branch: (thanks to the enhanced Mix.install([
:req,
{:finch, github: "zachallaun/finch", branch: "async", override: true},
:bandit
])
defmodule MyPlug do
use Plug.Router
plug(:match)
plug(:dispatch)
get "/redirect" do
to = "/stream"
conn
|> Plug.Conn.put_resp_header("location", to)
|> Plug.Conn.send_resp(302, "redirecting to #{to}")
end
get "/stream" do
conn = Plug.Conn.send_chunked(conn, 200)
Enum.reduce_while(~w(a b c d), conn, fn chunk, conn ->
IO.inspect({:send, chunk})
case Plug.Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}
{:error, :closed} ->
{:halt, conn}
end
end)
end
end
fun = fn request, finch_request, finch_name, finch_options ->
ref = Finch.async_request(finch_request, finch_name, finch_options)
receive do
{^ref, {:status, status}} ->
receive do
{^ref, {:headers, headers}} ->
body =
Stream.unfold(ref, fn ref ->
receive do
{^ref, {:data, data}} ->
IO.inspect({:recv, data})
{data, ref}
{^ref, :done} ->
nil
end
end)
{request, Req.Response.new(status: status, headers: headers, body: body)}
end
end
end
{:ok, _} = Bandit.start_link(plug: MyPlug, scheme: :http, port: 4000)
stream = Req.get!("http://localhost:4000/redirect", finch_request: fun).body
IO.inspect({:take, Enum.take(stream, 1)})
Process.sleep(1000)
IO.inspect({:take, Enum.take(stream, 10)}) Outputs:
Again just a PoC, eventually this would be behind just a Otherwise we'd go with some kind of Req.stream which is just a thin wrapper around this, users would need to handle :status, :headers, :data bits themselves, and we would NOT leverage response steps. |
Yeah servers can send Trailer headers. :| |
I don’t mind at all! While I hope that others will find good use for this feature, integration with Req is my own primary use-case :) So: yes, servers can sent trailing headers, and I also think the Mint docs say that you can get multiple headers chunks, so this is theoretically a valid series:
That said… I think there’s a way for Req to take this into consideration to allow processing in response steps. (Not doing so would be, imo, a big missed opportunity for Req!) I’m frankly guessing, but I don’t think trailing headers are all that common…? What if stream steps get a stream of PartialResponse structs? Req consumes the stream until it receives the first :data, then starts emitting partial responses containing status, all headers, and that data chunk? If more headers show up, the next partial response includes them. (Also, I’m happy discussing wherever you think is best, but maybe it makes sense to move this to the streaming issue in Req since I know there were other folks interested.) edit: just read the link about Trailer headers. Sounds like you know up front what to expect, which might make it easier to account for or build a step API around. Edit: only one headers should follow the status according to mint: https://hexdocs.pm/mint/Mint.HTTP.html#stream/2-responses |
(Unrelated to the discussion with Wojtek above) If anyone has a script that can be used for benchmarking Finch, I'd love to see it! As mentioned in the OP, I'd like to make sure that this branch is benchmarked against |
We can continue the conversation elsewhere, good call! wojtekmach/req#82 |
There are still a few things to address, I'd appreciate review at this point! Please see the OP for remaining TODOs and some additional notes. |
Wrote up a basic benchmark, would appreciate someone else taking a look at some point.
To reproduce: # in your local finch project
curl -o bench_finch.exs https://github.com/zachallaun/finch/blob/async-bench/bench_finch.exs
git checkout main
elixir bench_finch.exs
git checkout async
elixir bench_finch.exs
# set SAVE env var to overwrite saved results
SAVE=1 elixir bench_finch.exs |
Don't worry about #227, let's merge this PR first and then I can rebase that much smaller PR if necessary. I'm ok with continuing h2 telemetry updates in a subsequent PR, there is already a lot going on here and I think consumers like Req can already start playing with the api without all of the telemetry in place. |
Sounds good to me! For now, should we just add a note to the top of the
|
@sneako Do you perchance know what's going on with this ranch error? Only on OTP 26. |
Hmm, it's curious that the error is still there, but the tests are still "passing" in CI despite skipping a number of tests -- it's showing 86 tests run, but there are 90. |
I am able to reproduce the otp 26 failure locally, but haven't quite figured out the cause yet... It looks like since this error occurs in the setup_all callback, it causes all 4 tests in this file to be skipped (hence 86 tests), but it does seem like a bug that ExUnit does not consider these to be failures when that happens... |
Agreed. The test suite should be failing. A bug report/PR is welcome! |
Opened elixir-lang/elixir#12650 |
@sneako So the problem is with this line. Maybe this is something that's changed in OTP26 (there were changes to SSL), but this needs to be a list of strings. Changing it to I admit that this is somewhat beyond my understanding. If you have any suggestions about what to do with this test, I'd appreciate it! |
Didn't have too much time last night to dig in, but here is the commit that changed this behaviour in OTP: erlang/otp@cd00692#diff-c6e4e01fbddfaa6057fc43144940a337eb3db17891c62cce9ad2123e0107f776R2024 It does state: "This commit have changed error messages for a couple of error cases, and is also more restrict with checking the options." I think they do intend for |
Thanks for looking into it! Given that this is somewhat tangential to this PR, is it worth opening a new issue/PR to address it later? |
Sure! Let's just remove OTP 26 from the test matrix for now |
I’m on my phone at the moment but that sounds good to me! I’ll be able to remove it in about an hour, but you’re also welcome to push to this branch if you are so inclined! Other than the couple of TODOs that came out of this to address in separate PRs, is there anything else you’d like me to look at/improve here prior to merge? |
Thank you for this excellent work! |
Thank you so much for shepherding it through! |
Thanks @zachallaun this is a great improvement for the entire ecosystem. |
Thanks again for everyone involved! I was playing with this API in a test: Mix.install([
{:finch, github: "sneako/finch"},
:bypass
])
ExUnit.start()
defmodule MyApp.Test do
use ExUnit.Case
test "it works" do
bypass = Bypass.open()
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
stream = Stream.map(1..5, &to_string/1)
Enum.reduce_while(stream, conn, fn chunk, conn ->
Process.sleep(100)
IO.inspect(chunk, label: :send)
case Plug.Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}
{:error, :closed} ->
{:halt, conn}
end
end)
end)
start_supervised!({Finch, name: MyFinch})
ref =
Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
|> Finch.async_request(MyFinch)
receive do
{^ref, chunk} -> dbg(chunk)
end
end
end And I got this:
I think the test process exited while the request was still going but per docs:
|
Thanks for reporting! I had this issue at one point while developing the
feature but thought it was accounted for. Apparently not! I’ll look into
this more tonight.
…On Fri, Jun 16, 2023 at 4:00 PM Wojtek Mach ***@***.***> wrote:
Thanks again for everyone involved!
I was playing with this API in a test:
Mix.install([
{:finch, github: "sneako/finch"},
:bypass])
ExUnit.start()
defmodule MyApp.Test do
use ExUnit.Case
test "it works" do
bypass = Bypass.open()
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
stream = Stream.map(1..5, &to_string/1)
Enum.reduce_while(stream, conn, fn chunk, conn ->
Process.sleep(100)
IO.inspect(chunk, label: :send)
case Plug.Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}
{:error, :closed} ->
{:halt, conn}
end
end)
end)
start_supervised!({Finch, name: MyFinch})
ref =
Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
|> Finch.async_request(MyFinch)
receive do
{^ref, chunk} -> dbg(chunk)
end
endend
And I got this:
[stream.exs:39: MyApp.Test."test it works"/1]
chunk #=> {:status, 200}
1) test it works (MyApp.Test)
stream.exs:11
** (exit) shutdown
Finished in 0.05 seconds (0.01s on load, 0.00s async, 0.04s sync)
1 test, 1 failure
I think the test process exited while the request was still going but per
docs:
If the calling process exits before the request has completed, the request
will be canceled.
—
Reply to this email directly, view it on GitHub
<#228 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAD3BAVAFVXPJES5DPMLG3LXLS3OBANCNFSM6AAAAAAY2AQ5X4>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
After a bit more testing, I found that this behavior was already present before Mix.install([
{:finch, "0.16.0"},
:bypass
])
ExUnit.start()
defmodule MyApp.Test do
use ExUnit.Case
test "it works" do
bypass = Bypass.open()
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
stream = Stream.map(1..5, &to_string/1)
Enum.reduce_while(stream, conn, fn chunk, conn ->
Process.sleep(100)
IO.inspect(chunk, label: :send)
case Plug.Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}
{:error, :closed} ->
{:halt, conn}
end
end)
end)
start_supervised!({Finch, name: MyFinch})
outer = self()
spawn_link(fn ->
Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
|> Finch.stream(MyFinch, nil, fn chunk, _ ->
send(outer, chunk)
end)
end)
receive do
chunk -> dbg(chunk)
end
end
end Running this:
I believe the Bypass instance is receiving a We can verify this by copying what happens in Mix.install([
{:finch, "0.16.0"},
:bypass
])
ExUnit.start()
defmodule MyApp.Test do
use ExUnit.Case
test "it works" do
{:ok, pid} = DynamicSupervisor.start_child(Bypass.Supervisor, Bypass.Instance.child_spec([]))
port = Bypass.Instance.call(pid, :port)
bypass = %Bypass{pid: pid, port: port}
on_exit(fn ->
dbg(Bypass.Instance.call(bypass.pid, :on_exit))
end)
Bypass.expect(bypass, "GET", "/", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
stream = Stream.map(1..5, &to_string/1)
Enum.reduce_while(stream, conn, fn chunk, conn ->
Process.sleep(100)
IO.inspect(chunk, label: :send)
case Plug.Conn.chunk(conn, chunk) do
{:ok, conn} ->
{:cont, conn}
{:error, :closed} ->
{:halt, conn}
end
end)
end)
start_supervised!({Finch, name: MyFinch})
outer = self()
spawn_link(fn ->
Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
|> Finch.stream(MyFinch, nil, fn chunk, _ ->
send(outer, chunk)
end)
end)
receive do
chunk -> dbg(chunk)
end
end
end
I think the Plug.Cowboy test server being shut down early when the test exits (the Bypass instance monitors it here and saves the exit as a result here). Overall, I'm not sure that there's anything to be done in Finch. This sort of feels like a design flaw of Bypass with regards to streaming responses that may not complete. |
Oh wow, excellent research. Sorry about that! |
Nothing to apologize for! If you run into anything else that seems weird while testing it out, please ping me. |
See #208 for context and past discussion.
Todo
Finch.HTTP2.Pool
- fix significant duplication between:request
call and:async_request
cast (fixed by 69d37d6)Finch.HTTP2.Pool
- send errors that are currently sent as:gen_statem
replies incontinue_requests
(fixed by 69d37d6)Finch.HTTP2.Pool
- add tests for:connected_read_only
and:disconnected
stateFinch.HTTP2.Pool
- add tests for all handled error conditionsFinch.HTTP2.Pool
- monitor caller process and cancel when caller exitsmain
to ensure no performance regressions for sync requestsFinch.HTTP2.Pool
-emit telemetry events(this will be addressed in a separate PR)To facilitate review, I'll try to list changes here that affect non-async requests as well so that they can be given careful consideration.
:error
tuples that didn't match the public interface in certain cases, returning{:error, "read_only"}
. In these cases, it now returns{:error, Finch.Error.exception(:read_only)}
.:gen_statem
underlying the HTTP/2 pool no longer uses:reply
actions to respond to callers, but:gen_statem.reply/2
instead. This allows the same code-path either reply orsend
depending on whether the request was sync or async (see here).%RequestStream{}
struct was accumulating state that was unrelated to streaming the request. This change simplifies the struct to only contain information required for streaming the body and stores it in a map of other state that the pool needs to maintain per request.