Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_closed and closed operations to Lwt_stream #223

Closed
wants to merge 10 commits into from
67 changes: 33 additions & 34 deletions src/core/lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type 'a source =
type 'a t = {
source : 'a source;
(* The source of the stream. *)
closed : unit Lwt.t;
(* A thread that sleeps until the stream is closed. *)
mutable node : 'a node;
(* Pointer to first pending element, or to [last] if there is no
pending element. *)
Expand Down Expand Up @@ -130,28 +132,31 @@ let clone s =
| _ -> ());
{
source = s.source;
closed = s.closed;
node = s.node;
last = s.last;
hooks = s.hooks;
}

let from f =
let from_source source =
let last = new_node () in
{
source = From { from_create = f; from_thread = Lwt.return_unit };
node = last;
last = ref last;
hooks = ref [];
let closed, closed_w = Lwt.wait () in
let mark_closed () =
if not (Lwt.is_sleeping closed) then Lwt.wakeup closed_w () in
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be clearer to explicitly wakeup the closed thread at the end of the stream rather than rely on the execution of this hook. Plus with the current code, I think you get Lwt_stream.is_closed stream = false inside on_termination callbacks given that this hook is executed last. Maybe that was the indented behavior? In any case it'd be good to have tests for this in tests/core/test_lwt_stream.ml.

If you want to avoid storing both the Lwt.t and the Lwt.u inside the Lst_stream.t, you can store only the latter and define closed as follow, which is a no-op:

let closed s = Lwt.waiter_of_wakener s.closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How I respond to this comment depends on how—if at all—the semantics of on_termination will change in this pull request, so I'll hold off for now.

{ source = source
; closed = closed
; node = last
; last = ref last
; hooks = ref [mark_closed]
}

let from f =
from_source (From { from_create = f; from_thread = Lwt.return_unit })

let from_direct f =
let last = new_node () in
{
source = From_direct f;
node = last;
last = ref last;
hooks = ref [];
}
let t = from_source (From_direct f) in
List.iter (fun f -> f ()) !(t.hooks);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this line, why do you run the termination hooks straight after creating the stream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of a direct source, the stream is terminated upon creation. While the user doesn't have an opportunity to register any termination callbacks, the module itself may have via the from_source constructor. And indeed it currently does do just that to handle the is_closed functionality that this pull request adds (your comment above).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reading the code through again and realized that this line should indeed not be there.

t

let on_termination s f =
s.hooks := f :: !(s.hooks)
Expand Down Expand Up @@ -191,8 +196,6 @@ let of_string s =
end)

let create_with_reference () =
(* Create the cell pointing to the end of the queue. *)
let last = ref (new_node ()) in
(* Create the source for notifications of new elements. *)
let source, wakener_cell =
let waiter, wakener = Lwt.wait () in
Expand All @@ -201,13 +204,14 @@ let create_with_reference () =
push_external = Obj.repr () },
ref wakener)
in
(* Set to [true] when the end-of-stream is sent. *)
let closed = ref false in
let hooks = ref [] in
let t = from_source (Push source) in
(* [push] should not close over [t] so that it can be garbage collected even
* there are still references to [push]. Unpack all the components of [t]
* that [push] needs and reference those identifiers instead. *)
let closed = t.closed and last = t.last and hooks = t.hooks in
(* The push function. It does not keep a reference to the stream. *)
let push x =
if !closed then raise Closed;
if x = None then closed := true;
if not (Lwt.is_sleeping closed) then raise Closed;
(* Push the element at the end of the queue. *)
let node = !last and new_last = new_node () in
node.data <- x;
Expand All @@ -229,12 +233,7 @@ let create_with_reference () =
exception. *)
if x = None then List.iter (fun f -> f ()) !hooks
in
({ source = Push source;
node = !last;
last = last;
hooks = hooks },
push,
fun x -> source.push_external <- Obj.repr x)
(t, push, fun x -> source.push_external <- Obj.repr x)

let create () =
let source, push, _ = create_with_reference () in
Expand Down Expand Up @@ -352,9 +351,6 @@ end

let create_bounded size =
if size < 0 then invalid_arg "Lwt_stream.create_bounded";
(* Create the cell pointing to the end of the queue. *)
let last = ref (new_node ()) in
let hooks = ref [] in
(* Create the source for notifications of new elements. *)
let info, wakener_cell =
let waiter, wakener = Lwt.wait () in
Expand All @@ -369,11 +365,8 @@ let create_bounded size =
pushb_external = Obj.repr () },
ref wakener)
in
({ source = Push_bounded info;
node = !last;
last = last;
hooks = hooks },
new bounded_push_impl info wakener_cell last hooks)
let t = from_source (Push_bounded info) in
(t, new bounded_push_impl info wakener_cell t.last t.hooks)

(* Wait for a new element to be added to the queue of pending element
of the stream. *)
Expand Down Expand Up @@ -736,6 +729,12 @@ let rec is_empty s =
else
Lwt.return (s.node.data = None)

let is_closed s =
not (Lwt.is_sleeping s.closed)

let closed s =
s.closed

let map f s =
from (fun () -> get s >|= function
| Some x ->
Expand Down
16 changes: 12 additions & 4 deletions src/core/lwt_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,30 @@ val junk_old : 'a t -> unit Lwt.t

val get_available : 'a t -> 'a list
(** [get_available st] returns all available elements of [l] without
blocking *)
blocking. *)

val get_available_up_to : int -> 'a t -> 'a list
(** [get_available_up_to n st] returns up to [n] elements of [l]
without blocking *)
without blocking. *)

val is_empty : 'a t -> bool Lwt.t
(** [is_empty st] returns wether the given stream is empty *)
(** [is_empty st] returns whether the given stream is empty. *)

val is_closed : 'a t -> bool
(** [is_closed st] returns whether the given stream has been closed. Even if
the stream is closed, it still may contain elements waiting to be read. *)

val closed : 'a t -> unit Lwt.t
(** [closed st] returns a thread that will sleep until the stream has been
closed. *)

val on_termination : 'a t -> (unit -> unit) -> unit
(** [on_termination st f] executes [f] when the end of the stream [st]
is reached. Note that the stream may still contains elements if
{!peek} or similar was used. *)

val on_terminate : 'a t -> (unit -> unit) -> unit
(* Deprecated, use [on_termination] *)
(* Deprecated, use [on_termination]. *)

(** {2 Stream transversal} *)

Expand Down