-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: WIP removes anonymous inner classes. #5174
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -280,16 +280,7 @@ public void onSubscribe(Subscription s) { | |
|
||
w.schedulePeriodically(this, timeskip, timeskip, unit); | ||
|
||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
synchronized (BufferSkipBoundedSubscriber.this) { | ||
buffers.remove(b); | ||
} | ||
|
||
fastPathOrderedEmitMax(b, false, w); | ||
} | ||
}, timespan, unit); | ||
w.schedule(new RemoveFromBuffer(b), timespan, unit); | ||
} | ||
|
||
@Override | ||
|
@@ -367,23 +358,31 @@ public void run() { | |
buffers.add(b); | ||
} | ||
|
||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
synchronized (BufferSkipBoundedSubscriber.this) { | ||
buffers.remove(b); | ||
} | ||
|
||
fastPathOrderedEmitMax(b, false, w); | ||
} | ||
}, timespan, unit); | ||
w.schedule(new RemoveFromBuffer(b), timespan, unit); | ||
} | ||
|
||
@Override | ||
public boolean accept(Subscriber<? super U> a, U v) { | ||
a.onNext(v); | ||
return true; | ||
} | ||
|
||
private final class RemoveFromBuffer implements Runnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No private |
||
private final U buffer; | ||
|
||
RemoveFromBuffer(U buffer) { | ||
this.buffer = buffer; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
synchronized (BufferSkipBoundedSubscriber.this) { | ||
buffers.remove(buffer); | ||
} | ||
|
||
fastPathOrderedEmitMax(buffer, false, w); | ||
} | ||
} | ||
} | ||
|
||
static final class BufferExactBoundedSubscriber<T, U extends Collection<? super T>> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -132,12 +132,7 @@ public void subscribeActual(Subscriber<? super R> s) { | |
return; | ||
} | ||
if (n == 1) { | ||
((Publisher<T>)a[0]).subscribe(new MapSubscriber<T, R>(s, new Function<T, R>() { | ||
@Override | ||
public R apply(T t) throws Exception { | ||
return combiner.apply(new Object[] { t }); | ||
} | ||
})); | ||
((Publisher<T>)a[0]).subscribe(new MapSubscriber<T, R>(s, new SingletonArrayFunc())); | ||
return; | ||
} | ||
|
||
|
@@ -557,4 +552,11 @@ public void requestOne() { | |
|
||
} | ||
} | ||
|
||
private final class SingletonArrayFunc implements Function<T, R> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No private |
||
@Override | ||
public R apply(T t) throws Exception { | ||
return combiner.apply(new Object[] { t }); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,40 +78,17 @@ public void onSubscribe(Subscription s) { | |
|
||
@Override | ||
public void onNext(final T t) { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
actual.onNext(t); | ||
} | ||
}, delay, unit); | ||
w.schedule(new OnNext(t), delay, unit); | ||
} | ||
|
||
@Override | ||
public void onError(final Throwable t) { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
actual.onError(t); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
}, delayError ? delay : 0, unit); | ||
w.schedule(new OnError(t), delayError ? delay : 0, unit); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
w.schedule(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
actual.onComplete(); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
}, delay, unit); | ||
w.schedule(new OnComplete(), delay, unit); | ||
} | ||
|
||
@Override | ||
|
@@ -125,5 +102,45 @@ public void cancel() { | |
w.dispose(); | ||
} | ||
|
||
private final class OnNext implements Runnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No private |
||
private final T t; | ||
|
||
OnNext(T t) { | ||
this.t = t; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
actual.onNext(t); | ||
} | ||
} | ||
|
||
private class OnError implements Runnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No private but final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here too. |
||
private final Throwable t; | ||
|
||
public OnError(Throwable t) { | ||
this.t = t; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
actual.onError(t); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
} | ||
|
||
private class OnComplete implements Runnable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private but final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here too. |
||
@Override | ||
public void run() { | ||
try { | ||
actual.onComplete(); | ||
} finally { | ||
w.dispose(); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,71 +38,90 @@ public void subscribeActual(final Subscriber<? super T> child) { | |
final SubscriptionArbiter serial = new SubscriptionArbiter(); | ||
child.onSubscribe(serial); | ||
|
||
FlowableSubscriber<U> otherSubscriber = new FlowableSubscriber<U>() { | ||
boolean done; | ||
FlowableSubscriber<U> otherSubscriber = new DelaySubscriber(serial, child); | ||
|
||
other.subscribe(otherSubscriber); | ||
} | ||
|
||
private final class DelaySubscriber implements FlowableSubscriber<U> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still private |
||
private final SubscriptionArbiter serial; | ||
private final Subscriber<? super T> child; | ||
boolean done; | ||
|
||
DelaySubscriber(SubscriptionArbiter serial, Subscriber<? super T> child) { | ||
this.serial = serial; | ||
this.child = child; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
serial.setSubscription(new DelaySubscription(s)); | ||
s.request(Long.MAX_VALUE); | ||
} | ||
|
||
@Override | ||
public void onNext(U t) { | ||
onComplete(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
if (done) { | ||
RxJavaPlugins.onError(e); | ||
return; | ||
} | ||
done = true; | ||
child.onError(e); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
if (done) { | ||
return; | ||
} | ||
done = true; | ||
|
||
main.subscribe(new OnCompleteSubscriber()); | ||
} | ||
|
||
private class DelaySubscription implements Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no private but final |
||
private final Subscription s; | ||
|
||
public DelaySubscription(Subscription s) { | ||
this.s = s; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
serial.setSubscription(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
// ignored | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
s.cancel(); | ||
} | ||
}); | ||
s.request(Long.MAX_VALUE); | ||
public void request(long n) { | ||
// ignored | ||
} | ||
|
||
@Override | ||
public void onNext(U t) { | ||
onComplete(); | ||
public void cancel() { | ||
s.cancel(); | ||
} | ||
} | ||
|
||
private final class OnCompleteSubscriber implements FlowableSubscriber<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No private |
||
@Override | ||
public void onError(Throwable e) { | ||
if (done) { | ||
RxJavaPlugins.onError(e); | ||
return; | ||
} | ||
done = true; | ||
child.onError(e); | ||
public void onSubscribe(Subscription s) { | ||
serial.setSubscription(s); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
if (done) { | ||
return; | ||
} | ||
done = true; | ||
|
||
main.subscribe(new FlowableSubscriber<T>() { | ||
@Override | ||
public void onSubscribe(Subscription s) { | ||
serial.setSubscription(s); | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
child.onNext(t); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
child.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
child.onComplete(); | ||
} | ||
}); | ||
public void onNext(T t) { | ||
child.onNext(t); | ||
} | ||
}; | ||
|
||
other.subscribe(otherSubscriber); | ||
@Override | ||
public void onError(Throwable t) { | ||
child.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
child.onComplete(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and no private