-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Improve coverage & related cleanup 03/05 #5891
Conversation
if (SubscriptionHelper.setOnce(this, s)) { | ||
s.request(batchSize); | ||
} | ||
SubscriptionHelper.setOnce(this, s, batchSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the request
call comes directly after setOnce
, they can be combined into one and this resolves a lot of partial coverage cases.
} | ||
// otherwise, someone else changed the state (perhaps a concurrent | ||
// request or cancellation) so retry | ||
// add to the current requested and cap it at MAX_VALUE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was one of the earliest v2 operators where the infrastructure lacked the addCancel
method, simplifying this method significantly.
boolean setOther(Subscription o) { | ||
return SubscriptionHelper.setOnce(other, o); | ||
void setOther(Subscription o) { | ||
SubscriptionHelper.setOnce(other, o, Long.MAX_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is always Long.MAX_VALUE
and moved up from below.
} catch (ExecutionException ex) { | ||
if (!d.isDisposed()) { | ||
observer.onError(ex.getCause()); | ||
} catch (Throwable ex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduce the code duplication to one catch and extract the actual Throwable
if ex
was the ExecutionException
.
@@ -54,7 +54,7 @@ protected void subscribeActual(SingleObserver<? super T> subscriber) { | |||
|
|||
@Override | |||
public void onSubscribe(Disposable d) { | |||
if (DisposableHelper.set(this, d)) { | |||
if (DisposableHelper.setOnce(this, d)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have been setOnce
all along.
} | ||
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge")); | ||
if (PURGE_THREAD.compareAndSet(curr, next)) { | ||
tryStart(PURGE_ENABLED); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Factored out into a method which takes this parameter to be able to unit test the setup code. In normal code, this start
call will usually happen once (class init).
@@ -81,61 +82,77 @@ public static void start() { | |||
* Stops the purge thread. | |||
*/ | |||
public static void shutdown() { | |||
ScheduledExecutorService exec = PURGE_THREAD.get(); | |||
ScheduledExecutorService exec = PURGE_THREAD.getAndSet(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting the PURGE_THREAD
to null
simplifies the start check above.
if (properties.containsKey(PURGE_ENABLED_KEY)) { | ||
purgeEnable = Boolean.getBoolean(PURGE_ENABLED_KEY); | ||
} | ||
PurgeProperties pp = new PurgeProperties(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Factored out to be able to test parameter loading. This static code block executes once per class init.
/** | ||
* Creates a ScheduledExecutorService with the given factory. | ||
* @param factory the thread factory | ||
* @return the ScheduledExecutorService | ||
*/ | ||
public static ScheduledExecutorService create(ThreadFactory factory) { | ||
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); | ||
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) { | ||
tryPutIntoPool(PURGE_ENABLED, exec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Factored out to be able to test with purge not enabled and a non- ScheduledThreadPoolExecutor
instance in case Executors.new ScheduledThreadExecutor
doesn't return such in some odd JVM.
} else { | ||
e.purge(); | ||
} | ||
for (ScheduledThreadPoolExecutor e : new ArrayList<ScheduledThreadPoolExecutor>(POOLS.keySet())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the surrounding try-catch as it is practically impossible this loop crashes.
RxJavaPlugins.onError(e); | ||
return; | ||
} | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The delay > 0
was always true because execTime > t
thus execTime - t == delay
is always > 0
.
* RxJavaPlugins. | ||
* @param transform the transform to drive an operator | ||
*/ | ||
public static void checkDoubleOnSubscribeCompletableToFlowable(Function<Completable, ? extends Publisher<?>> transform) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional double onSubscribe
call checks with operators going from Completable
to Flowable
|Observable
.
* @return true if the operation succeeded, false if the target field was not null. | ||
* @since 2.1.11 | ||
*/ | ||
public static boolean setOnce(AtomicReference<Subscription> field, Subscription s, long request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get rid of the other method that we used before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there are many cases which require some other preparations before issuing the initial request.
this.d = s; | ||
actual.onSubscribe(this); | ||
public void onSubscribe(Disposable d) { | ||
if (DisposableHelper.validate(this.d, d)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validation was missing.
Updated with some additional cleanup and fix to |
Codecov Report
@@ Coverage Diff @@
## 2.x #5891 +/- ##
============================================
+ Coverage 97.61% 97.94% +0.33%
- Complexity 5945 5985 +40
============================================
Files 655 655
Lines 43860 43836 -24
Branches 6098 6072 -26
============================================
+ Hits 42812 42935 +123
+ Misses 327 277 -50
+ Partials 721 624 -97
Continue to review full report at Codecov.
|
Improve the coverage of various components, fix impossible paths and other fixes. See the comments attached to the code changes.