Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedulers (merge of pull #199) #225

Merged
merged 26 commits into from
Apr 5, 2013

Conversation

benjchristensen
Copy link
Member

Manual merge of #199 by @mairbek plus the following changes:

  • made some classes non-public so they don't become part of the published API (if we find they have value in the public API we can make them so but once published it's hard to remove them so I'd rather keep them implementation details until then)
  • consolidated ExecutorScheduler and ScheduledExecutorScheduler
  • made ExecutorScheduler use a system-wide ScheduledExecutorScheduler for handling delayed events when only an Executor is available
  • made the IOThreadPool use a non-bounded cached thread-pool

Next step from here is to review all operator implementations and add the Scheduler overloads.

mairbek and others added 20 commits March 14, 2013 14:53
ExecutorScheduler throws exception for the delayed action.
…edulers-merge

Conflicts:
	rxjava-core/src/main/java/rx/Observable.java
For now keeping ScheduledObserver an implementation detail until it's clear we want it part of the long-term public API.
ScheduledExecutorScheduler is just an extension of ExecutorScheduler so keeping them together for less surface area on the API.
Until there is a use case other than unit testing I'm moving this to a non-public role so it's not part of the public API.
- added Javadocs
- moved some classes to package-private until they are proven necessary for the public API
- made ExecutorScheduler support Executor implementations and still work with time delays by using a system-wide scheduler/timer
- made IO thread-pool unbounded with a cached thread pool
- I plan on using this to expand unit testing around various aspects of schedulers
- this is not done as an inner-class as it does not correlate with just one class but is cross-functional over many classes thus it fits best here
@cloudbees-pull-request-builder

RxJava-pull-requests #77 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #78 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member Author

The unit test is poorly written (non-deterministic). I'll fix in the morning.

@jmhofer
Copy link
Contributor

jmhofer commented Apr 5, 2013

Awesome work! - With this, it should be easy to implement operators like "sample" etc.

- This applies to any pools RxJava itself creates. It will be up to users to do this for Executors they inject.
@benjchristensen
Copy link
Member Author

Yes it should open the gates to many operators we haven't been able to pursue.

 … considering very long running app with lots of IO events.
@benjchristensen
Copy link
Member Author

Committed a few tweaks and fixes.

Open questions for me are:

  • we're not using the Scheduler.now value anywhere, should we be? or is that only for the Virtual scheduler used for testing?
  • not quite sure how the overloads should work on merge etc ... I've implemented them but the behavior might not yet match C#

@benjchristensen
Copy link
Member Author

Can someone with an Rx.Net environment setup implement a test similar to this from Java and tell me the output?

@Test
    public void testMixedSchedulers() throws InterruptedException {
        final String mainThreadName = Thread.currentThread().getName();

        Observable<String> o = Observable.<String> create(new Func1<Observer<String>, Subscription>() {

            @Override
            public Subscription call(Observer<String> observer) {

                System.out.println("Origin observable is running on: " + Thread.currentThread().getName());

                assertFalse(Thread.currentThread().getName().equals(mainThreadName));
                assertTrue("Actually: " + Thread.currentThread().getName(), Thread.currentThread().getName().startsWith("RxIOThreadPool"));

                observer.onNext("one");
                observer.onNext("two");
                observer.onNext("three");
                observer.onCompleted();

                return Subscriptions.empty();
            }
        }).subscribeOn(Schedulers.threadPoolForIO()); // subscribe to the source on the IO thread pool

        // now merge on the CPU threadpool
        o = Observable.<String> merge(o, Observable.<String> from("four", "five"))
                .subscribeOn(Schedulers.threadPoolForComputation())
                .map(new Func1<String, String>() {

                    @Override
                    public String call(String v) {
                        // opportunity to see what thread the merge is running on
                        System.out.println("Merge is running on: " + Thread.currentThread().getName());
                        return v;
                    }

                });

        final CountDownLatch latch = new CountDownLatch(1);

        final AtomicReference<RuntimeException> onError = new AtomicReference<RuntimeException>();

        // subscribe on a new thread
        o.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println("==> received onCompleted");
                latch.countDown();
            }

            @Override
            public void onError(Exception e) {
                System.out.println("==> received onError: " + e.getMessage());
                onError.set((RuntimeException) e);
                latch.countDown();
            }

            @Override
            public void onNext(String v) {
                System.out.println("==> Final subscribe is running on: " + Thread.currentThread().getName());
                System.out.println("==> onNext: " + v);

            }
        }, Schedulers.newThread());

        // wait for the above to finish or blow up if it's blocked
        latch.await(5, TimeUnit.SECONDS);
    }

I'm trying to understand how a sequence should work when multiple subscribeOn operators are applied at different steps of a sequence.

Of course Rx.Net doesn't have the IO and CPU thread pools ... those are just helper methods to Executors which would be 2 separate threadpools for different work types so you'll need to adjust that.

@cloudbees-pull-request-builder

RxJava-pull-requests #79 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

Is there any reason to use Scheduler.schedule on an internal operator implementation instead of just leveraging subscribeOn (other than when we need the delay arguments of course)?

For example, on a method overload of merge is this okay?

        return merge(source).subscribeOn(scheduler);

or is there some reason to inside the OperationMerge do this:

                return scheduler.schedule(new Func0<Subscription>() {

                    @Override
                    public Subscription call() {
                        return new MergeObservable<T>(o).call(observer);
                    }
                });

They seem to accomplish the same thing but would like to know if there's a reason to prefer one over the other.

I prefer just reusing subscribeOn.

I can't tell from reading C# code what it does as I can't find the extensions that implement the override methods!

/**
* Schedules an action to be executed in dueTime.
*
* @param action
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add param for dueTime

@benjchristensen
Copy link
Member Author

I'm pulling the trigger and merging this into master so people can start playing with it and providing feedback.

The public API changes are fairly limited still so most changes (which I fully expect) will be implementation details.

benjchristensen added a commit that referenced this pull request Apr 5, 2013
@benjchristensen benjchristensen merged commit 2bf03cf into ReactiveX:master Apr 5, 2013
I have some outstanding questions on how these should be implemented (or even why we need them when the 'subscribeOn' operator is far cleaner) so want to remove them for now so they don't make it into the public incorrectly.
@cloudbees-pull-request-builder

RxJava-pull-requests #80 SUCCESS
This pull request looks good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants