Skip to content

Commit

Permalink
Add implementation of TaskSeq.except/exceptOfSeq with tests and xml d…
Browse files Browse the repository at this point in the history
…ocumentation
  • Loading branch information
abelbraaksma committed Nov 9, 2022
1 parent c236b1a commit 47dedbd
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<Compile Include="TaskSeq.Delay.Tests.fs" />
<Compile Include="TaskSeq.Empty.Tests.fs" />
<Compile Include="TaskSeq.ExactlyOne.Tests.fs" />
<Compile Include="TaskSeq.Except.Tests.fs" />
<Compile Include="TaskSeq.Exists.Tests.fs" />
<Compile Include="TaskSeq.Filter.Tests.fs" />
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
Expand Down
141 changes: 141 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Except.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
module TaskSeq.Tests.Except

open System
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharp.Control

//
// TaskSeq.except
// TaskSeq.exceptOfSeq
//


module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.except (Gen.getEmptyVariant variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-exceptOfSeq`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.exceptOfSeq Seq.empty
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except v2`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.except TaskSeq.empty
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except v3`` variant =
TaskSeq.empty
|> TaskSeq.except (Gen.getEmptyVariant variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except no side effect in exclude seq if source seq is empty`` variant =
let mutable i = 0

let exclude = taskSeq {
i <- i + 1
yield 12
}

TaskSeq.empty
|> TaskSeq.except exclude
|> verifyEmpty
|> Task.map (fun () -> i |> should equal 0) // exclude seq is only enumerated after first item in source

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Fact>]
let ``TaskSeq-except removes duplicates with empty itemsToExcept`` () =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except TaskSeq.empty
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes everything`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
}
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Fact>]
let ``TaskSeq-exceptOfSeq removes duplicates with empty itemsToExcept`` () =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.exceptOfSeq Seq.empty
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes everything`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
}
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> verifyEmpty

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes everything`` variant =
Gen.getSeqWithSideEffect variant
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
}
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> verifyEmpty
25 changes: 2 additions & 23 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,6 @@ module TaskSeq =
e.DisposeAsync().AsTask().Wait()
}

// FIXME: incomplete and incorrect code!!! TODO: still needed?
let toSeqOfTasks (source: taskSeq<'T>) = seq {
let e = source.GetAsyncEnumerator(CancellationToken())

// TODO: check this!
try
let mutable go = false

while go do
yield task {
let! step = e.MoveNextAsync()
go <- step

if step then
return e.Current
else
return Unchecked.defaultof<_> // FIXME!
}

finally
e.DisposeAsync().AsTask().Wait()
}

let toArrayAsync source =
Internal.toResizeArrayAsync source
|> Task.map (fun a -> a.ToArray())
Expand Down Expand Up @@ -281,6 +258,8 @@ module TaskSeq =
let tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
let tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
let tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source
let except itemsToExclude source = Internal.except itemsToExclude source
let exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source

let exists predicate source =
Internal.tryFind (Predicate predicate) source
Expand Down
38 changes: 38 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,44 @@ module TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
val existsAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<bool>

/// <summary>
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
/// </summary>
///
/// <remarks>
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
/// sequence.
/// </remarks>
///
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
///
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
val except<'T when 'T: equality> : itemsToExclude: taskSeq<'T> -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
/// </summary>
///
/// <remarks>
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
/// sequence.
/// </remarks>
///
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
///
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
/// if the sequences are or unequal length.
Expand Down
100 changes: 100 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,103 @@ module internal TaskSeqInternal =
| true -> yield item
| false -> ()
}
// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
let _rwLock = new ReaderWriterLockSlim()
let hashSet = HashSet<'T>(Array.empty, HashIdentity.Structural)

member _.Add item =
_rwLock.EnterWriteLock()

try
hashSet.Add item
finally
_rwLock.ExitWriteLock()

member _.AddMany items =
_rwLock.EnterWriteLock()

try
for item in items do
hashSet.Add item |> ignore

finally
_rwLock.ExitWriteLock()

member _.AddManyAsync(source: taskSeq<'T>) = task {
use e = source.GetAsyncEnumerator(ct)
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

while go do
// NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead?
// or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16
// not sure how they compare.

_rwLock.EnterWriteLock()

try
hashSet.Add e.Current |> ignore
finally
_rwLock.ExitWriteLock()

let! step = e.MoveNextAsync()
go <- step
}

interface IAsyncDisposable with
override _.DisposeAsync() =
if not (isNull _rwLock) then
_rwLock.Dispose()

ValueTask.CompletedTask

let except itemsToExclude (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
do! hashSet.AddManyAsync itemsToExclude

while go do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}

let exceptOfSeq itemsToExclude (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
do hashSet.AddMany itemsToExclude

while go do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}

0 comments on commit 47dedbd

Please sign in to comment.