From 4f79c0ed9354f5c80ed1f8680b49ce77d78769f2 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 11 Dec 2022 10:06:29 +0000 Subject: [PATCH] Add takeWhile, takeWhileInclusive --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 4 ++ src/FSharp.Control.TaskSeq/TaskSeq.fsi | 30 +++++++++++ src/FSharp.Control.TaskSeq/TaskSeqInternal.fs | 51 +++++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 05f5313c..1643ac52 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -253,6 +253,10 @@ module TaskSeq = let chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source let filter predicate source = Internal.filter (Predicate predicate) source let filterAsync predicate source = Internal.filter (PredicateAsync predicate) source + let takeWhile predicate source = Internal.takeWhile (Predicate predicate) source + let takeWhileAsync predicate source = Internal.takeWhile (PredicateAsync predicate) source + let takeWhile predicate source = Internal.takeWhileInclusive (Predicate predicate) source + let takeWhileAsync predicate source = Internal.takeWhileInclusive (PredicateAsync predicate) source let tryPick chooser source = Internal.tryPick (TryPick chooser) source let tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source let tryFind predicate source = Internal.tryFind (Predicate predicate) source diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 1f0f1497..0a4cfc59 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -365,6 +365,36 @@ module TaskSeq = /// val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T> + /// + /// Yields items from the source while the function returns . + /// The first result concludes consumption of the source. + /// If is asynchronous, consider using . + /// + val takeWhile: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T> + + /// + /// Yields items from the source while the asynchronous function returns . + /// The first result concludes consumption of the source. + /// If does not need to be asynchronous, consider using . + /// + val takeWhileAsync: predicate: ('T -> #Task) -> source: taskSeq<'T> -> taskSeq<'T> + + /// + /// Yields items from the source while the function returns . + /// The first result concludes consumption of the source, but is included in the result. + /// If is asynchronous, consider using . + /// If the final item is not desired, consider using . + /// + val takeWhileInclusive: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T> + + /// + /// Yields items from the source while the asynchronous function returns . + /// The first result concludes consumption of the source, but is included in the result. + /// If does not need to be asynchronous, consider using . + /// If the final item is not desired, consider using . + /// + val takeWhileInclusiveAsync: predicate: ('T -> #Task) -> source: taskSeq<'T> -> taskSeq<'T> + /// /// Returns a new collection containing only the elements of the collection /// for which the given asynchronous function returns . diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index 8f7446ef..62f2884a 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -531,6 +531,57 @@ module internal TaskSeqInternal = | true -> yield item | false -> () } + + let takeWhile predicate (source: taskSeq<_>) = taskSeq { + use e = source.GetAsyncEnumerator(CancellationToken()) + let! step = e.MoveNextAsync() + let mutable go = step + + match predicate with + | Predicate predicate -> + while go do + let value = e.Current + if predicate value then + yield value + let! more = e.MoveNextAsync() + go <- more + else go <- false + | PredicateAsync predicate -> + while go do + let value = e.Current + match! predicate value with + | true -> + yield value + let! more = e.MoveNextAsync() + go <- more + | false -> go <- false + } + + let takeWhileInclusive predicate (source: taskSeq<_>) = taskSeq { + use e = source.GetAsyncEnumerator(CancellationToken()) + let! step = e.MoveNextAsync() + let mutable go = step + + match predicate with + | Predicate predicate -> + while go do + let value = e.Current + yield value + if predicate value then + let! more = e.MoveNextAsync() + go <- more + else go <- false + | PredicateAsync predicate -> + while go do + let value = e.Current + yield value + match! predicate value with + | true -> + let! more = e.MoveNextAsync() + go <- more + | false -> go <- false + } + // Consider turning using an F# version of this instead? // https://github.com/i3arnon/ConcurrentHashSet type ConcurrentHashSet<'T when 'T: equality>(ct) =