-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Add scheduler creation factories #5002
Conversation
Resolves ReactiveX#4993 This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this. We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these.
One other thing I'm worried about - is |
@@ -56,6 +54,9 @@ | |||
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); | |||
|
|||
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); | |||
|
|||
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason why this was moved to the end?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It requires WORKER_THREAD_FACTORY to be initialized now since it's a parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right.
* system properties for configuring new thread creation. Cannot be null. | ||
*/ | ||
public SingleScheduler(ThreadFactory threadFactory) { | ||
this.threadFactory = ObjectHelper.requireNonNull(threadFactory, "threadFactory was null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either this is unnecessary or you forgot to do this with the other new constructors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, meant to remove it from all these constructors and just do it in the factory methods, accidentally missed this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -180,6 +181,90 @@ public static Scheduler from(Executor executor) { | |||
} | |||
|
|||
/** | |||
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}. | |||
* @return the created Scheduler instance | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add @since 2.0.5 - experimental
as well for every new method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @return the created Scheduler instance | ||
*/ | ||
@Experimental | ||
public static Scheduler newComputation() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we'd want to allow non-parameterized instances to be created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you expand on that a bit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use case for creating yet another computation/io/newThread scheduler with the default settings? I thought you wanted to override the ThreadFactory
.
*/ | ||
@Experimental | ||
public static Scheduler newComputation(ThreadFactory threadFactory) { | ||
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The usual NPE message is threadFactory is null
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, will update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another problem is that people will try to override the scheduler-init with these method calls for Indeed, |
Heading to bed right now but will update the PR in the morning with requested changes. Also let me know if there's anywhere I should put tests, as I wasn't sure what the right place would be. |
Sounds good to me, will do that in my updates tomorrow |
Yes, tests that verify the custom thread factory actually worked by checking a custom thread name for each case. |
Current coverage is 95.46% (diff: 79.48%)@@ 2.x #5002 diff @@
==========================================
Files 592 592
Lines 37989 38009 +20
Methods 0 0
Messages 0 0
Branches 5772 5772
==========================================
+ Hits 36257 36284 +27
+ Misses 764 761 -3
+ Partials 968 964 -4
|
Added scheduler because we're not in Scheduler anymore. Changed to "create" because "newNewThread" was weird
Also tweaked the naming a bit.
I tried setting this up matching some of the cdl-based approaches in the plugin tests but wan't able to get it working (just hangs). Pushed what I had in a1029b4, any insight? |
Heading on vacation for a couple weeks, but let me know what you think of getting the tests to work. I should have some time here and there to update the PR |
I've already told you: use the runnable in the factory method on the Thread constructor. |
Aren't I just testing the test ThreadFactory implementations then, and not that the schedulers are hooked up properly to power the scheduler? |
|
||
private static void verifyThread(Worker w, Predicate<Thread> threadPredicate) { | ||
try { | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove double try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static void verifyThread(Worker w, Predicate<Thread> threadPredicate) { | ||
try { | ||
try { | ||
final AtomicReference<Thread> value = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change to AtomicBoolean
and just run the predicate inside the worker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then you don't even need to supply the Thread
to the Predicate
actually. It can verify whatever it wants.
What's the problem with the tests? They seem to be verifying that the supplied factory was used. |
Oh you didn't pass the supplied |
Ah! This is what I was missing, thanks for pointing that out. I've updated tests (which should now all be passing), and also made them a bit more robust (full integration with schedulers) to simulate a more real world use case. |
Failing test looks like a flake... |
Wait no it's not. Repro'd locally, will fix |
Fixed, I think. I added manual shutdowns of schedulers after they're done. Only IO had this lifecycle issue over tests, but I did the shutdown in all to be safe. |
Ok this time it seems actually flaky, as the same commit passed in a different travis job against my fork - https://travis-ci.org/hzsweers/RxJava/builds/195074173 |
} | ||
|
||
/** | ||
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These need some rewording emphasizing that the there is always a new Scheduler instance, unlike with the non-new method call.
@@ -56,6 +54,9 @@ | |||
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); | |||
|
|||
EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); | |||
|
|||
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right.
* @return the created Scheduler instance | ||
*/ | ||
@Experimental | ||
public static Scheduler newComputation() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use case for creating yet another computation/io/newThread scheduler with the default settings? I thought you wanted to override the ThreadFactory
.
ThreadFactory factory = new ThreadFactory() { | ||
@Override | ||
public Thread newThread(Runnable r) { | ||
return new Thread("Test"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this doesn't allow executing tasks because you ignore r
.
return "Test".equals(thread.getName()); | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have to shut down the scheduler (in a finally block) to avoid leaking threads.
@@ -927,6 +929,58 @@ public static Completable onAssembly(Completable source) { | |||
} | |||
|
|||
/** | |||
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} | |||
* except using {@code threadFactory} for thread creation. | |||
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add notes to all of these methods something like this:
Note that the returned
Scheduler
must be shut down manually if theThreadFactory
doesn't create a daemon thread, otherwise the JVM may not quit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "Note that this takes precedence..." may be misleading. The number of threads is still based on the system configuration. I'd remove this sentence entirely
Thanks! |
Resolves #4993
This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this.
We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these.