Skip to content

Commit

Permalink
Fix DemandBuffer Data Races
Browse files Browse the repository at this point in the history
We're still seeing crashes due to what look like data races in
`DemandBuffer`. Here, I'm making fixes based on what appears to have
helped in
pointfreeco#3447:

In `buffer(value:)`:

* Check if the demand is unlimited and, if so, unlock before calling
  `subscriber.receive(value)`.
* If the demand is not unlimited, append the value to the buffer and
  call `flush()` after unlocking.

In `flush(adding:)`:

* Process values in a loop, acquiring and releasing the lock around
  shared state access.
* Unlock before calling `subscriber.receive(value)` to avoid holding
  the lock during subscriber calls.
* After sending a value, lock again to update `demandState.requested`
  with any additional demand returned by the subscriber.
* Ensure that `subscriber.receive(completion:)` is called outside the
  lock.

Also add a new `DemandBufferTests` test file. I added some tests that
I was hoping would fail without the above changes, but they all pass
both before and after, even with ThreadSanitizer enabled.
  • Loading branch information
jpsim committed Nov 29, 2024
1 parent fb82284 commit 80c8d92
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 28 deletions.
67 changes: 39 additions & 28 deletions Sources/ComposableArchitecture/Internal/Create.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,25 @@ final class DemandBuffer<S: Subscriber>: @unchecked Sendable {

func buffer(value: S.Input) -> Subscribers.Demand {
lock.lock()
defer { lock.unlock() }

precondition(
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")
self.completion == nil, "How could a completed publisher send values?! Beats me 🤷‍♂️")

switch demandState.requested {
case .unlimited:
if demandState.requested == .unlimited {
lock.unlock()
return subscriber.receive(value)
default:
} else {
buffer.append(value)
lock.unlock()
return flush()
}
}

func complete(completion: Subscribers.Completion<S.Failure>) {
lock.lock()
defer { lock.unlock() }

precondition(
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")

self.completion == nil, "Completion has already occurred, which is quite awkward 🥺")
self.completion = completion
lock.unlock()
_ = flush()
}

Expand All @@ -66,39 +63,53 @@ final class DemandBuffer<S: Subscriber>: @unchecked Sendable {
}

private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
self.lock.sync {
var sentDemand = Subscribers.Demand.none
var completionToSend: Subscribers.Completion<S.Failure>?

if let newDemand = newDemand {
demandState.requested += newDemand
}
lock.lock()
if let newDemand = newDemand {
demandState.requested += newDemand
}
lock.unlock()

// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
var loop = true
while loop {
var valueToSend: S.Input?

while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
lock.lock()
if !buffer.isEmpty && demandState.processed < demandState.requested {
valueToSend = buffer.remove(at: 0)
demandState.processed += 1
}

if let completion = completion {
// Completion event was already sent
sentDemand += 1
} else if let completion = completion {
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
completionToSend = completion
loop = false
} else {
loop = false
}
lock.unlock()

if let value = valueToSend {
let additionalDemand = subscriber.receive(value)
lock.lock()
demandState.requested += additionalDemand
lock.unlock()
}
}

let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
if let completion = completionToSend {
subscriber.receive(completion: completion)
}

return sentDemand
}

struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}

Expand Down
170 changes: 170 additions & 0 deletions Tests/ComposableArchitectureTests/DemandBufferTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#if DEBUG
@preconcurrency import Combine
@testable @preconcurrency import ComposableArchitecture
import XCTest

final class DemandBufferTests: BaseTCATestCase {
func testConcurrentSend() async throws {
let values = LockIsolated<Set<Int>>([])

let effect = AnyPublisher<Int, Never>.create { subscriber in
Task.detached { @Sendable in
for index in 0...1_000 {
subscriber.send(index)
}
subscriber.send(completion: .finished)
}
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0...1_000))

_ = cancellable
}

func testConcurrentDemandAndSend() async throws {
let values = LockIsolated<Set<Int>>([])
let subscriberLock = LockIsolated<Void>(())

let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)
let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.setValue(subscriber)
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
}

await withTaskGroup(of: Void.self) { group in
for index in 0..<1_000 {
group.addTask { @Sendable in
effectSubscriber.value?.send(index)
}
group.addTask { @Sendable in
subscriberLock.withValue { _ in
_ = (effectSubscriber.value as? any Subscription)?.request(.max(1))
}
}
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0..<1_000))

_ = cancellable
}

func testReentrantSubscriber() async throws {
let values = LockIsolated<Set<Int>>([])
let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)

let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.setValue(subscriber)
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
if value < 1_000 {
Task { @MainActor in
effectSubscriber.value?.send(value + 1_000)
}
}
}

Task.detached { @Sendable in
for index in 0..<1_000 {
effectSubscriber.value?.send(index)
}
}

try await Task.sleep(nanoseconds: NSEC_PER_SEC)

XCTAssertEqual(values.value, Set(0..<2_000))

_ = cancellable
}

func testNoDeadlockOnReentrantSend() {
let values = LockIsolated<Set<Int>>([])
let expectation = XCTestExpectation(description: "Test should not deadlock")

let effectSubscriber = LockIsolated<Effect<Int>.Subscriber?>(nil)
let effect = AnyPublisher<Int, Never>.create { subscriber in
effectSubscriber.withValue { $0 = subscriber }
return AnyCancellable {}
}

let cancellable = effect.sink { value in
values.withValue {
_ = $0.insert(value)
}
// Prevent infinite recursion by limiting re-entrant calls
if value == 0 {
effectSubscriber.withValue { $0?.send(value + 1_000) }
} else {
expectation.fulfill()
}
}

// Ensure that 'effectSubscriber' is set before we use it
effectSubscriber.withValue { subscriber in
XCTAssertNotNil(subscriber)
subscriber?.send(0)
}

// Wait for the test to complete
wait(for: [expectation], timeout: 1.0)

XCTAssertEqual(values.value, Set([0, 1_000]))

_ = cancellable
}

func testConcurrentSendAndCompletion() {
let values = LockIsolated<Set<Int>>([])
let expectation = XCTestExpectation(description: "All values received")

let effect = AnyPublisher<Int, Never>.create { subscriber in
// Concurrently send values
DispatchQueue.concurrentPerform(iterations: 1000) { index in
subscriber.send(index)
}
// Send completion
DispatchQueue.global().async {
subscriber.send(completion: .finished)
}
return AnyCancellable {}
}

let cancellable = effect.sink(
receiveCompletion: { _ in expectation.fulfill() },
receiveValue: { value in
values.withValue {
_ = $0.insert(value)
}
}
)

wait(for: [expectation], timeout: 5.0)

XCTAssertEqual(values.value.count, 1000)

_ = cancellable
}
}
#endif

0 comments on commit 80c8d92

Please sign in to comment.