From 80c8d92840e7b7049d4786873a6decdf34a6f292 Mon Sep 17 00:00:00 2001 From: JP Simard Date: Fri, 29 Nov 2024 10:39:47 -0500 Subject: [PATCH] Fix DemandBuffer Data Races We're still seeing crashes due to what look like data races in `DemandBuffer`. Here, I'm making fixes based on what appears to have helped in https://github.com/pointfreeco/swift-composable-architecture/pull/3447: In `buffer(value:)`: * Check if the demand is unlimited and, if so, unlock before calling `subscriber.receive(value)`. * If the demand is not unlimited, append the value to the buffer and call `flush()` after unlocking. In `flush(adding:)`: * Process values in a loop, acquiring and releasing the lock around shared state access. * Unlock before calling `subscriber.receive(value)` to avoid holding the lock during subscriber calls. * After sending a value, lock again to update `demandState.requested` with any additional demand returned by the subscriber. * Ensure that `subscriber.receive(completion:)` is called outside the lock. Also add a new `DemandBufferTests` test file. I added some tests that I was hoping would fail without the above changes, but they all pass both before and after, even with ThreadSanitizer enabled. --- .../Internal/Create.swift | 67 ++++--- .../DemandBufferTests.swift | 170 ++++++++++++++++++ 2 files changed, 209 insertions(+), 28 deletions(-) create mode 100644 Tests/ComposableArchitectureTests/DemandBufferTests.swift diff --git a/Sources/ComposableArchitecture/Internal/Create.swift b/Sources/ComposableArchitecture/Internal/Create.swift index 4b47b677f866..7350a8570efd 100644 --- a/Sources/ComposableArchitecture/Internal/Create.swift +++ b/Sources/ComposableArchitecture/Internal/Create.swift @@ -36,28 +36,25 @@ final class DemandBuffer: @unchecked Sendable { func buffer(value: S.Input) -> Subscribers.Demand { lock.lock() - defer { lock.unlock() } - precondition( - self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️") + self.completion == nil, "How could a completed publisher send values?! Beats me 🤷‍♂️") - switch demandState.requested { - case .unlimited: + if demandState.requested == .unlimited { + lock.unlock() return subscriber.receive(value) - default: + } else { buffer.append(value) + lock.unlock() return flush() } } func complete(completion: Subscribers.Completion) { lock.lock() - defer { lock.unlock() } - precondition( - self.completion == nil, "Completion have already occurred, which is quite awkward 🥺") - + self.completion == nil, "Completion has already occurred, which is quite awkward 🥺") self.completion = completion + lock.unlock() _ = flush() } @@ -66,39 +63,53 @@ final class DemandBuffer: @unchecked Sendable { } private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand { - self.lock.sync { + var sentDemand = Subscribers.Demand.none + var completionToSend: Subscribers.Completion? - if let newDemand = newDemand { - demandState.requested += newDemand - } + lock.lock() + if let newDemand = newDemand { + demandState.requested += newDemand + } + lock.unlock() - // If buffer isn't ready for flushing, return immediately - guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none } + var loop = true + while loop { + var valueToSend: S.Input? - while !buffer.isEmpty && demandState.processed < demandState.requested { - demandState.requested += subscriber.receive(buffer.remove(at: 0)) + lock.lock() + if !buffer.isEmpty && demandState.processed < demandState.requested { + valueToSend = buffer.remove(at: 0) demandState.processed += 1 - } - - if let completion = completion { - // Completion event was already sent + sentDemand += 1 + } else if let completion = completion { buffer = [] demandState = .init() self.completion = nil - subscriber.receive(completion: completion) - return .none + completionToSend = completion + loop = false + } else { + loop = false + } + lock.unlock() + + if let value = valueToSend { + let additionalDemand = subscriber.receive(value) + lock.lock() + demandState.requested += additionalDemand + lock.unlock() } + } - let sentDemand = demandState.requested - demandState.sent - demandState.sent += sentDemand - return sentDemand + if let completion = completionToSend { + subscriber.receive(completion: completion) } + + return sentDemand } struct Demand { var processed: Subscribers.Demand = .none var requested: Subscribers.Demand = .none - var sent: Subscribers.Demand = .none } } diff --git a/Tests/ComposableArchitectureTests/DemandBufferTests.swift b/Tests/ComposableArchitectureTests/DemandBufferTests.swift new file mode 100644 index 000000000000..adfcdbc83248 --- /dev/null +++ b/Tests/ComposableArchitectureTests/DemandBufferTests.swift @@ -0,0 +1,170 @@ +#if DEBUG + @preconcurrency import Combine + @testable @preconcurrency import ComposableArchitecture + import XCTest + + final class DemandBufferTests: BaseTCATestCase { + func testConcurrentSend() async throws { + let values = LockIsolated>([]) + + let effect = AnyPublisher.create { subscriber in + Task.detached { @Sendable in + for index in 0...1_000 { + subscriber.send(index) + } + subscriber.send(completion: .finished) + } + return AnyCancellable {} + } + + let cancellable = effect.sink { value in + values.withValue { + _ = $0.insert(value) + } + } + + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + + XCTAssertEqual(values.value, Set(0...1_000)) + + _ = cancellable + } + + func testConcurrentDemandAndSend() async throws { + let values = LockIsolated>([]) + let subscriberLock = LockIsolated(()) + + let effectSubscriber = LockIsolated.Subscriber?>(nil) + let effect = AnyPublisher.create { subscriber in + effectSubscriber.setValue(subscriber) + return AnyCancellable {} + } + + let cancellable = effect.sink { value in + values.withValue { + _ = $0.insert(value) + } + } + + await withTaskGroup(of: Void.self) { group in + for index in 0..<1_000 { + group.addTask { @Sendable in + effectSubscriber.value?.send(index) + } + group.addTask { @Sendable in + subscriberLock.withValue { _ in + _ = (effectSubscriber.value as? any Subscription)?.request(.max(1)) + } + } + } + } + + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + + XCTAssertEqual(values.value, Set(0..<1_000)) + + _ = cancellable + } + + func testReentrantSubscriber() async throws { + let values = LockIsolated>([]) + let effectSubscriber = LockIsolated.Subscriber?>(nil) + + let effect = AnyPublisher.create { subscriber in + effectSubscriber.setValue(subscriber) + return AnyCancellable {} + } + + let cancellable = effect.sink { value in + values.withValue { + _ = $0.insert(value) + } + if value < 1_000 { + Task { @MainActor in + effectSubscriber.value?.send(value + 1_000) + } + } + } + + Task.detached { @Sendable in + for index in 0..<1_000 { + effectSubscriber.value?.send(index) + } + } + + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + + XCTAssertEqual(values.value, Set(0..<2_000)) + + _ = cancellable + } + + func testNoDeadlockOnReentrantSend() { + let values = LockIsolated>([]) + let expectation = XCTestExpectation(description: "Test should not deadlock") + + let effectSubscriber = LockIsolated.Subscriber?>(nil) + let effect = AnyPublisher.create { subscriber in + effectSubscriber.withValue { $0 = subscriber } + return AnyCancellable {} + } + + let cancellable = effect.sink { value in + values.withValue { + _ = $0.insert(value) + } + // Prevent infinite recursion by limiting re-entrant calls + if value == 0 { + effectSubscriber.withValue { $0?.send(value + 1_000) } + } else { + expectation.fulfill() + } + } + + // Ensure that 'effectSubscriber' is set before we use it + effectSubscriber.withValue { subscriber in + XCTAssertNotNil(subscriber) + subscriber?.send(0) + } + + // Wait for the test to complete + wait(for: [expectation], timeout: 1.0) + + XCTAssertEqual(values.value, Set([0, 1_000])) + + _ = cancellable + } + + func testConcurrentSendAndCompletion() { + let values = LockIsolated>([]) + let expectation = XCTestExpectation(description: "All values received") + + let effect = AnyPublisher.create { subscriber in + // Concurrently send values + DispatchQueue.concurrentPerform(iterations: 1000) { index in + subscriber.send(index) + } + // Send completion + DispatchQueue.global().async { + subscriber.send(completion: .finished) + } + return AnyCancellable {} + } + + let cancellable = effect.sink( + receiveCompletion: { _ in expectation.fulfill() }, + receiveValue: { value in + values.withValue { + _ = $0.insert(value) + } + } + ) + + wait(for: [expectation], timeout: 5.0) + + XCTAssertEqual(values.value.count, 1000) + + _ = cancellable + } + } +#endif