
Example of creating Observable from an I/O source #840

Closed
fommil opened this issue Feb 8, 2014 · 22 comments
@fommil

fommil commented Feb 8, 2014

I'd like to see an example of something simple like reading lines from a File, in Scala.

It would then be incredibly useful to see:

  • applying filters (e.g. users only receive non-comment lines), potentially with the Observable itself knowing it can skip processing if nobody is subscribed (I'm not sure if this is even possible... but could allow some optimisations)
  • parallel calculations in the subscribers, but with throttling on the Observable. Imagine a 10GB CSV file being read by the Observable, with subscribers doing expensive processing in parallel per row, but no more than 100 rows ever actually "alive" in the system, thereby keeping memory usage under control (roughly the sketch below).
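To pin down the throttling semantics, something like this plain java.util.concurrent sketch captures what I mean (nothing Rx-specific; processRow, the pool size and the fake row source are all made up):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedRowsSketch {
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService pool = Executors.newFixedThreadPool(8);
        final Semaphore alive = new Semaphore(100);      // at most 100 rows in flight

        for (int i = 0; i < 1000000; i++) {              // stand-in for reading rows from a huge CSV
            final String row = "row-" + i;
            alive.acquire();                             // reader blocks while 100 rows are "alive"
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        processRow(row);                 // expensive per-row work, done in parallel
                    } finally {
                        alive.release();                 // the row is no longer "alive"
                    }
                }
            });
        }
        pool.shutdown();
    }

    static void processRow(String row) {
        // placeholder for real processing
    }
}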
@benjchristensen
Member

parallel calculations

Is it okay for the parallel execution to be unordered or do you need the output to remain in the same order as coming from the file? The difference is significant in how the problem is tackled.

skip processing if nobody is subscribed

If nobody subscribes then the Observable is not running unless you have turned it into a "hot" Observable behind a Subject, but that doesn't really make sense for file processing.

Thus, a filter operator would drop all comment lines and emit only the lines that pass the predicate, so you only process what you're interested in.
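To make the "cold" point concrete, here is a rough sketch in the RxJava 1.x style (hard-coded lines purely for illustration). Nothing inside the OnSubscribe body runs until subscribe is called, and the filter means only non-comment lines ever reach your subscriber:

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

Observable<String> lines = Observable.create(new Observable.OnSubscribe<String>() {
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("reading the source...");  // only happens once somebody subscribes
        subscriber.onNext("// a comment line");
        subscriber.onNext("a data line");
        subscriber.onCompleted();
    }
});

// nothing has run yet: no subscriber, no work

lines.filter(new Func1<String, Boolean>() {
    public Boolean call(String line) {
        return !line.startsWith("//");                 // drop comment lines
    }
}).subscribe(new Action1<String>() {
    public void call(String line) {
        System.out.println(line);                      // prints only "a data line"
    }
});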

@fommil
Author

fommil commented Feb 10, 2014

unordered is fine

@abersnaze
Contributor

For working with large/infinite streams of text I recommend you look at the rxjava-string contrib module. There are a few basic operators for working with strings and byte arrays. I just submitted pull request #843 to add a from(Reader) method to make the example below work. Translation to Scala is left to the reader.

import static rx.observables.StringObservable.*;

...

// stream the file, re-split it into lines, and keep only non-comment lines
split(from(new FileReader("foo.txt")), "\n").filter(new Func1<String, Boolean>() {
    public Boolean call(String line) {
        return !line.startsWith("//");
    }
}).subscribe(...);

@fommil
Author

fommil commented Feb 10, 2014

I'm not actually interested in text files; I just used that as a simple example to avoid having to explain the binary format of the file we're parsing :-)

@fommil
Author

fommil commented Feb 10, 2014

The from code is a good starting place though, thanks!

@benjchristensen
Member

Take a look at the parallel operator for doing processing work on multiple threads.
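As a rough sketch of per-row work across threads, this is the flatMap + subscribeOn pattern written against the RxJava 1.x API rather than parallel itself (parallel's exact signature moved around in the 0.x releases); note the results arrive unordered, which you said is fine:

import java.util.Arrays;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

Observable<String> rows = Observable.from(Arrays.asList("row1", "row2", "row3", "row4"));

rows.flatMap(new Func1<String, Observable<String>>() {
    public Observable<String> call(final String row) {
        // lift each row into its own Observable and move its work onto a computation thread
        return Observable.just(row)
                .subscribeOn(Schedulers.computation())
                .map(new Func1<String, String>() {
                    public String call(String r) {
                        return r.toUpperCase();        // stand-in for expensive per-row processing
                    }
                });
    }
}).toBlocking().forEach(new Action1<String>() {        // block here only to keep the demo simple
    public void call(String result) {
        System.out.println(result);                     // results arrive unordered
    }
});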

@fommil
Author

fommil commented Feb 16, 2014

@abersnaze should onCompleted be called when it gets to the end of the file? I'm seeing infinite hanging here: https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/scratch.scala#L154

@benjchristensen how am I supposed to use parallel? It takes two parameters; I was hoping to just add parallel in the same way Scala collections can be .par'd.

@abersnaze
Contributor

Yes. If onCompleted is not called, things tend to hang forever.
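For example, a hand-rolled source would need to terminate the stream itself, roughly like this (RxJava 1.x style create, illustrative only, not the rxjava-string implementation):

import java.io.BufferedReader;
import java.io.FileReader;
import rx.Observable;
import rx.Subscriber;

Observable<String> fileLines = Observable.create(new Observable.OnSubscribe<String>() {
    public void call(Subscriber<? super String> subscriber) {
        try (BufferedReader reader = new BufferedReader(new FileReader("foo.txt"))) {
            String line;
            while ((line = reader.readLine()) != null && !subscriber.isUnsubscribed()) {
                subscriber.onNext(line);
            }
            subscriber.onCompleted();   // signal end-of-file; without this, consumers block forever
        } catch (Exception e) {
            subscriber.onError(e);      // propagate IO failures instead of hanging
        }
    }
});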


@fommil
Author

fommil commented Feb 17, 2014

@abersnaze is that a bug you'll be fixing soon?

@fommil
Author

fommil commented Feb 19, 2014

@benjchristensen I wrapped my Observable with parallel (is this the correct way to do it? It feels clunky, having to pass the identity, and mixing the Java/Scala APIs to use the StringObservables)

https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/scratch.scala#L150

but this results in an OutOfMemoryError when run with a 10MB heap. (I'm trying to simulate parsing a large input source that cannot be retained in memory.)

This is very representative of problems I face on a recurring basis, and it appears that the current Observable.parallel setup ends up doing the same thing as creating lots of Futures (which also OOMs, or takes forever to complete).

I am aware that you've said (on the mailing list) that this is a work in progress and there are plans to allow customisation of the parallel strategy.

For me it would be ideal if the producer and consumer were seen as pots of atomic work (e.g. readNext, onNext) and I could write a parallelisation strategy on that basis. For example: always call readNext if I have fewer than 10 items in flight (buffered + outstanding readNext calls); otherwise call onNext; and (the differentiator from the rest) if there are no elements in my buffer, give the Thread back to the pool and don't make any more decisions until one of the readNext calls returns.
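As a hypothetical Java sketch, the decision a free thread would make is roughly this (the threshold of 10 and all the names are arbitrary):

// Which piece of atomic work should a free thread do next?
class ParallelStrategy {
    enum Action { READ_NEXT, ON_NEXT, REST }

    Action decide(int buffered, int outstandingReads) {
        if (buffered + outstandingReads < 10) {
            return Action.READ_NEXT;   // keep the producer busy while fewer than 10 items are in flight
        }
        if (buffered > 0) {
            return Action.ON_NEXT;     // otherwise drain the buffer into a subscriber
        }
        return Action.REST;            // nothing to do: give the thread back to the pool
    }
}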

@benjchristensen
Member

Could you write a unit test in Java without the various dependencies of that code that demonstrates the OOM issue?

@fommil
Author

fommil commented Feb 19, 2014

@benjchristensen if it helps, absolutely! I might drop the StringObservables too, and just use an infinite stream of integers or something

@benjchristensen
Member

Thank you, it definitely would. I'd like to use the unit test to track down the issue, resolve it and put it in the codebase. If you submit it as a PR I'll merge it in as I work on fixing it.

@fommil
Author

fommil commented Feb 19, 2014

@benjchristensen I'm struggling with gradle.

A problem occurred configuring root project 'rxjava'.
> Could not resolve all dependencies for configuration ':classpath'.
   > Could not download artifact 'com.jcraft:jsch:0.1.44-1@jar'
      > Artifact 'com.jcraft:jsch:0.1.44-1@jar' not found.

I might just do this as a unit test in my repo and then copy it over when I can compile and open in IntelliJ.

@fommil
Author

fommil commented Feb 19, 2014

Actually, this might have to run as part of a separate "low heap size" test package. It'll obviously pass if the default JVM flags are used.

@fommil
Author

fommil commented Feb 19, 2014

@benjchristensen please find the test... you'll need to run it with a really small heap (like 5m) to see the problem. My Gradle-fu is not strong enough to help you set that up (I think I'd actually struggle under sbt or Maven as well, to be honest!)

Output:

testInfiniteSource: PASS
testInfiniteSourceWithParallel: Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

@benjchristensen
Member

Thank you, I'll play with this.

@fommil
Author

fommil commented Feb 19, 2014

To avoid the need to change the memory settings, the test could easily be changed to return arrays of 10 to 100MB. That ought to cause OOMs with your default test settings (what is your default?)

@fommil
Author

fommil commented Feb 20, 2014

@benjchristensen see trivial update that OOMs with default JVM settings.

@fommil
Author

fommil commented Feb 23, 2014

@benjchristensen I should like to draw your attention to my experiment at https://github.com/fommil/rx-playground/commit/26d39e030dc10e9c7c99be638537b55473be8537

This is horrendously inefficient at the moment, and ugly, but the principles are there. In a nutshell, I have defined an interface BlockingProducer and a means to turn it into an Observable, with the ability to throttle the number of threads in consumers (I use consumer/subscriber synonymously here) whilst minimising blocking.

The algorithm is pretty simple, effectively: produce if the buffer is getting low (and we're not already blocking on the producer in another thread), otherwise consume (if there is anything to consume), otherwise rest (by giving the thread back to the pool). There is a pool of Futures, so this typically results in 1 producer and N consumer threads at any given moment. Most importantly, memory use is kept under control.

The performance could be greatly improved by reducing (or eliminating) the need to use locks, and the code could be cleaned up (dramatically) by a few simple refactors. However, a simple test (in hack.scala) of reading a 50,000-line CSV file, with 1ms of processing time per line, does result in running N times faster. Contrast this with Observable.parallel, which is prone to OutOfMemoryError as I showed above.

Next steps could be:

  • support other types of producers, e.g. non-blocking producers with a back-off and retry strategy (possibly moot, because this approach is most effective when a tight production loop creates too much data!); producers that can produce in parallel; or producers that require regular polling / heartbeats.
  • allow a "cool down" period after production. This is necessary when polling physical devices at a set frequency (e.g. the Emotiv EEG headset via a USB link).
  • abstract the produce / consume decision so that it is more pluggable by users of the API.
  • rewrite in Java so as to fit more easily within the RxJava framework (Runnable instead of Future, ScheduledExecutor instead of ExecutionContext, etc.), try to be non-blocking as much as possible, and minimise object creation (e.g. re-use Runnables, lose the intermediate Consume class).

@benjchristensen
Member

The backpressure concerns of this issue are being tracked in #1000 and worked on as part of the Reactive Streams (https://github.com/reactive-streams/reactive-streams) project.

If there is anything out of this discussion that you'd like to contribute back, please submit it as part of one of the existing contrib-modules or as a new one.
