HTTP1 stream #281
Conversation
holder =
  spawn_link(fn ->
    try do
      NimblePool.checkout!(
This either needs to happen when the stream starts or you need to check later on that the process that checked out is the one that is streaming. Otherwise someone will pass the stream around to another process and it won't behave as expected.
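For illustration, the latter check could be a small wrapper applied where the stream is built (OwnerCheck and the error message are hypothetical, not part of Finch):

# Sketch: remember which process created the stream and verify it on every
# element, so a stream handed to another process fails fast.
defmodule OwnerCheck do
  def wrap(stream) do
    owner = self()

    Stream.map(stream, fn item ->
      if self() != owner do
        raise "stream must be consumed by the process that checked out the connection"
      end

      item
    end)
  end
end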
Otherwise someone will pass the stream around to another process and it won't behave as expected
I've tested it and it worked as expected; I'll push the test in a sec. Take a look at it please, perhaps I've misunderstood it.
To be clear, it is probably best to do this lazily, if possible, as it is more flexible.
I will be bouncing out soon, but I meant this:

stream = Finch.actual_stream(fn -> ... end)
spawn(fn -> Enum.to_list(stream) end)

Even if this works today, because the process doing the streaming is not the one linked to, you could run into situations where the connection is never checked back in, such as this:

stream = Finch.actual_stream(fn -> ... end)
pid = spawn(fn -> Enum.each(stream, fn _ -> Process.sleep(:infinity) end) end)
Process.sleep(1000)
Process.exit(pid, :shutdown)
you could run into situations where the connection is never checked back in, such as this:

Unfortunately, I don't see any way to track this without explicit links. This limitation can be reflected in the documentation. But it is generally true for any possible resource-oriented stream, like File.stream!/3, so:
- no implicit solution is possible
- it is true for any resource stream

Therefore, I wouldn't take any action except documentation for this one.
so it is done against the process actually consuming the streaming.
Yeah, right, I hadn't thought about that.
Did it
I have reviewed it and I like the new implementation a lot, it is much simpler. Great job! The only thing remaining is making the checkout late and dealing with suspensions (which are used by zip). You have to be careful because a stream may emit no items, then be suspended, and then halted.

There are two ways you can do this: one is by moving the after callback, aka send(holder, {ref, :stop, conn}), to HTTP.conn. The flow of the code would be something like:
def stream(...) do
  # ...
  fn tagged_acc, fun ->
    conn = NimblePool.checkout!(...)
    HTTP1.request(tagged_acc, fun, fn -> send(holder, {ref, :stop, conn}) end)
  end
end
Moving all error flow to HTTP.conn is simpler, because suspending would then just be something like this:

def request(..., {:suspend, acc}, function, after_fun) do
  {:suspended, acc, &request(..., &1, function, after_fun)}
end

In other words, streams are easier to implement if they are fully tail recursive, not relying on try/catch around the whole loop and similar. Instead, wrap each invocation of fun in try/catch.
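Concretely, such a reducer might look like this (a sketch only; next_chunk/1 stands in for the actual Mint receive logic and is not Finch's real API):

# The suspend clause yields a continuation; only the invocation of fun is
# wrapped in try/rescue, keeping the reduction itself tail recursive.
defp reduce(conn, {:suspend, acc}, fun, after_fun) do
  {:suspended, acc, &reduce(conn, &1, fun, after_fun)}
end

defp reduce(_conn, {:halt, acc}, _fun, after_fun) do
  # the consumer halted early: run the after callback, then report halted
  after_fun.()
  {:halted, acc}
end

defp reduce(conn, {:cont, acc}, fun, after_fun) do
  case next_chunk(conn) do
    :done ->
      after_fun.()
      {:done, acc}

    {:chunk, chunk} ->
      next =
        try do
          fun.(chunk, acc)
        rescue
          e ->
            after_fun.()
            reraise e, __STACKTRACE__
        end

      reduce(conn, next, fun, after_fun)
  end
end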
I've thought about it, and now we're back to the beginning where I'd implemented it with Stream.resource, but now I also have to implement all of this suspend/halt logic plus change the request implementation.
Not really. The work is almost all done. Making it tail recursive is a matter of passing an after block to the existing code and adding one single clause to handle suspend. I still think it will be less code than the original PR. :)
lib/finch/http1/conn.ex
Telemetry.stop(:recv, start_time, metadata, extra_measurements)
end

send(handler, {ref, :stop, state})
I don't see the connection being transferred back to the pool. Isn't that potentially an issue?
Also notice that the after callback can be invoked in different situations:
- The stream failed
- Someone halted the stream

You need to handle both scenarios accordingly. In the second one, the server can still be writing to the socket, and you need to cancel and stop that accordingly.
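For instance, the after callback could receive the termination reason so the two cases can be told apart (a sketch; cancel_request/1 is illustrative, while Mint.HTTP.close/1 is a real Mint call):

after_fun = fn
  # the stream raised: the conn state is unknown, so close it
  {:error, conn} ->
    Mint.HTTP.close(conn)

  # the caller halted early: the server may still be writing, so cancel
  # the in-flight request before the conn can be checked back in
  {:halted, conn} ->
    cancel_request(conn)
end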
Nice catch, will fix
In the second one, the server can still be writing to the socket, and you need to cancel and stop that accordingly.

Yes, the connection is closed right below the comment # In case some exception occured, we close the connection
Halting the stream also hits this branch, AFAIK.
The only case where the connection is not closed is when {:done, mint_ref} is returned by Mint. But we can assume that after this message the server never writes to the socket.
_ ->
  exit(data)
end
There is some code duplication with other functions; those should be extracted out.
I agree, but I think that DRYing should be done after everyone agrees on the implementation details.
{"server", "Cowboy"} | ||
]}, | ||
{:data, "OK"} | ||
] = Enum.to_list(stream) |
We need to test several different scenarios:
- What happens if the stream is halted before the response finishes?
- What happens if the stream raises?

And we need to make sure the connection and the pool are still "functional" after that. The biggest concern with the implementation is that we check in a connection with a bad state.
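A test along these lines could cover the first scenario (a sketch; endpoint and the {:ok, stream} return shape are assumptions):

test "pool remains usable after the stream is halted early", %{finch_name: finch_name} do
  # halt the stream after its first element...
  {:ok, stream} = Finch.build(:get, endpoint) |> Finch.actual_stream(finch_name)
  assert [_status] = Enum.take(stream, 1)

  # ...then verify a plain request still succeeds on the same pool
  assert {:ok, %Finch.Response{status: 200}} =
           Finch.build(:get, endpoint) |> Finch.request(finch_name)
end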
Btw, there may actually be a simpler implementation here. The main insight is to have one process hold the connection and the other do the streaming. You can create any stream by returning an anonymous function that expects two arguments: the accumulator and one anonymous function. Therefore, actual_stream could work like this:

def actual_stream(...) do
  fn acc, fun ->
    conn = proxy_process_checkout!(...)
    Conn.request(..., acc, fun)
  end
end

And that would allow us to reuse most of the infrastructure that is already in place. The difference is the extra after callback:

def actual_stream(...) do
  fn acc, fun ->
    conn = proxy_process_checkout!(...)
    Conn.request(..., acc, fun, fn -> ...after... end)
  end
end

I think this direction would likely require fewer changes to the codebase and make sure that all request and stream implementations use the same code paths.
All done. I've also implemented a fail-safe check for streams which were lost and never started.
@@ -625,6 +625,152 @@ defmodule FinchTest do
  end
end

describe "actual_stream/3" do
  test "Not supported for HTTP2", %{finch_name: finch_name} do
Other test names start in lowercase; we should follow the convention.
  fail_safe_timeout: 100
)

Process.sleep(200)
Ideally we should avoid sleeping. Can we?
ref = make_ref()

holder =
  spawn_link(fn ->
This is still checking out a connection and holding on to it before the streaming starts. You need to move this inside fn tagged_acc, function ->.
This is still checking out a connection and holding on to it before the streaming starts

Yeah, this is a design decision I've made on purpose.

If we perform the checkout during stream start, we may end up in a situation where the stream was successfully created, but we can't iterate on it because the pool is busy and there's no free connection. And in this case, the developer might be unable to retry the stream creation, because the stream is data and might've already been sent to another process, so the request information can already be lost.

On the other hand, the current solution may lead to situations where a connection was checked out but no request was made, because the stream was lost or enumeration never started. I've implemented the fail-safe timeout specifically for this situation, to return such connections to the pool.
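For reference, a rough reconstruction of that eager-checkout design from the snippets in this thread (pool, parent, ref and fail_safe_timeout are assumptions):

holder =
  spawn_link(fn ->
    NimblePool.checkout!(pool, :checkout, fn _from, conn ->
      send(parent, {ref, :conn, conn})

      receive do
        # the consumer finished (or failed) and handed the conn back
        {^ref, :stop, conn} -> {:ok, conn}
      after
        # enumeration never started: check the conn back in unused
        fail_safe_timeout -> {:ok, conn}
      end
    end)
  end)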
I see what you mean, but I think this may only make things worse. You are saying that, if the system is "overloaded" (we have more requests than resources), you want to be able to retry, which is very valid, but you are also holding on to connections for longer than you need, which will only make matters worse.

Ironically, Finch.stream sidesteps both of these problems, because the connection and streaming happen immediately.

If the concern is retry, we could add the ability to retry inside the stream instead.
sidesteps both of these problems, because the connection and streaming happen immediately.

This is both true and not. An infinite recursion bug in the stream callback will leave the connection checked out forever, while actual_stream solves this problem.

If the concern is retry, we could add the ability to retry inside the stream instead.

Yeah, this feels like a better solution, I agree. Check out the connection on start and execute a callback, or just retry with exponential backoff.
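Such a retry could be a small wrapper around the checkout (a sketch; checkout_conn!/1 is a hypothetical helper that raises when the pool is busy):

defp checkout_with_retry(pool, attempts_left \\ 5, delay \\ 100) do
  checkout_conn!(pool)
rescue
  error ->
    if attempts_left > 0 do
      # exponential backoff before retrying the checkout
      Process.sleep(delay)
      checkout_with_retry(pool, attempts_left - 1, delay * 2)
    else
      reraise error, __STACKTRACE__
    end
end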
An infinite recursion bug in the stream callback will leave the connection checked out forever, while actual_stream solves this problem.

How would you have an infinite recursion bug in the stream callback? Are you saying in case our implementation has a bug? I am not sure those are valid arguments: a bug in actual_stream can cause connections to leak, eventually exhausting the pool and making the whole subsystem unusable. I don't think we should use it as an argument against it either; we just need to make sure to address all cases.
No, I mean something like:

{:ok, stream} = Finch.actual_stream(...)
Enum.each(stream, fn _ -> infinite_loop() end)

This can happen, and the connection will never be returned to the pool. However, the server will close the socket if it's unused for a long time, though I am not sure about this, since I don't know whether Mint sends empty ACKs to keep the socket open.
I see. Sure, that can happen when consuming both types of streams. However, the fact that someone can write this particular code does not justify us holding idle connections until the stream is consumed. Anyway, if we add the retry to the stream, we will be fine either way. So we can close this convo once we add retries and move the checkout into the stream.
lib/finch/http1/pool.ex
" Most likely fail_safe_timeout occured" | ||
end | ||
|
||
try do |
The current code is not dealing with stream suspensions. Try implementing Stream.zip(stream, stream). It needs to spawn two separate connections (instead of using the same one) and be able to yield out of this loop and come back in.
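A quick way to see suspension in action, with plain Elixir and nothing Finch-specific:

a = Stream.map(1..3, &(&1 * 10))
b = Stream.map(1..3, &(&1 + 1))

# zip advances both enumerables in lockstep, so each side is reduced one
# element at a time and suspended in between
Stream.zip(a, b) |> Enum.to_list()
#=> [{10, 2}, {20, 3}, {30, 4}]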
The current code is not dealing with stream suspensions

Yeah, I don't understand what suspension means; that's why I decided to use Stream.resource in the first place. I've read the Enum and Enumerable documentation and haven't found any explanation of what halt and suspend mean and how to treat them.
Oh, I've found it, it's in the type docs. I'll try to use it, thanks.
I wonder if it is possible to expose some of these internals publicly and create a stream on top of them. Say we have a LiveView and we use Finch.async_request today. My understanding is that since it's a firehose, it could overflow the LiveView process to the point it's unresponsive to handle_event and similar. If there were something like a blocking Finch.stream_next, returning something like :more/:done/etc., a LV could send a message to itself to stream the next response chunk but otherwise be responsive to any other message. Or is there a selective receive for handle_event somewhere to make sure it is handled first, and this is mostly moot?
Yes, I've pointed this out here. We don't need [...]

But today we already have a solution for this problem (but it copies the response twice). You just need to spawn a process doing [...]

So this PR won't solve the problem you're describing. We need to do [...]
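That existing workaround might look roughly like this (MyFinch, url and the message protocol are assumptions): a linked process runs Finch.stream/5 and blocks after each chunk until the consumer asks for the next one.

parent = self()

pid =
  spawn_link(fn ->
    Finch.build(:get, url)
    |> Finch.stream(MyFinch, :ok, fn chunk, acc ->
      send(parent, {:chunk, self(), chunk})

      # wait until the consumer is ready for more
      receive do
        :next -> acc
      end
    end)

    send(parent, {:done, self()})
  end)

# the consumer (e.g. a LiveView) pulls one chunk per message
receive do
  {:chunk, ^pid, chunk} ->
    # handle the chunk here, then ask for the next one
    send(pid, :next)

  {:done, ^pid} ->
    :ok
end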
@josevalim, I've implemented your suggestions, you can check the implementation. I didn't implement automatic retry though, but I think it's not a blocker. The only problem is that all fail-safe timeout tests pass until the last line and then die with [...]
Thank you. I am swamped with ElixirConf, can you please remind me in 2 weeks if I did not comment before?
@josevalim ping as requested
This is a request for comments about an initial HTTP1 streaming implementation, based on ideas in #236.