change close semantics for fixed-length sources #239

Closed
wants to merge 3 commits into from

seliopou
Contributor

@seliopou seliopou commented May 5, 2016

This pull request includes a change to the close semantics of fixed-length sources that was previously included in #223, but requires further discussion. An extensive discussion in IRC took place between myself and @aantron, which can be found here.

In summary, changing of_list and of_array to be closed by default is likely an acceptable choice, but the library should accommodate similar functionality for other sorts of structures. Right now, this could be accomplished with a function of the following form:

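let init (k : ('a -> unit) -> unit) : 'a Lwt_stream.t =
  let stream, push = Lwt_stream.create () in
  (* Let the caller feed every element of its structure into the stream. *)
  k (fun x -> push (Some x));
  (* Close the stream once the caller has finished producing elements. *)
  push None;
  stream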

However, that may not be at all obvious to users.
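
As an illustration of how such a function might be used for a structure that is not a list, array, or string, here is a minimal sketch. The of_queue helper is hypothetical and not part of Lwt; init is repeated so the block stands alone, with the end-of-stream marker sent through push. It assumes the lwt library is available.

```ocaml
(* Requires the lwt library. [init] is the helper discussed above; the
   end of the stream is signalled by pushing [None]. *)
let init (k : ('a -> unit) -> unit) : 'a Lwt_stream.t =
  let stream, push = Lwt_stream.create () in
  k (fun x -> push (Some x));
  push None;
  stream

(* Hypothetical helper: build a stream from a finite, non-blocking source
   that is neither a list, an array, nor a string. *)
let of_queue (q : 'a Queue.t) : 'a Lwt_stream.t =
  init (fun yield -> Queue.iter yield q)

let () =
  let q = Queue.create () in
  List.iter (fun x -> Queue.add x q) [1; 2; 3];
  let s = of_queue q in
  (* Both calls are synchronous, so no Lwt scheduler is needed here. *)
  Printf.printf "closed: %b\n" (Lwt_stream.is_closed s);
  List.iter (Printf.printf "%d\n") (Lwt_stream.get_available s)
```

Any iterator of type ('a -> unit) -> 'b -> unit, such as Queue.iter or Stack.iter, can be adapted the same way.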

The motivation for this change is to allow libraries that consume a user-constructed stream to determine the blocking behavior of a stream and use appropriate serialization strategies. As of now, there is no way in general to determine whether a stream will block in order to generate its elements, or if all its elements have been generated and can be read immediately. In the case of of_list and of_array, this information is known when the constructors are called, but is then discarded. This pull request would not discard that information.

Depends on #223.

@aantron
Collaborator

aantron commented May 20, 2016

I would like more people to review this proposal, if willing. Comparisons with, and experience from, Async are welcome.

What this change does

Relevant commit: 6bc3263

This change "eagerly" pre-allocates the whole internal stream queue for streams created with of_list, of_array, and of_string. The resulting streams are therefore closed (is_closed is true), and, indeed, reading from the resulting streams cannot result in blocking (or further memory allocation).

The existing functions are "lazy," in the sense that if you do of_list l, the list's spine is not loaded into the stream's queue. The spine is loaded piecemeal when (if) the stream is read, and does not have to be entirely retained in memory unless the stream is cloned.

So, there is a performance effect.

As I understand it, this change would simplify some part of Cohttp specifically.

What the obstacle is

TL;DR: The proposed change is not general enough.

For example, it seems wrong that these will evaluate to true and false, respectively:

Lwt_stream.is_closed @@ Lwt_stream.of_list @@ List.map (fun x -> x) l
Lwt_stream.is_closed @@ Lwt_stream.map (fun x -> x) @@ Lwt_stream.of_list l

And, as mentioned, it seems wrong that of_list, of_array, of_string have special "eager" semantics that are different from the rest of the "lazy" interface, and which are not "obviously" achievable for other finite non-blocking sources.

The change harms the regularity, and teachability, of Lwt's interface. However, it can be acceptable as is, if we can see it as "on the way" to a larger change that restores regularity. We don't have to implement the larger change – just, if someone requests, e.g., mapping closed streams, I don't want Lwt to be stuck with a wonky interface that is hard to get away from, and was a hack for Cohttp. If we can't come up with a nice hypothetical interface, then it seems code should be added to Cohttp, instead of Lwt, to give special treatment to list and array stream sources – if that is possible.

How to make it more general

  • Combinators like map, filter, etc., (but not map_p, etc.) that cannot introduce (Lwt) blocking, and cannot make a stream infinite, can be changed to carry the closed status of the argument stream to the result stream.
  • However, the way closed status is currently implemented, this entails pre-allocating stream spines and actually calling the function arguments each time a combinator is applied to a closed stream. Perhaps what we really need is some status other than is_closed, that doesn't conflate reaching the end of the stream with knowing that the end can be reached without blocking?
  • Some function, or documentation, should be provided, for constructing a closed (or finite non-blocking) stream from a finite non-blocking source that is not a list, array, or string.
  • Perhaps some function should also be provided, that can take any (finite) stream and convert it into a closed stream, by reading all the elements from a clone.
  • Other ideas welcome.
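
The conversion suggested in the list above (reading all elements from a clone) might look roughly like the following. This is a sketch only: to_closed is a hypothetical name, not an Lwt function, and the lwt library is assumed. The result is a promise because draining the clone may itself block, and Lwt_stream.clone rejects bounded push-streams.

```ocaml
(* Requires the lwt library. [to_closed] is a hypothetical helper. *)
let to_closed (s : 'a Lwt_stream.t) : 'a Lwt_stream.t Lwt.t =
  Lwt.map
    (fun elements ->
      (* Re-publish the collected elements through a push-stream and
         close it immediately by pushing [None]. *)
      let closed_stream, push = Lwt_stream.create () in
      List.iter (fun x -> push (Some x)) elements;
      push None;
      closed_stream)
    (* Read from a clone so that the original stream is not consumed. *)
    (Lwt_stream.to_list (Lwt_stream.clone s))
```

Note that this only terminates for finite streams, and it retains every element in memory, which is the performance cost mentioned earlier.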

@ghost

ghost commented May 23, 2016

I looked at Async pipes and they do what is proposed in this PR, i.e., Pipe.of_list will copy in the whole list and return a closed pipe reader. At the same time, Async.Pipe does a lot of things eagerly. For instance Pipe.map pipe ~f will start reading from pipe immediately and will produce elements on the resulting pipe before the user starts asking for it.

What I had in mind when I first wrote Lwt_stream was to implement something similar to Stream/Batteries.Enum for Lwt. That's why everything is lazy.

I heard several times that people prefer Async pipes to Lwt streams, so maybe this PR is a step in the right direction. At the same time, I'd be slightly worried, as it changes the semantics without notice for users, which might lead to hard-to-debug problems. Another solution would be to add a Lwt_pipe module to Lwt. Speaking of which, I just saw this: https://github.com/c-cube/lwt-pipe

@aantron
Collaborator

aantron commented May 23, 2016

Thanks @diml!

I heard several times that people prefer Async pipes to Lwt streams

Do you remember the reason(s), or perhaps there are links somewhere?

At the same time, Async.Pipe does a lot of things eagerly. For instance Pipe.map pipe ~f will start reading from pipe immediately and will produce elements on the resulting pipe before the user starts asking for it.

This behavior of map and of_list in Async pipes is coherent. I would like to make sure that this PR doesn't put Lwt_stream in a state where map (and friends) and of_list are incoherent, and which can't be easily fixed. What do you think about the issue of

Lwt_stream.is_closed @@ Lwt_stream.of_list @@ List.map (fun x -> x) l
Lwt_stream.is_closed @@ Lwt_stream.map (fun x -> x) @@ Lwt_stream.of_list l

in particular?

@aantron
Collaborator

aantron commented May 23, 2016

Ok, I see two ways forward with this PR, and I prefer (2).

  1. IMO changing the actual semantics between eager and lazy for only of_list is a tricky issue. We could redesign the interface of Lwt_stream to deal with that, but we don't have the labor resources or feedback available to do it properly now. Other than that, I have no inherent problem with changing things, apart of course from not harming other users.
  2. I don't think @seliopou needs eager semantics, as much as just to know how a stream was created. So, @seliopou can amend this PR to introduce a new function, say, is_direct_and_finite (please try to come up with a better name, though :)). Add a bool field to each stream's representation, such that if a stream was created by of_list, etc., that field is set. For a more complete implementation, make it a bool ref field, and make it so that combinators like map create streams that share this field with the underlying stream. Then, if a stream becomes closed, set the ref to true. This will give @seliopou the information he needs in Cohttp, without messing with the semantics of the interface, and gives Lwt a somewhat coherent function that is easy to deprecate or reimplement later.
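
To make option (2) concrete, here is a toy model in plain OCaml. It is not Lwt_stream's real representation: the stream type, the next field, and from are simplified stand-ins, and is_direct_and_finite is the placeholder name from the comment above. It only shows how a shared bool ref could carry the information through a combinator such as map.

```ocaml
(* Toy model only; this is not how Lwt_stream is implemented. *)
type 'a stream = {
  next : unit -> 'a option;       (* pull one element, [None] at the end *)
  direct_and_finite : bool ref;   (* shared by a stream and its derivatives *)
}

(* Streams built from a list are known to be finite and non-blocking. *)
let of_list l =
  let rest = ref l in
  let next () =
    match !rest with
    | [] -> None
    | x :: tl -> rest := tl; Some x
  in
  { next; direct_and_finite = ref true }

(* A general source starts with the flag unset; the flag is set once the
   end of the stream has been reached. *)
let from f =
  let flag = ref false in
  let next () =
    match f () with
    | None -> flag := true; None
    | some -> some
  in
  { next; direct_and_finite = flag }

(* [map] cannot introduce blocking, so the result shares the flag. *)
let map f s =
  let next () =
    match s.next () with
    | None -> None
    | Some x -> Some (f x)
  in
  { next; direct_and_finite = s.direct_and_finite }

let is_direct_and_finite s = !(s.direct_and_finite)
```

Because the flag is a shared reference, a stream derived with map reports the new status as soon as the underlying stream reaches its end, without any eager evaluation of the mapped function.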

@seliopou I hope (2) is suitable for what you need. If you agree, please make the history of this PR into one commit with the implementation, optionally with tests (in the same commit). I would also appreciate if you capitalized the first letter of the message for consistency with recent history. EDIT: and please rebase over master.

@aantron aantron mentioned this pull request May 23, 2016
@seliopou
Contributor Author

seliopou commented May 24, 2016

For example, it seems wrong that these will evaluate to true and false, respectively:

Lwt_stream.is_closed @@ Lwt_stream.of_list @@ List.map (fun x -> x) l
Lwt_stream.is_closed @@ Lwt_stream.map (fun x -> x) @@ Lwt_stream.of_list l

There is no reason to expect that those two operations should commute when lifted to a monad. As @diml pointed out, Pipe.of_list l does create a closed pipe. However, performing a Pipe.map on the result does not preserve its closed state. The following example:

let main () =
  let p1 = Pipe.of_list [1; 2; 3] in
  print_endline (string_of_bool (Pipe.is_closed p1));
  let p2 = Pipe.map p1 ~f:(fun x -> x + 1) in
  print_endline (string_of_bool (Pipe.is_closed p2));
  Clock.after (Time.Span.of_sec 1.0) >>> fun () ->
    print_endline (string_of_bool (Pipe.is_closed p2));;

...will produce the output:

true
false
false

If you change the Pipe.map to Pipe.map', then the output becomes:

true
false
true

In the first case, it does not report a closed state unless values in the pipe are consumed. This is because map respects pushback caused by p2's queue size, which is by default 0. In the second case, map' reads all queued elements from p1 and writes them all to p2 at once, thus closing p1 and, as a result, p2. In both cases, none of this happens synchronously (indicated by the second line of output).

Fundamentally, the debate is about whether of_list and related functions should be consumer-driven, or producer-driven. The current implementation makes them consumer-driven. This pull request would make them producer-driven. This change does not harm the regularity or teachability of lwt's interface, because lwt already supports both consumer- and producer-driven streams. Streams constructed via create and create_bounded are producer-driven, whereas streams made with from and from_direct are consumer-driven. The consumer/producer distinction is not about when internal structures are allocated, but when the user-defined computation occurs that defines the elements that end up in a stream's internal queue. For of_list and related functions, that computation is already done, suggesting that the resultant stream should be producer-driven.

An equally-valid implementation of of_list would be (consider this pseudocode as it has not been compiled):

let of_list l =
  let stream, push = Lwt_stream.create () in
  List.iter (fun x -> push (Some x)) l;
  push None;
  stream

Using from_fixed as the implementation currently does is confusing because it seems to imply that this should be consumer-driven, but it shouldn't. The only reason that they were implemented that way was because I was seeking to minimize the size of the change, a principle that I hold myself to while contributing to others' open-source projects.

I don't think that option (2) is the right way to go because it leaves the interface to this module in a worse state than it was before: harder to understand, to teach, and to maintain in the future.

@aantron
Collaborator

aantron commented May 24, 2016

I disagree with several statements above, but this paraphrase convinced me: "of_list can be interpreted as creating a push-stream, and Lwt_stream already has those." That push-streams seem a bit second-class is I suppose a way to interpret my previous concerns, but it is a separate matter.

Perhaps the docs for of_list (and friends) should say something along the lines of "...creates a push-stream and pushes all elements of l, resulting in a closed stream..." regardless of how it's implemented. Your current implementation is fine.

Please squash along the lines mentioned above. If you'd rather have me squash, let me know. I will post some branch and you can review and replace this PR with it.

Previously, the of_list, of_array, and of_string constructors each would
produce a stream s for which

  is_closed s = false

This is counter-intuitive as all the elements of the stream s have been
generated and should be consumable without blocking. This commit changes
the behavior of these constructors such that

  is_closed s = true

immediately upon creation.
@seliopou
Contributor Author

seliopou commented Jun 2, 2016

This is ready to be merged.

let b = ref false in
let is_closed_in_notification = ref false in
Collaborator


I don't think this should be removed. I think is_closed should be true inside threads waiting on closed, and it is conceivable that some future implementation might get that wrong.

Attaching a termination callback to an already closed stream should
result in the callback being invoked.
The test now consists of checking that the stream is not closed when
initially constructed, and that once all its elements are consumed, the
closed thread has returned, and the stream in fact has been closed.
let b6 = Lwt_stream.(is_closed (of_string "123")) in
let b7 = Lwt_stream.(is_closed (from_direct (fun () -> Some 1))) in
let b8 = Lwt_stream.(is_closed (from_direct (fun () -> None))) in
return (b1 && b2 && b3 && b4 && b5 && b6 && not b7 && not b8));
Collaborator


One thing this no longer tests is that streams for which is_closed is false can be turned into streams for which it is true by reading them.

@aantron
Collaborator

aantron commented Jun 19, 2016

Apart from the nit above, looks good. Please squash (in part to avoid twiddling some of the lines in test_lwt_stream). If you like, I can do it, and post the squashed branch for you to review.

@seliopou
Contributor Author

I will not be spending any more time on this pull request.

@aantron aantron closed this Jun 21, 2016
@seliopou
Contributor Author

Childish.

@aantron
Collaborator

aantron commented Jun 27, 2016

Please either update your PR so that

  • the test is not testing less than it did before, as explained above,
  • blame history is not interrupted for the entire test that this PR needlessly erases and then restores – leave the actual modification,

or allow me, as suggested, to cherry-pick or make amendments, subject to your review – which I am quite willing to do.

I can/will make the suggested documentation edit.

Technical objections and discussion welcome.

@seliopou
Contributor Author

If I were revoking my contribution, I would have closed the PR myself with a note stating as much. As I already stated, I'm not going to commit any more effort to getting these merged. Merge if you wish, however you wish, as long as attribution is retained.

@aantron
Collaborator

aantron commented Jun 27, 2016

If I were revoking my contribution, I would have closed the PR myself with a note stating as much.

Thanks, that's very thoughtful.

I hope you don't think that I closed your PR because I thought you were revoking your contribution. I closed it because it is not suitable for merging as is. So, without your explicit permission to rewrite, and with you unwilling to amend, there was nothing further to do. I thought that was clear.

I am all the more careful about your permission, since you have expressed to me, on IRC in the past, that you dislike PR history being rewritten under any circumstances. So, thanks for making yourself clear in this instance.

Merge if you wish, however you wish, as long as attribution is retained.

I'll take a look later at whether I can amend this in such a way, that it remains easy to use with both blame and bisect, and still retain exact attribution. I will try to avoid it, but I may have to tweak a few lines in the is_closed test due to what the patch does to it – would you allow that?

For comparison, your commits, as they are now, would make several lines written by me look in blame as if they were written by you, though they haven't changed at all (and it wouldn't be easy to trace them back to their original insertion due to them being deleted and re-inserted). The authorship part of that doesn't bother me, and I hope you can offer a similar level of flexibility in this situation.

@aantron aantron modified the milestone: Lwt_stream Jun 29, 2016