Skip to content

Commit

Permalink
Implement TaskSeq.skipWhile, skipWhileAsync, skipWhileInclusive
Browse files Browse the repository at this point in the history
… and `skipWhileInclusiveAsync`
  • Loading branch information
abelbraaksma committed Dec 21, 2023
1 parent de928fb commit b9d62a6
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 13 deletions.
4 changes: 4 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ type TaskSeq private () =
static member takeWhileAsync predicate source = Internal.takeWhile Exclusive (PredicateAsync predicate) source
static member takeWhileInclusive predicate source = Internal.takeWhile Inclusive (Predicate predicate) source
static member takeWhileInclusiveAsync predicate source = Internal.takeWhile Inclusive (PredicateAsync predicate) source
static member skipWhile predicate source = Internal.takeWhile Exclusive (Predicate predicate) source
static member skipWhileAsync predicate source = Internal.takeWhile Exclusive (PredicateAsync predicate) source
static member skipWhileInclusive predicate source = Internal.takeWhile Inclusive (Predicate predicate) source
static member skipWhileInclusiveAsync predicate source = Internal.takeWhile Inclusive (PredicateAsync predicate) source

static member tryPick chooser source = Internal.tryPick (TryPick chooser) source
static member tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
Expand Down
66 changes: 61 additions & 5 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,10 @@ type TaskSeq =
static member takeWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a sequence that, when iterated, yields elements of the underlying sequence while the
/// Returns a task sequence that, when iterated, yields elements of the underlying sequence while the
/// given asynchronous function <paramref name="predicate" /> returns <see cref="true" />, and then returns no further elements.
/// The first element where the predicate returns <see cref="false" /> is not included in the resulting sequence
/// (see also <see cref="TaskSeq.takeWhileInclusive" />).
/// (see also <see cref="TaskSeq.takeWhileInclusiveAsync" />).
/// If <paramref name="predicate" /> is synchronous, consider using <see cref="TaskSeq.takeWhile" />.
/// </summary>
///
Expand All @@ -844,7 +844,7 @@ type TaskSeq =
static member takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a sequence that, when iterated, yields elements of the underlying sequence until the given
/// Returns a task sequence that, when iterated, yields elements of the underlying sequence until the given
/// function <paramref name="predicate" /> returns <see cref="false" />, returns that element
/// and then returns no further elements (see also <see cref="TaskSeq.takeWhile" />). This function returns
/// at least one element of a non-empty sequence, or the empty task sequence if the input is empty.
Expand All @@ -858,9 +858,9 @@ type TaskSeq =
static member takeWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a sequence that, when iterated, yields elements of the underlying sequence until the given
/// Returns a task sequence that, when iterated, yields elements of the underlying sequence until the given
/// asynchronous function <paramref name="predicate" /> returns <see cref="false" />, returns that element
/// and then returns no further elements (see also <see cref="TaskSeq.takeWhile" />). This function returns
/// and then returns no further elements (see also <see cref="TaskSeq.takeWhileAsync" />). This function returns
/// at least one element of a non-empty sequence, or the empty task sequence if the input is empty.
/// If <paramref name="predicate" /> is synchronous, consider using <see cref="TaskSeq.takeWhileInclusive" />.
/// </summary>
Expand All @@ -871,6 +871,62 @@ type TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, skips elements of the underlying sequence while the
/// given function <paramref name="predicate" /> returns <see cref="true" />, and then yields the remaining
/// elements. The first element where the predicate returns <see cref="false" /> is returned, which means that this
/// function will skip 0 or more elements (see also <see cref="TaskSeq.skipWhileInclusive" />).
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.skipWhileAsync" />.
/// </summary>
///
/// <param name="predicate">A function that evaluates to false when no more items should be skipped.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member skipWhile: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, skips elements of the underlying sequence while the
/// given asynchronous function <paramref name="predicate" /> returns <see cref="true" />, and then yields the
/// remaining elements. The first element where the predicate returns <see cref="false" /> is returned, which
/// means that this function will skip 0 or more elements (see also <see cref="TaskSeq.skipWhileInclusive" />).
/// If <paramref name="predicate" /> is synchronous, consider using <see cref="TaskSeq.skipWhile" />.
/// </summary>
///
/// <param name="predicate">An asynchronous function that evaluates to false when no more items should be skipped.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member skipWhileAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, skips elements of the underlying sequence until the given
/// function <paramref name="predicate" /> returns <see cref="false" />, also skips that element
/// and then yields the remaining elements (see also <see cref="TaskSeq.skipWhile" />). This function skips
/// at least one element of a non-empty sequence, or returns the empty task sequence if the input is empty.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.skipWhileInclusiveAsync" />.
/// </summary>`
///
/// <param name="predicate">A function that evaluates to false when no more items should be skipped.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member skipWhileInclusive: predicate: ('T -> bool) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Returns a task sequence that, when iterated, skips elements of the underlying sequence until the given
/// function <paramref name="predicate" /> returns <see cref="false" />, also skips that element
/// and then yields the remaining elements (see also <see cref="TaskSeq.skipWhileAsync" />). This function skips
/// at least one element of a non-empty sequence, or returns the empty task sequence if the input is empty.
/// If <paramref name="predicate" /> is synchronous, consider using <see cref="TaskSeq.skipWhileInclusive" />.
/// </summary>
///
/// <param name="predicate">An asynchronous function that evaluates to false when no more items should be skipped.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>The resulting task sequence.</returns>
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
static member skipWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: TaskSeq<'T> -> TaskSeq<'T>

/// <summary>
/// Applies the given function <paramref name="chooser" /> to successive elements, returning the first result where
/// the function returns <see cref="Some(x)" />.
Expand Down
84 changes: 76 additions & 8 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ type internal AsyncEnumStatus =

[<Struct>]
type internal WhileKind =
/// The item under test is included even if false
/// The item under test is included (or skipped) even if false
| Inclusive
/// The item under test is always excluded
/// The item under test is always excluded (or not skipped)
| Exclusive

[<Struct>]
Expand Down Expand Up @@ -731,55 +731,123 @@ module internal TaskSeqInternal =

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let! step = e.MoveNextAsync()
let mutable more = step
let! moveFirst = e.MoveNextAsync()
let mutable more = moveFirst

match whileKind, predicate with
| Exclusive, Predicate predicate ->
| Exclusive, Predicate predicate -> // takeWhile
while more do
let value = e.Current
more <- predicate value

if more then
// yield ONLY if predicate is true
yield value
let! ok = e.MoveNextAsync()
more <- ok

| Inclusive, Predicate predicate ->
| Inclusive, Predicate predicate -> // takeWhileInclusive
while more do
let value = e.Current
more <- predicate value

// yield, regardless of result of predicate
yield value

if more then
let! ok = e.MoveNextAsync()
more <- ok

| Exclusive, PredicateAsync predicate ->
| Exclusive, PredicateAsync predicate -> // takeWhileAsync
while more do
let value = e.Current
let! passed = predicate value
more <- passed

if more then
// yield ONLY if predicate is true
yield value
let! ok = e.MoveNextAsync()
more <- ok

| Inclusive, PredicateAsync predicate ->
| Inclusive, PredicateAsync predicate -> // takeWhileInclusiveAsync
while more do
let value = e.Current
let! passed = predicate value
more <- passed

// yield regardless of predicate
yield value

if more then
let! ok = e.MoveNextAsync()
more <- ok
}

let skipWhile whileKind predicate (source: TaskSeq<_>) =
checkNonNull (nameof source) source

taskSeq {
use e = source.GetAsyncEnumerator CancellationToken.None
let! moveFirst = e.MoveNextAsync()
let mutable more = moveFirst

match whileKind, predicate with
| Exclusive, Predicate predicate -> // skipWhile
while more && predicate e.Current do
let! ok = e.MoveNextAsync()
more <- ok

// yield the rest
if more then
while! e.MoveNextAsync() do
yield e.Current

| Inclusive, Predicate predicate -> // skipWhileInclusive
while more && predicate e.Current do
let! ok = e.MoveNextAsync()
more <- ok

if more then
// yield the last one where the predicate was false
yield e.Current

// yield the rest
while! e.MoveNextAsync() do
yield e.Current

| Exclusive, PredicateAsync predicate -> // skipWhileAsync
while more do
let! passed = predicate e.Current
more <- passed

if more then
let! ok = e.MoveNextAsync()
more <- ok

// yield the rest
if more then
while! e.MoveNextAsync() do
yield e.Current

| Inclusive, PredicateAsync predicate -> // skipWhileInclusiveAsync
while more do
let! passed = predicate e.Current
more <- passed

if more then
let! ok = e.MoveNextAsync()
more <- ok

if more then
// yield the last one where the predicate was false
yield e.Current

// yield the rest
while! e.MoveNextAsync() do
yield e.Current
}

// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
Expand Down

0 comments on commit b9d62a6

Please sign in to comment.