Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in PrimitiveSequence+Concurrency #2641

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ All notable changes to this project will be documented in this file.
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods

### Anomalies

* Fixes a crash that could occur when awaiting a `Single`, `Maybe`, or `Completable` that was disposed.

## 6.5.0

You can now use `await` on `Observable`-conforming objects (as well as `Driver`, `Signal`, `Infallible`, `Single`, `Completable`) using the following syntax:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,29 @@ public extension PrimitiveSequenceType where Trait == SingleTrait {
operation: {
try await withCheckedThrowingContinuation { continuation in
var didResume = false
let lock = RecursiveLock()
disposable.setDisposable(
self.subscribe(
onSuccess: {
didResume = true
continuation.resume(returning: $0)
onSuccess: { value in
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(returning: value)
}
},
onFailure: {
didResume = true
continuation.resume(throwing: $0)
onFailure: { error in
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: error)
}
},
onDisposed: {
guard !didResume else { return }
continuation.resume(throwing: CancellationError())
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: CancellationError())
}
}
)
)
Expand Down Expand Up @@ -111,27 +121,37 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait {
return try await withTaskCancellationHandler(
operation: {
try await withCheckedThrowingContinuation { continuation in
var didEmit = false
var didResume = false
let lock = RecursiveLock()
Copy link
Contributor

@nikolaykasyanov nikolaykasyanov Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take this with a grain of salt, as far as I can tell RecursiveLock is the to-go tool in this codebase.

  • does it have to be recursive?
  • can it be replaced with an atomic & compare-and-swap?

Copy link
Author

@0xpablo 0xpablo Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, that's a good point. I used RecursiveLock precisely because of that, I saw that was the tool being used to synchronize code in the rest of the codebase. I initially thought about that but given that even AtomicInt inside RxSwift is backed by a NSLock, I didn't want to introduce a different way to synchronize this.
If a maintainer wants me to switch to CAS for this I'm happy to do that.

Copy link

@hoc081098 hoc081098 Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think changing didResume to AtomicInt and using fetchOr might be better 🙏

Copy link
Author

@0xpablo 0xpablo Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, AtomicInt is backed by an NSLock so the only thing that would do is making this code less clear.

disposable.setDisposable(
self.subscribe(
onSuccess: { value in
didEmit = true
didResume = true
continuation.resume(returning: value)
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(returning: value)
}
},
onError: { error in
didResume = true
continuation.resume(throwing: error)
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: error)
}
},
onCompleted: {
guard !didEmit else { return }
didResume = true
continuation.resume(returning: nil)
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(returning: nil)
}
},
onDisposed: {
guard !didResume else { return }
continuation.resume(throwing: CancellationError())
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: CancellationError())
}
}
)
)
Expand Down Expand Up @@ -168,19 +188,29 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element
operation: {
try await withCheckedThrowingContinuation { continuation in
var didResume = false
let lock = RecursiveLock()
disposable.setDisposable(
self.subscribe(
onCompleted: {
didResume = true
continuation.resume()
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume()
}
},
onError: { error in
didResume = true
continuation.resume(throwing: error)
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: error)
}
},
onDisposed: {
guard !didResume else { return }
continuation.resume(throwing: CancellationError())
lock.withLock {
guard !didResume else { return }
didResume = true
continuation.resume(throwing: CancellationError())
}
}
)
)
Expand Down
102 changes: 102 additions & 0 deletions Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,40 @@ extension PrimitiveSequenceConcurrencyTests {
[randomResult]
)
}

/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
func testSingleContinuationIsNotResumedTwice() {
let expectation = XCTestExpectation()
let iterations = 10000
for i in 0 ..< iterations {
DispatchQueue.global(qos: .userInitiated).async {
let single = Single<Int>.create { observer in
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
observer(.success(42))
}
return Disposables.create()
}

let task = Task {
_ = try await single.value
}

DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
task.cancel()
}

self.sleep(Double.random(in: 0.004...0.006))

if i == iterations - 1 {
expectation.fulfill()
}
}
}

wait(for: [expectation], timeout: 10)
}
}

// MARK: - Maybe
Expand Down Expand Up @@ -167,6 +201,40 @@ extension PrimitiveSequenceConcurrencyTests {
try await Task.sleep(nanoseconds: 1_000_000)
task.cancel()
}

/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
func testMaybeContinuationIsNotResumedTwice() {
let expectation = XCTestExpectation()
let iterations = 10000
for i in 0 ..< iterations {
DispatchQueue.global(qos: .userInitiated).async {
let maybe = Maybe<Bool>.create { observer in
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
observer(.success(true))
}
return Disposables.create()
}

let task = Task {
_ = try await maybe.value
}

DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
task.cancel()
}

self.sleep(Double.random(in: 0.004...0.006))

if i == iterations - 1 {
expectation.fulfill()
}
}
}

wait(for: [expectation], timeout: 10)
}
}

// MARK: - Completable
Expand Down Expand Up @@ -220,6 +288,40 @@ extension PrimitiveSequenceConcurrencyTests {
}
}.cancel()
}

/// A previous implementation of the `Single` to swift concurrency bridge had a bug where it would sometimes call the continuation twice.
/// The current number of iterations is a sweet spot to not make the tests too slow while still catching the bug in most runs.
/// If you are debugging this issue you might want to increase the iterations and/or run this test repeatedly.
func testCompletableContinuationIsNotResumedTwice() {
let expectation = XCTestExpectation()
let iterations = 10000
for i in 0 ..< iterations {
DispatchQueue.global(qos: .userInitiated).async {
let completable = Completable.create { observer in
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
observer(.completed)
}
return Disposables.create()
}

let task = Task {
_ = try await completable.value
}

DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.005) {
task.cancel()
}

self.sleep(Double.random(in: 0.004...0.006))

if i == iterations - 1 {
expectation.fulfill()
}
}
}

wait(for: [expectation], timeout: 10)
}
}
#endif