diff --git a/Sources/Afluent/SequenceOperators/TimerSequence.swift b/Sources/Afluent/SequenceOperators/TimerSequence.swift new file mode 100644 index 000000000..be96a76e2 --- /dev/null +++ b/Sources/Afluent/SequenceOperators/TimerSequence.swift @@ -0,0 +1,94 @@ +// +// TimerSequence.swift +// Afluent +// +// Created by Annalise Mariottini on 11/9/24. +// + +import Foundation + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) +extension AsyncSequences { + /// A sequence that repeatedly emits an instant on a given interval. + public struct TimerSequence: AsyncSequence, Sendable { + public typealias Element = C.Instant + + init(interval: C.Duration, tolerance: C.Duration?, clock: C) { + self.interval = interval + self.tolerance = tolerance + self.clock = clock + } + + private let interval: C.Duration + private let tolerance: C.Duration? + private let clock: C + + public struct AsyncIterator: AsyncIteratorProtocol { + private var cancellables: Set = [] + + init(interval: C.Duration, tolerance: C.Duration?, clock: C) { + self.interval = interval + self.tolerance = tolerance + self.clock = clock + } + + private let interval: C.Duration + private let tolerance: C.Duration? + private let clock: C + private var last: C.Instant? + private var finished = false + + public mutating func next() async -> C.Instant? { + guard !finished else { + return nil + } + let next = (self.last ?? clock.now).advanced(by: self.interval) + do { + try await clock.sleep(until: next, tolerance: self.tolerance) + } catch { + self.finished = true + return nil + } + let now = clock.now + self.last = next + return now + } + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(interval: interval, tolerance: tolerance, clock: clock) + } + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) +public typealias TimerSequence = AsyncSequences.TimerSequence + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) +extension TimerSequence where C == ContinuousClock { + /// Returns a sequence that repeatedly emits an instant of a continuous clock on the given interval. + /// + /// - Parameters: + /// - interval: The time interval on which to publish events. For example, a value of `.milliseconds(1)` will publish an event approximately every 0.01 seconds. + /// - tolerance: The allowed timing variance when emitting events. Defaults to `nil`, which will schedule with the default tolerance strategy. + public static func publish(every interval: C.Duration, tolerance: C.Duration? = nil) + -> AsyncSequences.TimerSequence + { + AsyncSequences.TimerSequence(interval: interval, tolerance: tolerance, clock: .init()) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) +extension TimerSequence { + /// Returns a sequence that repeatedly emits an instant of the passed clock on the given interval. + /// + /// - Parameters: + /// - interval: The time interval on which to publish events. For example, a value of `.milliseconds(1)` will publish an event approximately every 0.01 seconds. + /// - tolerance: The allowed timing variance when emitting events. Defaults to `nil`, which will schedule with the default tolerance strategy. + /// - clock: The clock instance to utilize for sequence timing. For example, `ContinuousClock` or `SuspendingClock`. + public static func publish(every interval: C.Duration, tolerance: C.Duration? = nil, clock: C) + -> AsyncSequences.TimerSequence + { + AsyncSequences.TimerSequence(interval: interval, tolerance: tolerance, clock: clock) + } +} diff --git a/Tests/AfluentTests/SequenceTests/TimerSequenceTests.swift b/Tests/AfluentTests/SequenceTests/TimerSequenceTests.swift new file mode 100644 index 000000000..294596538 --- /dev/null +++ b/Tests/AfluentTests/SequenceTests/TimerSequenceTests.swift @@ -0,0 +1,258 @@ +// +// TimerSequenceTests.swift +// Afluent +// +// Created by Annalise Mariottini on 11/9/24. +// + +@_spi(Experimental) import Afluent +import Atomics +import Clocks +import ConcurrencyExtras +import Foundation +import Testing + +struct TimerSequenceTests { + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) + @Test(arguments: 1...10) + func timerSequencePublishesOnInterval(expectedCount: Int) async throws { + let testClock = TestClock() + let testOutput = TestOutput() + + let task = try await Task.waitUntilScheduled { + for try await output in TimerSequence.publish( + every: .milliseconds(10), clock: testClock) + { + await testOutput.append(output) + } + } + + await testClock.advance(by: .milliseconds(10) * expectedCount) + + try await wait( + until: await testOutput.output.count == expectedCount, timeout: .milliseconds(1)) + + let expectedOutput: [TestClock.Instant] = Array(1...expectedCount).map { + .init(offset: .milliseconds(10) * $0) + } + let actualOutput = await testOutput.output + #expect(actualOutput == expectedOutput) + + task.cancel() + } + + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) + @Test(arguments: 2...10) + func timerSequencePublishesOnInterval_withMultipleIterators(expectedCount: Int) async throws { + let testClock = TestClock() + let testOutput1 = TestOutput() + let testOutput2 = TestOutput() + + let sequence = TimerSequence.publish(every: .milliseconds(10), clock: testClock) + + let task1 = try await Task.waitUntilScheduled { + for try await output in sequence { + await testOutput1.append(output) + } + } + + await testClock.advance(by: .milliseconds(10)) + try await wait(until: await testOutput1.output.count == 1, timeout: .milliseconds(1)) + + let task2 = try await Task.waitUntilScheduled { + for try await output in sequence { + await testOutput2.append(output) + } + } + + await testClock.advance(by: .milliseconds(10) * (expectedCount - 1)) + + try await wait( + until: await testOutput1.output.count == expectedCount, timeout: .milliseconds(1)) + + let expectedOutput1: [TestClock.Instant] = Array(1...expectedCount).map { + .init(offset: .milliseconds(10) * $0) + } + // the second sequence subscribed after the 1st value was already published + let expectedOutput2: [TestClock.Instant] = Array(2...expectedCount).map { + .init(offset: .milliseconds(10) * $0) + } + let actualOutput1 = await testOutput1.output + let actualOutput2 = await testOutput2.output + #expect(actualOutput1 == expectedOutput1) + #expect(actualOutput2 == expectedOutput2) + + task1.cancel() + task2.cancel() + } + + // For this test case, we want to test infrequent demand that occurs off of the interval of the publisher + // In this specific test, we have a publisher emitting every 0.01 seconds + // But we only await the next value every 0.015 seconds + // The first element emitted will be at 0.01, but every element after that will "skew" to the cadence of the demand + // E.g. [0.01, 0.025, 0.04, 0.055, 0.07, 0.085, ...] + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) + @Test(arguments: [10]) + func timerSequencePublishesOnInterval_whenNotContinuouslySubscribed(expectedCount: Int) + async throws + { + try await withMainSerialExecutor { + let testClock = TestClock() + let testOutput = TestOutput() + + let skew = 1.5 + let skewAmount = skew - 1.0 + + final class Wrapper: @unchecked Sendable + where Iterator.Element: Sendable { + init(_ iterator: Iterator) { + self.iterator = iterator + } + var iterator: Iterator + var nextCalled = PassthroughSubject() + + func next() async throws -> Iterator.Element? { + let task = Task { + try await iterator.next() + } + await Task.yield() + nextCalled.send() + return try await task.value + } + } + + let wrappedIterator = Wrapper( + TimerSequence.publish(every: .milliseconds(10), clock: testClock) + .makeAsyncIterator()) + + async let nextCalled: Void? = try await wrappedIterator.nextCalled.first() + let task = try await Task.waitUntilScheduled { + for _ in 0...Instant] = Array(1...expectedCount) + .map { + switch $0 { + case 1: + return .init(offset: .milliseconds(10)) + default: + let multiplier = Double($0) * skew + return .init( + offset: .milliseconds(10 * multiplier) + - .milliseconds(10 * skewAmount)) + } + } + let actualOutput = await testOutput.output + #expect(actualOutput == expectedOutput) + + task.cancel() + } + } + + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) + @Test(arguments: 1...10) + func timerSequencePublishesOnInterval_whenTimePassesBetweeIteratorCreationAndActualDemand( + expectedCount: Int + ) async throws { + let testClock = TestClock() + let testOutput = TestOutput() + + let sequence = TimerSequence.publish(every: .milliseconds(10), clock: testClock) + + let initialWaitIntervals = 5 + let clockAdvanced = SingleValueSubject() + + let task = try await Task.waitUntilScheduled { + var iterator = sequence.makeAsyncIterator() + await testClock.advance(by: .milliseconds(10) * initialWaitIntervals) + try clockAdvanced.send() + while let output = await iterator.next() { + await testOutput.append(output) + } + } + + try await clockAdvanced.execute() + // this wait should fail, since the time interval advance occurred _before_ next() was called + await #expect(throws: TimeoutError.timedOut) { + try await wait( + until: await testOutput.output.count == initialWaitIntervals, + timeout: .milliseconds(1)) + } + // now we advance the clock to our expected count interval amount + await testClock.advance(by: .milliseconds(10) * expectedCount) + try await wait( + until: await testOutput.output.count == expectedCount, timeout: .milliseconds(1)) + + // since time advanced before demand, there will be some initial offset in the output + let expectedOutput: [TestClock.Instant] = Array(1...expectedCount) + .map { $0 + initialWaitIntervals } + .map { .init(offset: .milliseconds(10) * $0) } + let actualOutput = await testOutput.output + #expect(actualOutput == expectedOutput) + + task.cancel() + } + + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) + @Test func timerSequenceProperlyCancels() async throws { + let testClock = TestClock() + let testOutput = TestOutput() + + let taskCancelledSubject = SingleValueSubject() + + let task = try await Task.waitUntilScheduled { + var iterator = TimerSequence.publish(every: .milliseconds(10), clock: testClock) + .makeAsyncIterator() + + let instant1 = try #require(await iterator.next()) + await testOutput.append(instant1) + + // this would throw, since we're cancelled + try? await taskCancelledSubject.execute() + + let instant2 = await iterator.next() + #expect(instant2 == nil) + } + + await testClock.advance(by: .milliseconds(10)) + try await wait(until: await testOutput.output.count == 1, timeout: .seconds(1)) + task.cancel() + try taskCancelledSubject.send() + try await task.value + + let expectedOutput: [TestClock.Instant] = [ + .init(offset: .milliseconds(10)) + ] + let actualOutput = await testOutput.output + #expect(actualOutput == expectedOutput) + } +} + +private actor TestOutput { + init(_ type: Value.Type) { + self.output = [] + } + + var output: [Value] + + func append(_ instant: Value) { + self.output.append(instant) + } +} + +extension TestOutput where Value == TestClock.Instant { + init() { + self.init(Value.self) + } +} diff --git a/Tests/AfluentTests/WaitUntilCondition.swift b/Tests/AfluentTests/WaitUntilCondition.swift new file mode 100644 index 000000000..aa2268002 --- /dev/null +++ b/Tests/AfluentTests/WaitUntilCondition.swift @@ -0,0 +1,33 @@ +// +// WaitUntilCondition.swift +// Afluent +// +// Created by Annalise Mariottini on 11/9/24. +// + +import Afluent + +/// Waits for some condition before proceeding, unless the specified timeout is reached, in which case an error is thrown. +func wait(until condition: @autoclosure @escaping @Sendable () async -> Bool, timeout: Duration) + async throws +{ + try await wait(until: await condition(), timeout: timeout, clock: ContinuousClock()) +} + +/// Waits for some condition before proceeding, unless the specified timeout is reached, in which case an error is thrown. +func wait( + until condition: @autoclosure @escaping @Sendable () async -> Bool, timeout: C.Duration, + clock: C +) async throws { + let start = clock.now + let checkTimeout = { + if start.duration(to: clock.now) >= timeout { + throw TimeoutError.timedOut(duration: timeout) + } + } + while await condition() == false { + await Task.yield() + try checkTimeout() + try await clock.sleep(for: clock.minimumResolution) + } +} diff --git a/Tests/AfluentTests/WaitUntilScheduled.swift b/Tests/AfluentTests/WaitUntilScheduled.swift new file mode 100644 index 000000000..2404c5a69 --- /dev/null +++ b/Tests/AfluentTests/WaitUntilScheduled.swift @@ -0,0 +1,23 @@ +// +// WaitUntilScheduled.swift +// Afluent +// +// Created by Annalise Mariottini on 11/11/24. +// + +import Afluent + +extension Task where Failure == Error { + /// Spawns a new Task to run some async operation and waits for that task to begin execution before proceeding. + static func waitUntilScheduled( + operation: sending @escaping @isolated(any) () async throws -> Success + ) async throws -> Self { + let sub = SingleValueSubject() + let task = Task { + try? sub.send() + return try await operation() + } + try await sub.execute() + return task + } +}