Skip to content

Commit

Permalink
Add TimerSequence type (#150)
Browse files Browse the repository at this point in the history
* Add TimerSequence type

* Fix DocC

* Improve wait until condition

* Fix typo

* Changes to TimerSequence based on MR feedback and Combine behavior

* remove unnecessary messages from test expectations

* remove print statement

* [timer-publisher] - Added a yield to waitUntil to fix a flake and to encourage other work to happen. This makes the sleeping imprecise, but that's likely fine for this kind of utility - TT

Co-authored-by: Tyler Thompson <[email protected]>

* [timer-publisher] - Test stability fixes - TT

Co-authored-by: Tyler Thompson <[email protected]>

* move waitUntilScheduled into a separate file

---------

Co-authored-by: Tyler Thompson <[email protected]>
Co-authored-by: Tyler Thompson <[email protected]>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent b15b005 commit 5b217b7
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 0 deletions.
94 changes: 94 additions & 0 deletions Sources/Afluent/SequenceOperators/TimerSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// TimerSequence.swift
// Afluent
//
// Created by Annalise Mariottini on 11/9/24.
//

import Foundation

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension AsyncSequences {
/// A sequence that repeatedly emits an instant on a given interval.
public struct TimerSequence<C: Clock>: AsyncSequence, Sendable {
public typealias Element = C.Instant

init(interval: C.Duration, tolerance: C.Duration?, clock: C) {
self.interval = interval
self.tolerance = tolerance
self.clock = clock
}

private let interval: C.Duration
private let tolerance: C.Duration?
private let clock: C

public struct AsyncIterator: AsyncIteratorProtocol {
private var cancellables: Set<AnyCancellable> = []

init(interval: C.Duration, tolerance: C.Duration?, clock: C) {
self.interval = interval
self.tolerance = tolerance
self.clock = clock
}

private let interval: C.Duration
private let tolerance: C.Duration?
private let clock: C
private var last: C.Instant?
private var finished = false

public mutating func next() async -> C.Instant? {
guard !finished else {
return nil
}
let next = (self.last ?? clock.now).advanced(by: self.interval)
do {
try await clock.sleep(until: next, tolerance: self.tolerance)
} catch {
self.finished = true
return nil
}
let now = clock.now
self.last = next
return now
}
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(interval: interval, tolerance: tolerance, clock: clock)
}
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
public typealias TimerSequence = AsyncSequences.TimerSequence

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension TimerSequence where C == ContinuousClock {
/// Returns a sequence that repeatedly emits an instant of a continuous clock on the given interval.
///
/// - Parameters:
/// - interval: The time interval on which to publish events. For example, a value of `.milliseconds(1)` will publish an event approximately every 0.01 seconds.
/// - tolerance: The allowed timing variance when emitting events. Defaults to `nil`, which will schedule with the default tolerance strategy.
public static func publish(every interval: C.Duration, tolerance: C.Duration? = nil)
-> AsyncSequences.TimerSequence<C>
{
AsyncSequences.TimerSequence(interval: interval, tolerance: tolerance, clock: .init())
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
extension TimerSequence {
/// Returns a sequence that repeatedly emits an instant of the passed clock on the given interval.
///
/// - Parameters:
/// - interval: The time interval on which to publish events. For example, a value of `.milliseconds(1)` will publish an event approximately every 0.01 seconds.
/// - tolerance: The allowed timing variance when emitting events. Defaults to `nil`, which will schedule with the default tolerance strategy.
/// - clock: The clock instance to utilize for sequence timing. For example, `ContinuousClock` or `SuspendingClock`.
public static func publish(every interval: C.Duration, tolerance: C.Duration? = nil, clock: C)
-> AsyncSequences.TimerSequence<C>
{
AsyncSequences.TimerSequence(interval: interval, tolerance: tolerance, clock: clock)
}
}
258 changes: 258 additions & 0 deletions Tests/AfluentTests/SequenceTests/TimerSequenceTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
//
// TimerSequenceTests.swift
// Afluent
//
// Created by Annalise Mariottini on 11/9/24.
//

@_spi(Experimental) import Afluent
import Atomics
import Clocks
import ConcurrencyExtras
import Foundation
import Testing

struct TimerSequenceTests {
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
@Test(arguments: 1...10)
func timerSequencePublishesOnInterval(expectedCount: Int) async throws {
let testClock = TestClock()
let testOutput = TestOutput()

let task = try await Task<Void, any Error>.waitUntilScheduled {
for try await output in TimerSequence.publish(
every: .milliseconds(10), clock: testClock)
{
await testOutput.append(output)
}
}

await testClock.advance(by: .milliseconds(10) * expectedCount)

try await wait(
until: await testOutput.output.count == expectedCount, timeout: .milliseconds(1))

let expectedOutput: [TestClock<Duration>.Instant] = Array(1...expectedCount).map {
.init(offset: .milliseconds(10) * $0)
}
let actualOutput = await testOutput.output
#expect(actualOutput == expectedOutput)

task.cancel()
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
@Test(arguments: 2...10)
func timerSequencePublishesOnInterval_withMultipleIterators(expectedCount: Int) async throws {
let testClock = TestClock()
let testOutput1 = TestOutput()
let testOutput2 = TestOutput()

let sequence = TimerSequence.publish(every: .milliseconds(10), clock: testClock)

let task1 = try await Task<Void, any Error>.waitUntilScheduled {
for try await output in sequence {
await testOutput1.append(output)
}
}

await testClock.advance(by: .milliseconds(10))
try await wait(until: await testOutput1.output.count == 1, timeout: .milliseconds(1))

let task2 = try await Task<Void, any Error>.waitUntilScheduled {
for try await output in sequence {
await testOutput2.append(output)
}
}

await testClock.advance(by: .milliseconds(10) * (expectedCount - 1))

try await wait(
until: await testOutput1.output.count == expectedCount, timeout: .milliseconds(1))

let expectedOutput1: [TestClock<Duration>.Instant] = Array(1...expectedCount).map {
.init(offset: .milliseconds(10) * $0)
}
// the second sequence subscribed after the 1st value was already published
let expectedOutput2: [TestClock<Duration>.Instant] = Array(2...expectedCount).map {
.init(offset: .milliseconds(10) * $0)
}
let actualOutput1 = await testOutput1.output
let actualOutput2 = await testOutput2.output
#expect(actualOutput1 == expectedOutput1)
#expect(actualOutput2 == expectedOutput2)

task1.cancel()
task2.cancel()
}

// For this test case, we want to test infrequent demand that occurs off of the interval of the publisher
// In this specific test, we have a publisher emitting every 0.01 seconds
// But we only await the next value every 0.015 seconds
// The first element emitted will be at 0.01, but every element after that will "skew" to the cadence of the demand
// E.g. [0.01, 0.025, 0.04, 0.055, 0.07, 0.085, ...]
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
@Test(arguments: [10])
func timerSequencePublishesOnInterval_whenNotContinuouslySubscribed(expectedCount: Int)
async throws
{
try await withMainSerialExecutor {
let testClock = TestClock()
let testOutput = TestOutput()

let skew = 1.5
let skewAmount = skew - 1.0

final class Wrapper<Iterator: AsyncIteratorProtocol>: @unchecked Sendable
where Iterator.Element: Sendable {
init(_ iterator: Iterator) {
self.iterator = iterator
}
var iterator: Iterator
var nextCalled = PassthroughSubject<Void>()

func next() async throws -> Iterator.Element? {
let task = Task {
try await iterator.next()
}
await Task.yield()
nextCalled.send()
return try await task.value
}
}

let wrappedIterator = Wrapper(
TimerSequence.publish(every: .milliseconds(10), clock: testClock)
.makeAsyncIterator())

async let nextCalled: Void? = try await wrappedIterator.nextCalled.first()
let task = try await Task<Void, any Error>.waitUntilScheduled {
for _ in 0..<expectedCount {
if let output = try await wrappedIterator.next() {
await testOutput.append(output)
await testClock.advance(by: .milliseconds(10) * skew)
}
}
}

try await nextCalled

await testClock.advance(by: .milliseconds(10) * skew)
try await wait(
until: await testOutput.output.count == expectedCount, timeout: .seconds(1))

let expectedOutput: [TestClock<Duration>.Instant] = Array(1...expectedCount)
.map {
switch $0 {
case 1:
return .init(offset: .milliseconds(10))
default:
let multiplier = Double($0) * skew
return .init(
offset: .milliseconds(10 * multiplier)
- .milliseconds(10 * skewAmount))
}
}
let actualOutput = await testOutput.output
#expect(actualOutput == expectedOutput)

task.cancel()
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
@Test(arguments: 1...10)
func timerSequencePublishesOnInterval_whenTimePassesBetweeIteratorCreationAndActualDemand(
expectedCount: Int
) async throws {
let testClock = TestClock()
let testOutput = TestOutput()

let sequence = TimerSequence.publish(every: .milliseconds(10), clock: testClock)

let initialWaitIntervals = 5
let clockAdvanced = SingleValueSubject<Void>()

let task = try await Task<Void, any Error>.waitUntilScheduled {
var iterator = sequence.makeAsyncIterator()
await testClock.advance(by: .milliseconds(10) * initialWaitIntervals)
try clockAdvanced.send()
while let output = await iterator.next() {
await testOutput.append(output)
}
}

try await clockAdvanced.execute()
// this wait should fail, since the time interval advance occurred _before_ next() was called
await #expect(throws: TimeoutError.timedOut) {
try await wait(
until: await testOutput.output.count == initialWaitIntervals,
timeout: .milliseconds(1))
}
// now we advance the clock to our expected count interval amount
await testClock.advance(by: .milliseconds(10) * expectedCount)
try await wait(
until: await testOutput.output.count == expectedCount, timeout: .milliseconds(1))

// since time advanced before demand, there will be some initial offset in the output
let expectedOutput: [TestClock<Duration>.Instant] = Array(1...expectedCount)
.map { $0 + initialWaitIntervals }
.map { .init(offset: .milliseconds(10) * $0) }
let actualOutput = await testOutput.output
#expect(actualOutput == expectedOutput)

task.cancel()
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *)
@Test func timerSequenceProperlyCancels() async throws {
let testClock = TestClock()
let testOutput = TestOutput()

let taskCancelledSubject = SingleValueSubject<Void>()

let task = try await Task<Void, any Error>.waitUntilScheduled {
var iterator = TimerSequence.publish(every: .milliseconds(10), clock: testClock)
.makeAsyncIterator()

let instant1 = try #require(await iterator.next())
await testOutput.append(instant1)

// this would throw, since we're cancelled
try? await taskCancelledSubject.execute()

let instant2 = await iterator.next()
#expect(instant2 == nil)
}

await testClock.advance(by: .milliseconds(10))
try await wait(until: await testOutput.output.count == 1, timeout: .seconds(1))
task.cancel()
try taskCancelledSubject.send()
try await task.value

let expectedOutput: [TestClock<Duration>.Instant] = [
.init(offset: .milliseconds(10))
]
let actualOutput = await testOutput.output
#expect(actualOutput == expectedOutput)
}
}

private actor TestOutput<Value> {
init(_ type: Value.Type) {
self.output = []
}

var output: [Value]

func append(_ instant: Value) {
self.output.append(instant)
}
}

extension TestOutput where Value == TestClock<Duration>.Instant {
init() {
self.init(Value.self)
}
}
33 changes: 33 additions & 0 deletions Tests/AfluentTests/WaitUntilCondition.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// WaitUntilCondition.swift
// Afluent
//
// Created by Annalise Mariottini on 11/9/24.
//

import Afluent

/// Waits for some condition before proceeding, unless the specified timeout is reached, in which case an error is thrown.
func wait(until condition: @autoclosure @escaping @Sendable () async -> Bool, timeout: Duration)
async throws
{
try await wait(until: await condition(), timeout: timeout, clock: ContinuousClock())
}

/// Waits for some condition before proceeding, unless the specified timeout is reached, in which case an error is thrown.
func wait<C: Clock>(
until condition: @autoclosure @escaping @Sendable () async -> Bool, timeout: C.Duration,
clock: C
) async throws {
let start = clock.now
let checkTimeout = {
if start.duration(to: clock.now) >= timeout {
throw TimeoutError.timedOut(duration: timeout)
}
}
while await condition() == false {
await Task.yield()
try checkTimeout()
try await clock.sleep(for: clock.minimumResolution)
}
}
Loading

0 comments on commit 5b217b7

Please sign in to comment.