Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxScalaDemo: Make onBackpressureDropExample nicer #62

Closed
samuelgruetter opened this issue Nov 17, 2014 · 5 comments
Closed

RxScalaDemo: Make onBackpressureDropExample nicer #62

samuelgruetter opened this issue Nov 17, 2014 · 5 comments

Comments

@samuelgruetter
Copy link
Collaborator

I had a look at the onBackpressureDropExample in RxScalaDemo, and it took quite a while until I grasped it.

Of the 2000 items emitted by the fast Observable, the first 1024 are printed, and the others aren't. However, I'd have expected an output similar to

113
783
1683

That is, most numbers are dropped because the Subscriber is much slower than the producer, but some arbitrary elements get through. I then found out that observeOn requests 1024 items at once, and only then transmits the back pressure. So I wonder if we can make an example which does not need observeOn, or if there's another change to make the example more understandable, or if, at least, we could add some explanations. What do you think?

@samuelgruetter
Copy link
Collaborator Author

Moreover, this line in createFastObservable

(0 to 2000).takeWhile(_ => !subscriber.isUnsubscribed).foreach(subscriber.onNext(_))

probably does not what it should, as the following example illustrates:

scala> :paste
// Entering paste mode (ctrl-D to finish)

(0 to 5).takeWhile(_ => { println("checking if unsubscribed"); true })
        .foreach(_ => println("calling onNext"))

// Exiting paste mode, now interpreting.

checking if unsubscribed
checking if unsubscribed
checking if unsubscribed
checking if unsubscribed
checking if unsubscribed
checking if unsubscribed
calling onNext
calling onNext
calling onNext
calling onNext
calling onNext
calling onNext

@zsxwing
Copy link
Member

zsxwing commented Nov 18, 2014

My mistake :(

  1. toStream
(0 to 2000).toStream.takeWhile(_ => !subscriber.isUnsubscribed).foreach(subscriber.onNext(_))
  1. for-break
        import scala.util.control.Breaks.break
        for(i <- 0 to 2000) {
          if(subscriber.isUnsubscribed) break else  subscriber.onNext(_)
        }
  1. while loop
        var i = 0
        while(i < 2000 && !subscriber.isUnsubscribed) {
          subscriber.onNext(_)
          i += 1
        }

which one is better? I think 1) is fragile (at least for me, easy to forget toStream), 2) and 3) is easy to understand, but I don't like break. So I vote for 3)

@samuelgruetter
Copy link
Collaborator Author

I'd prefer 3) as well. With _ replaced by i of course ;-)

@zsxwing
Copy link
Member

zsxwing commented Nov 18, 2014

We need to make emitting items in Observable and receiving items in Subscriber run in different threads. Looks observeOn is necessary. The default value of RxRingBuffer.SIZE reduced to 128 in ReactiveX/RxJava#1836 Maybe now it's acceptable?

@dhoepelman
Copy link
Collaborator

Due to project EOL status, this improvement will not be made.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants