Skip to content

Commit

Permalink
Merge pull request #3242 from akarnokd/OperatorUsing2x
Browse files Browse the repository at this point in the history
Operator using, some internal refactorings.
  • Loading branch information
akarnokd committed Aug 29, 2015
2 parents ff0ae2d + 69c666d commit 640e3c1
Show file tree
Hide file tree
Showing 20 changed files with 205 additions and 56 deletions.
10 changes: 10 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1753,4 +1753,14 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
return combineLatest(combiner, false, bufferSize(), p1, p2, p3, p4, p5, p6, p7, p8, p9);
}

public static <T, D> Observable<T> using(Supplier<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> disposer) {
return using(resourceSupplier, sourceSupplier, disposer, true);
}

public static <T, D> Observable<T> using(Supplier<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> disposer, boolean eager) {
Objects.requireNonNull(resourceSupplier);
Objects.requireNonNull(sourceSupplier);
Objects.requireNonNull(disposer);
return create(new PublisherUsing<>(resourceSupplier, sourceSupplier, disposer, eager));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public Subscriber<? super T> apply(Subscriber<? super U> t) {
try {
u = initialSupplier.get();
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
EmptySubscription.error(e, t);
return CancelledSubscriber.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,12 @@ public Subscriber<? super T> apply(Subscriber<? super T> t) {
try {
coll = predicateSupplier.get();
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
EmptySubscription.error(e, t);
return CancelledSubscriber.INSTANCE;
}

if (coll == null) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(new NullPointerException("predicateSupplier returned null"));
EmptySubscription.error(new NullPointerException("predicateSupplier returned null"), t);
return CancelledSubscriber.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ public void subscribe(Subscriber<? super T> s) {
if (ACTUAL.compareAndSet(this, null, s)) {
s.onSubscribe(this);
} else {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(new IllegalStateException("Only one Subscriber allowed!"));
EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), s);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public static <T, U, R> Observable<R> multicastSelector(
co = connectableFactory.get();
observable = selector.apply(co);
} catch (Throwable e) {
child.onSubscribe(EmptySubscription.INSTANCE);
child.onError(e);
EmptySubscription.error(e, child);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void onError(Throwable t) {
frc.dispose();
// in case the other emits an onError before the main even sets a subscription
if (sus.compareAndSet(false, true)) {
serial.onSubscribe(EmptySubscription.INSTANCE);
serial.onError(t);
EmptySubscription.error(t, serial);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,21 @@ public void onSubscribe(Subscription s) {
public void onNext(U t) {
frc.dispose();
if (tus.compareAndSet(false, true)) {
serial.onSubscribe(EmptySubscription.INSTANCE);
serial.onComplete();
EmptySubscription.complete(serial);
}
}
@Override
public void onError(Throwable t) {
frc.dispose();
if (tus.compareAndSet(false, true)) {
serial.onSubscribe(EmptySubscription.INSTANCE);
serial.onError(t);
EmptySubscription.error(t, serial);
}
}
@Override
public void onComplete() {
frc.dispose();
if (tus.compareAndSet(false, true)) {
serial.onSubscribe(EmptySubscription.INSTANCE);
serial.onComplete();
EmptySubscription.complete(serial);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public Subscriber<? super T> apply(Subscriber<? super U> t) {
try {
coll = collectionSupplier.get();
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
EmptySubscription.error(e, t);
return CancelledSubscriber.INSTANCE;
}
return new ToListSubscriber<>(t, coll);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ public boolean setOther(Subscription o) {

public void otherError(Throwable e) {
if (S.compareAndSet(this, null, CANCELLED)) {
actual.onSubscribe(EmptySubscription.INSTANCE);
actual.onError(e);
EmptySubscription.error(e, actual);
} else {
if (s != CANCELLED) {
cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public void subscribe(Subscriber<? super T> s) {
}

if (count == 0) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onComplete();
EmptySubscription.complete(s);
return;
} else
if (count == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void subscribe(Subscriber<? super R> s) {
}

if (count == 0) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onComplete();
EmptySubscription.complete(s);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ public void subscribe(Subscriber<? super T> s) {
try {
pub = supplier.get();
} catch (Throwable t) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(t);
EmptySubscription.error(t, s);
return;
}

if (pub == null) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(new NullPointerException());
EmptySubscription.error(new NullPointerException("null publisher supplied"), s);
return;
}
pub.subscribe(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public static <T> Publisher<T> empty() {

@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onComplete();
EmptySubscription.complete(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,16 @@ public PublisherErrorSource(Supplier<? extends Throwable> errorSupplier) {
}
@Override
public void subscribe(Subscriber<? super T> s) {
s.onSubscribe(EmptySubscription.INSTANCE);
Throwable error;
try {
error = errorSupplier.get();
} catch (Throwable t) {
s.onError(t);
error = t;
return;
}
if (error != null) {
s.onError(error);
} else {
s.onError(new NullPointerException());
if (error == null) {
error = new NullPointerException();
}
EmptySubscription.error(error, s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public void subscribe(Subscriber<? super T> s) {
try {
it = source.iterator();
} catch (Throwable e) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(e);
EmptySubscription.error(e, s);
return;
}
s.onSubscribe(new IteratorSourceSubscription<>(it, s));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public <U> Publisher<U> flatMap(Function<? super T, ? extends Publisher<? extend
try {
other = mapper.apply(value);
} catch (Throwable e) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(e);
EmptySubscription.error(e, s);
return;
}
other.subscribe(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,13 @@ public void subscribe(Subscriber<? super T> s) {
try {
it = stream.iterator();
} catch (Throwable e) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(e);
EmptySubscription.error(e, s);
return;
}
s.onSubscribe(new StreamSourceSubscription<>(stream, it, s));
return;
}
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(new IllegalStateException("Contents already consumed"));
EmptySubscription.error(new IllegalStateException("Contents already consumed"), s);
}

static final class StreamSourceSubscription<T> extends AtomicLong implements Subscription {
Expand Down
143 changes: 143 additions & 0 deletions src/main/java/io/reactivex/internal/operators/PublisherUsing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.function.*;

import org.reactivestreams.*;

import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

public final class PublisherUsing<T, D> implements Publisher<T> {
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends Publisher<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;

public PublisherUsing(Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> disposer,
boolean eager) {
this.resourceSupplier = resourceSupplier;
this.sourceSupplier = sourceSupplier;
this.disposer = disposer;
this.eager = eager;
}

@Override
public void subscribe(Subscriber<? super T> s) {
D resource;

try {
resource = resourceSupplier.get();
} catch (Throwable e) {
EmptySubscription.error(e, s);
return;
}

Publisher<? extends T> source;
try {
source = sourceSupplier.apply(resource);
} catch (Throwable e) {
EmptySubscription.error(e, s);
return;
}

UsingSubscriber<T, D> us = new UsingSubscriber<>(s, resource, disposer, eager);

source.subscribe(us);
}

static final class UsingSubscriber<T, D> implements Subscriber<T>, Subscription {
final Subscriber<? super T> actual;
final D resource;
final Consumer<? super D> disposer;
final boolean eager;

Subscription s;

public UsingSubscriber(Subscriber<? super T> actual, D resource, Consumer<? super D> disposer, boolean eager) {
this.actual = actual;
this.resource = resource;
this.disposer = disposer;
this.eager = eager;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validateSubscription(this.s, s)) {
return;
}
this.s = s;
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable e) {
t.addSuppressed(e);
}
actual.onError(t);
} else {
actual.onError(t);
disposeAfter();
}
}

@Override
public void onComplete() {
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable e) {
actual.onError(e);
return;
}
actual.onComplete();
} else {
actual.onComplete();
disposeAfter();
}
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
disposeAfter();
}

void disposeAfter() {
try {
disposer.accept(resource);
} catch (Throwable e) {
// can't call actual.onError unless it is serialized, which is expensive
RxJavaPlugins.onError(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public void subscribe(Subscriber<? super R> s) {
}

if (count == 0) {
s.onSubscribe(EmptySubscription.INSTANCE);
s.onComplete();
EmptySubscription.complete(s);
return;
}

Expand Down
Loading

0 comments on commit 640e3c1

Please sign in to comment.