Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge() does not serialize on_next() calls #236

Closed
vermaport opened this issue Apr 6, 2018 · 1 comment
Closed

merge() does not serialize on_next() calls #236

vermaport opened this issue Apr 6, 2018 · 1 comment
Assignees
Labels

Comments

@vermaport
Copy link

Similar to ReactiveX/RxJava#200, it seems like merge() does not serialize emissions if its sources are emitting on different threads. Is there an RxPy equivalent to SynchronizedObserver to work around this?

I translated the unit test written for this issue, and it fails on RxPy 1.6.1 (latest in PIP).

ReactiveX/RxJava@effc08d#diff-e28fd5678569ef71dfc9e835c73a5c36R42

from threading import Event, Lock

from rx import Observable
from rx.concurrency import EventLoopScheduler
from rx.core import ObservableBase


class TestAsynchronousObservable(ObservableBase):
    def __init__(self, scheduler=None):
        super(TestAsynchronousObservable, self).__init__()
        self.scheduler = scheduler or EventLoopScheduler()
        self.on_next_being_sent = Event()

    def hello(self, scheduler, observer):
        self.on_next_being_sent.set()
        observer.on_next("hello")
        observer.on_completed()

    def _subscribe_core(self, observer):
        self.scheduler.schedule(self.hello, observer)


counter_lock = Lock()
total_counter = 0
concurrent_counter = 0
unblock = Event()
completed = Event()


def on_next(item):
    global counter_lock
    global total_counter
    global concurrent_counter
    global unblock

    with counter_lock:
        total_counter += 1
        concurrent_counter += 1

    unblock.wait()

    concurrent_counter -= 1


o1 = TestAsynchronousObservable()
o2 = TestAsynchronousObservable()
m = Observable.merge(o1, o2).subscribe(on_next, on_completed=completed.set)

# Wait for both Observables to send.
o1.on_next_being_sent.wait()
o2.on_next_being_sent.wait()

# One of the Observables should be blocked.
assert concurrent_counter == 1  # <-- Asserts here since merge() does not serialize the on_next() calls.

# Release it so it can finish.
unblock.set()

# Wait for the merge to complete.
completed.wait()

assert total_counter == 2
assert concurrent_counter == 0
@dbrattli dbrattli added the bug label Jun 23, 2018
@dbrattli dbrattli self-assigned this Jun 23, 2018
@lock
Copy link

lock bot commented Jun 23, 2019

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 23, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants