-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add is_closed and closed operations to Lwt_stream #223
Conversation
Before this commit the Lwt_stream API provided no reliable way to detect if a stream had been closed. While on_termination notifies the user if the stream is closed in the future, it does not tell the user if the stream was closed in the past. Introducing the is_closed function addresses this problem, allowing the user to detect if a stream has been closed at any point in time.
This function returns a thread that will sleep until the stream is closed. The thread can be used in place of if_closed and on_terminate to detect when a stream is closed at any point in time.
Shouldn't |
I would expect @diml As original author of I checked depending OPAM packages. It seems that only |
hooks = ref []; | ||
let closed, closed_w = Lwt.wait () in | ||
let mark_closed () = | ||
if not (Lwt.is_sleeping closed) then Lwt.wakeup closed_w () in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be clearer to explicitly wakeup the closed
thread at the end of the stream rather than rely on the execution of this hook. Plus with the current code, I think you get Lwt_stream.is_closed stream = false
inside on_termination
callbacks given that this hook is executed last. Maybe that was the indented behavior? In any case it'd be good to have tests for this in tests/core/test_lwt_stream.ml
.
If you want to avoid storing both the Lwt.t
and the Lwt.u
inside the Lst_stream.t
, you can store only the latter and define closed
as follow, which is a no-op:
let closed s = Lwt.waiter_of_wakener s.closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How I respond to this comment depends on how—if at all—the semantics of on_termination
will change in this pull request, so I'll hold off for now.
The proposed change in behavior for @rneswold, when you write Lwt code, you implicitly make the assumption that the code is running on a single system thread. You should definitely not try to use the same |
The hooks will be called at the appropriate time once `f` finally returns `None`.
The reason I did not modify |
I think you should change the semantics. If we need to do a major version because of it, we will. There are also several other changes that can be included at that point. This could also be considered a bug fix. The documentation makes no claims about the behavior of |
If it's the case that there's no coyness about releasing new major versions (which would be great!), and that there's agreement that |
I agree with @seliopou. I can't remember why I added |
I also agree, |
Then barring any new issues that have yet to surface, I will commit a patch to this branch that includes I am interested in this feature solely for use in cohttp, so @rgrinberg if you would like any assistance, let me know in the usual places. |
I agree with is_closed being better than Why remove |
I think we mean to remove (first deprecate) both of them. |
I was speaking of them collectively. Both of them should get the boot. I'll edit my post to refer to |
OK cool. Kill 'em both. |
As per the discussion on the issue tracker, Lwt_stream.on_termination will be deprecated and ultimately removed in favor for Lwt_stream.closed. To facilitate that, this commit introduces a new implementation of Lwt_stream.closed that does not rely on termination hooks, and implementation Lwt_stream.on_termination in terms of Lwt_Stream.closed.
The Lwt_stream module had multiple copies of low-level enqueueing code of two varieties copied in serveral places.This commit moves one variety of enqueing code into a single function called enqueue', and implements the second variety as the function enqueue, in terms of enqueue'.
Previously, the of_list, of_array, and of_string constructors each would produce a stream s for which is_closed s = false This is counter-intuitive as all the elements of the stream s have been generated and should be consumable without blocking. This commit changes the behavior of this constructors such that is_closed s = true immediately upon creation.
This should be ready to go. The last commit may be controversial, but from my perspective is the sensible way to treat those sorts of steams. So please review and discuss if necessary. |
@seliopou, AFAICT your last commit in this PR notionally "restores" semantics to |
You are correct, however in general streams constructed by |
Indeed, the difference is in which side effects you can observe after marking a stream "closed" early. With It's not clear to me that either The documentation for It also seems odd to me that streams created with |
Point of clarification: |
Oops, understood. Perhaps we can do this, but why these special cases? Streams may contain a finite number of elements and not block readers, without being closed, in general. |
It's not just that the stream length is finite, but that length, and each element that contributes to that length, is known by the constructor. No computation needs to be done in order to generate the streams, whereas using You could get the same behavior for some type in a module let to_stream t =
let st = Lwt_stream.from_direct (M.to_generator t) in
ignore (Lwt_stream.npeek (M.length t) st);
assert (Lwt_stream.is_closed st);
st The code doesn't take this approach in It's also worth noting that this is the same behavior that |
But computation does need to be done to generate the streams, at the least memory allocation, because Lwt's For comparison, Async's I'm open to proposals for changing the implementation TL;DR I don't think this is suitable for a lazy |
Clones share queues, but walk them independently, so I don't think your observation is correct. It's the same behavior. |
Precisely because they share queues and walk them independently, the entire queue will have to be retained in memory as a "copy" of the large list. Is that correct? |
let feed_direct f last close = | ||
let x = f () in | ||
(* Push the element to the end of the queue. *) | ||
enqueue' x last; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aantron this is pushing elements onto the queue. feed_direct
is called in a loop in from_fixed
.
Thank you, saw |
This reverts commit 47cd91f.
Previously, the
Lwt_stream
API provided no reliable way to detect if a stream had been closed. While on_termination notifies the user if the stream is closed in the future, it does not tell the user if the stream was closed in the past.Introducing the
is_closed
function addresses this problem, allowing the user to detect if a stream has been closed at any point in time using code like the following:Alternative to this pattern, one could use the
Lwt_stream.close
operation to get a thread that will sleep until the stream is closed: