Skip to content

Commit

Permalink
change close semantics for fixed-length sources
Browse files Browse the repository at this point in the history
Previously, the of_list, of_array, and of_string constructors each would
produce a stream s for which

  is_closed s = false

This is counter-intuitive as all the elements of the stream s have been
generated and should be consumable without blocking. This commit changes
the behavior of this constructors such that

  is_closed s = true

immediately upon creation.
  • Loading branch information
seliopou committed May 3, 2016
1 parent b9aed80 commit 47cd91f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
29 changes: 22 additions & 7 deletions src/core/lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,35 @@ let enqueue' e last =
let enqueue e s =
enqueue' e s.last

let feed_direct f last close =
let x = f () in
(* Push the element to the end of the queue. *)
enqueue' x last;
if x = None then Lwt.wakeup close ();
;;

(* Create a stream from a fixed-length direct source, e.g., lists, arrays, and
strings. This constructor will call the genterator function until
termination, producing a closed stream with all the elements it will provide
in the internal queue. *)
let from_fixed f =
let s = from_direct f in
while not (is_closed s) do
feed_direct f s.last s.close
done;
s

let of_list l =
let l = ref l in
from_direct
from_fixed
(fun () ->
match !l with
| [] -> None
| x :: l' -> l := l'; Some x)

let of_array a =
let len = Array.length a and i = ref 0 in
from_direct
from_fixed
(fun () ->
if !i = len then
None
Expand All @@ -192,7 +210,7 @@ let of_array a =

let of_string s =
let len = String.length s and i = ref 0 in
from_direct
from_fixed
(fun () ->
if !i = len then
None
Expand Down Expand Up @@ -389,10 +407,7 @@ let feed s =
Lwt.protected thread
end
| From_direct f ->
let x = f () in
(* Push the element to the end of the queue. *)
enqueue x s;
if x = None then Lwt.wakeup s.close ();
feed_direct f s.last s.close;
Lwt.return_unit
| Push push ->
push.push_waiting <- true;
Expand Down
28 changes: 19 additions & 9 deletions tests/core/test_lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,30 @@ let suite = suite "lwt_stream" [
Lwt_stream.to_list (Lwt_stream.map_exn stream) >>= fun l' ->
return (l = l'));

test "on_termination"

test "closed"
(fun () ->
let st = Lwt_stream.of_list [1; 2] in
let b1 = Lwt_stream.(is_closed (of_list [])) in
let b2 = Lwt_stream.(is_closed (of_list [1;2;3])) in
let b3 = Lwt_stream.(is_closed (of_array [||])) in
let b4 = Lwt_stream.(is_closed (of_array [|1;2;3;|])) in
let b5 = Lwt_stream.(is_closed (of_string "")) in
let b6 = Lwt_stream.(is_closed (of_string "123")) in
let b7 = Lwt_stream.(is_closed (from_direct (fun () -> Some 1))) in
let b8 = Lwt_stream.(is_closed (from_direct (fun () -> None))) in
let st = Lwt_stream.from_direct (
let value = ref (Some 1) in
fun () -> let r = !value in value := None; r)
in
let b = ref false in
Lwt_stream.on_termination st (fun () -> b := true);
ignore (Lwt_stream.peek st);
let b1 = !b = false in
ignore (Lwt_stream.junk st);
Lwt.async (fun () -> Lwt_stream.closed st >|= fun () -> b := true);
ignore (Lwt_stream.peek st);
let b2 = !b = false in
let b9 = !b = false in
ignore (Lwt_stream.junk st);
ignore (Lwt_stream.peek st);
let b3 = !b = true in
Lwt.return (b1 && b2 && b3));
let b10 = !b = true in
let b11 = Lwt_stream.is_closed st in
return (b1 && b2 && b3 && b4 && b5 && b6 && not b7 && not b8 && b9 && b10 && b11));

test "choose_exhausted"
(fun () ->
Expand Down

0 comments on commit 47cd91f

Please sign in to comment.