diff --git a/rxjava-contrib/rxjava-debug/build.gradle b/rxjava-contrib/rxjava-debug/build.gradle new file mode 100644 index 0000000000..13775cd88e --- /dev/null +++ b/rxjava-contrib/rxjava-debug/build.gradle @@ -0,0 +1,30 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +javadoc { + options { + doclet = "org.benjchristensen.doclet.DocletExclude" + docletpath = [rootProject.file('./gradle/doclet-exclude.jar')] + stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css') + windowTitle = "RxJava Javadoc ${project.version}" + } + options.addStringOption('top').value = '

RxJava

' +} + +jar { + manifest { + name = 'rxjava-debug' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java new file mode 100644 index 0000000000..5e17cb9151 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java @@ -0,0 +1,68 @@ +package rx.operators; + +import rx.Observer; +import rx.Subscriber; +import rx.plugins.DebugNotification; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public final class DebugSubscriber extends Subscriber { + private final Func1 onNextHook; + final Action1 events; + final Observer o; + Operator from = null; + Operator to = null; + + public DebugSubscriber( + Func1 onNextHook, + Action1 _events, + Subscriber _o, + Operator _out, + Operator _in) { + super(_o); + this.events = _events; + this.o = _o; + this.onNextHook = onNextHook; + this.from = _out; + this.to = _in; + this.add(new DebugSubscription(this)); + } + + @Override + public void onCompleted() { + events.call(DebugNotification.createOnCompleted(o, from, to)); + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + events.call(DebugNotification.createOnError(o, from, e, to)); + o.onError(e); + } + + @Override + public void onNext(T t) { + events.call(DebugNotification.createOnNext(o, from, t, to)); + o.onNext(onNextHook.call(t)); + } + + public Operator getFrom() { + return from; + } + + public void setFrom(Operator op) { + this.from = op; + } + + public Operator getTo() { + return to; + } + + public void setTo(Operator op) { + this.to = op; + } + + public Observer getActual() { + return o; + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java new file mode 100644 index 0000000000..e82960d5fe --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java @@ -0,0 +1,23 @@ +package rx.operators; + +import rx.Subscription; +import rx.plugins.DebugNotification; + +final class DebugSubscription implements Subscription { + private final DebugSubscriber debugObserver; + + DebugSubscription(DebugSubscriber debugObserver) { + this.debugObserver = debugObserver; + } + + @Override + public void unsubscribe() { + debugObserver.events.call(DebugNotification. createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to)); + debugObserver.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return debugObserver.isUnsubscribed(); + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java new file mode 100644 index 0000000000..f530de6e79 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java @@ -0,0 +1,99 @@ +package rx.plugins; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.Subscription; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; +import rx.util.functions.Action1; +import rx.util.functions.Actions; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * Implements hooks into the {@link Observable} chain to emit a detailed account of all the events + * that happened. + * + * @author gscampbell + */ +public class DebugHook extends RxJavaObservableExecutionHook { + private final Func1 onNextHook; + private final Action1 events; + + /** + * Creates a new instance of the DebugHook RxJava plug-in that can be passed into + * {@link RxJavaPlugins} registerObservableExecutionHook(hook) method. + * + * @param onNextDataHook + * all of the onNext values are passed through this function to allow for + * manipulation of the values + * @param events + * This action is invoked as each notification is generated + */ + public DebugHook(Func1 onNextDataHook, Action1 events) { + this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook; + this.events = events == null ? Actions.empty() : events; + } + + @Override + public OnSubscribe onSubscribeStart(Observable observableInstance, final OnSubscribe f) { + return new OnSubscribe() { + @Override + public void call(Subscriber o) { + events.call(DebugNotification.createSubscribe(o, f)); + f.call(wrapOutbound(null, o)); + } + }; + } + + @Override + public Subscription onSubscribeReturn(Observable observableInstance, Subscription subscription) { + return subscription; + } + + @Override + public OnSubscribe onCreate(final OnSubscribe f) { + return new OnSubscribe() { + @Override + public void call(Subscriber o) { + f.call(wrapInbound(null, o)); + } + }; + } + + @Override + public Operator onLift(final Operator bind) { + return new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return wrapInbound(bind, bind.call(wrapOutbound(bind, o))); + } + }; + } + + @Override + public Subscription onAdd(Subscriber subscriber, Subscription s) { + return s; + } + + @SuppressWarnings("unchecked") + private Subscriber wrapOutbound(Operator bind, Subscriber o) { + if (o instanceof DebugSubscriber) { + if (bind != null) + ((DebugSubscriber) o).setFrom(bind); + return o; + } + return new DebugSubscriber(onNextHook, events, o, bind, null); + } + + @SuppressWarnings("unchecked") + private Subscriber wrapInbound(Operator bind, Subscriber o) { + if (o instanceof DebugSubscriber) { + if (bind != null) + ((DebugSubscriber) o).setTo(bind); + return o; + } + return new DebugSubscriber(onNextHook, events, o, null, bind); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java new file mode 100644 index 0000000000..af66b94217 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java @@ -0,0 +1,109 @@ +package rx.plugins; + +import rx.Notification; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.observers.SafeSubscriber; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; +import rx.plugins.DebugNotification.Kind; + +public class DebugNotification { + public static enum Kind { + OnNext, OnError, OnCompleted, Subscribe, Unsubscribe + } + + private final OnSubscribe source; + private final Operator from; + private final Kind kind; + private final Notification notification; + private final Operator to; + private final long nanoTime; + private final long threadId; + private Observer o; + + public static DebugNotification createSubscribe(Observer o, OnSubscribe source) { + Operator to = null; + Operator from = null; + if (o instanceof DebugSubscriber) { + to = ((DebugSubscriber) o).getTo(); + from = ((DebugSubscriber) o).getFrom(); + o = ((DebugSubscriber) o).getActual(); + } + return new DebugNotification(o, from, Kind.Subscribe, null, to, source); + } + + public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { + return new DebugNotification(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); + } + + public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { + return new DebugNotification(o, from, Kind.OnError, Notification. createOnError(e), to, null); + } + + public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); + } + + public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.Unsubscribe, null, to, null); + } + + private DebugNotification(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { + this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; + this.from = from; + this.kind = kind; + this.notification = notification; + this.to = to; + this.source = source; + this.nanoTime = System.nanoTime(); + this.threadId = Thread.currentThread().getId(); + } + + public Operator getFrom() { + return from; + } + + public Notification getNotification() { + return notification; + } + + public Operator getTo() { + return to; + } + + public long getNanoTime() { + return nanoTime; + } + + public long getThreadId() { + return threadId; + } + + public Kind getKind() { + return kind; + } + + @Override + public String toString() { + final StringBuilder s = new StringBuilder("{"); + s.append(" \"nano\": ").append(nanoTime); + s.append(", \"thread\": ").append(threadId); + s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\""); + s.append(", \"type\": \"").append(kind).append("\""); + if (notification != null) { + if (notification.hasValue()) + s.append(", \"value\": \"").append(notification.getValue()).append("\""); + if (notification.hasThrowable()) + s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); + } + if (source != null) + s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\""); + if (from != null) + s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\""); + if (to != null) + s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\""); + s.append("}"); + return s.toString(); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java new file mode 100644 index 0000000000..998c562529 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java @@ -0,0 +1,104 @@ +package rx.plugins; + +import rx.Notification; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.observers.SafeSubscriber; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; + +public class NotificationEvent { + public static enum Kind { + OnNext, OnError, OnCompleted, Subscribe, Unsubscribe + } + + private final OnSubscribe source; + private final Operator from; + private final Kind kind; + private final Notification notification; + private final Operator to; + private final long nanoTime; + private final long threadId; + private Observer o; + + public static NotificationEvent createSubscribe(Observer o, OnSubscribe source) { + Operator to = null; + Operator from = null; + if (o instanceof DebugSubscriber) { + to = ((DebugSubscriber) o).getTo(); + from = ((DebugSubscriber) o).getFrom(); + o = ((DebugSubscriber) o).getActual(); + } + return new NotificationEvent(o, from, Kind.Subscribe, null, to, source); + } + + public static NotificationEvent createOnNext(Observer o, Operator from, T t, Operator to) { + return new NotificationEvent(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); + } + + public static NotificationEvent createOnError(Observer o, Operator from, Throwable e, Operator to) { + return new NotificationEvent(o, from, Kind.OnError, Notification. createOnError(e), to, null); + } + + public static NotificationEvent createOnCompleted(Observer o, Operator from, Operator to) { + return new NotificationEvent(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); + } + + public static NotificationEvent createUnsubscribe(Observer o, Operator from, Operator to) { + return new NotificationEvent(o, from, Kind.Unsubscribe, null, to, null); + } + + private NotificationEvent(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { + this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; + this.from = from; + this.kind = kind; + this.notification = notification; + this.to = to; + this.source = source; + this.nanoTime = System.nanoTime(); + this.threadId = Thread.currentThread().getId(); + } + + public Operator getFrom() { + return from; + } + + public Notification getNotification() { + return notification; + } + + public Operator getTo() { + return to; + } + + public long getNanoTime() { + return nanoTime; + } + + public long getThreadId() { + return threadId; + } + + @Override + public String toString() { + final StringBuilder s = new StringBuilder("{"); + s.append(" \"nano\": ").append(nanoTime); + s.append(", \"thread\": ").append(threadId); + s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\""); + s.append(", \"type\": \"").append(kind).append("\""); + if (notification != null) { + if (notification.hasValue()) + s.append(", \"value\": \"").append(notification.getValue()).append("\""); + if (notification.hasThrowable()) + s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); + } + if (source != null) + s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\""); + if (from != null) + s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\""); + if (to != null) + s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\""); + s.append("}"); + return s.toString(); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java new file mode 100644 index 0000000000..e90de3c11f --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java @@ -0,0 +1,130 @@ +package rx.debug; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.plugins.DebugHook; +import rx.plugins.DebugNotification; +import rx.plugins.PlugReset; +import rx.plugins.RxJavaPlugins; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class DebugHookTest { + @Before + @After + public void reset() { + PlugReset.reset(); + } + + @Test + @Ignore + public void testSimple() { + Action1 events = mock(Action1.class); + final DebugHook hook = new DebugHook(null, events); + RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); + Observable.empty().subscribe(); + verify(events, times(1)).call(subscribe()); + verify(events, times(1)).call(onCompleted()); + } + + @Test + public void testOneOp() { + Action1 events = mock(Action1.class); + final DebugHook hook = new DebugHook(null, events); + RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); + Observable.from(Arrays.asList(1, 3)).flatMap(new Func1>() { + @Override + public Observable call(Integer it) { + return Observable.from(Arrays.asList(it, it + 1)); + } + }).take(3).subscribe(new Observer() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer t) { + } + }); + verify(events, times(6)).call(subscribe()); + verify(events, times(4)).call(onNext(1)); + // one less because it originates from the inner observable sent to merge + verify(events, times(3)).call(onNext(2)); + verify(events, times(4)).call(onNext(3)); + // because the take unsubscribes + verify(events, never()).call(onNext(4)); + } + + private static DebugNotification onNext(final T value) { + return argThat(new BaseMatcher>() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + Notification n = dn.getNotification(); + return n != null && n.hasValue() && n.getValue().equals(value); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("OnNext " + value); + } + }); + } + + private static DebugNotification subscribe() { + return argThat(new BaseMatcher() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + return dn.getKind() == DebugNotification.Kind.Subscribe; + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("Subscribe"); + } + }); + } + + private static DebugNotification onCompleted() { + return argThat(new BaseMatcher() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + Notification n = dn.getNotification(); + return n != null && n.isOnCompleted(); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("onCompleted"); + } + }); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java b/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java new file mode 100644 index 0000000000..0292d1b5e3 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java @@ -0,0 +1,7 @@ +package rx.plugins; + +public class PlugReset { + public static void reset() { + RxJavaPlugins.getInstance().reset(); + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index eb0cd08ca8..02fc8a63fe 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -61,6 +61,7 @@ import rx.operators.OperationJoinPatterns; import rx.operators.OperationMaterialize; import rx.operators.OperationMergeDelayError; +import rx.operators.OperationMergeMaxConcurrent; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; import rx.operators.OperationOnErrorResumeNextViaFunction; @@ -93,13 +94,13 @@ import rx.operators.OperationToObservableFuture; import rx.operators.OperationUsing; import rx.operators.OperationWindow; +import rx.operators.Operator; import rx.operators.OperatorCast; import rx.operators.OperatorDoOnEach; import rx.operators.OperatorFilter; import rx.operators.OperatorGroupBy; import rx.operators.OperatorMap; import rx.operators.OperatorMerge; -import rx.operators.OperationMergeMaxConcurrent; import rx.operators.OperatorObserveOn; import rx.operators.OperatorParallel; import rx.operators.OperatorRepeat; @@ -254,10 +255,9 @@ public static interface OnSubscribeFunc extends Function { */ public Observable lift(final Func1, Subscriber> bind) { return new Observable(new OnSubscribe() { - @Override public void call(Subscriber o) { - subscribe(bind.call(o)); + subscribe(hook.onLift((Operator) bind).call(o)); } }); } diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index d757d85251..0ff56ac759 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -164,4 +164,7 @@ protected void _onError(Throwable e) { } } + public Subscriber getActual() { + return actual; + } } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index c3aa2b44ad..da4500c494 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -20,6 +20,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Subscriber; import rx.Subscription; +import rx.operators.Operator; import rx.util.functions.Func1; /** @@ -37,7 +38,6 @@ * * */ public abstract class RxJavaObservableExecutionHook { - /** * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed. *

@@ -93,4 +93,15 @@ public Throwable onSubscribeError(Observable observableInstance return e; } + public OnSubscribe onCreate(OnSubscribe f) { + return f; + } + + public Operator onLift(final Operator bind) { + return bind; + } + + public Subscription onAdd(Subscriber subscriber, Subscription s) { + return s; + } } diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java index 5d4d3656dc..7b233bb7fb 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -16,7 +16,6 @@ package rx.util.functions; import rx.Observer; -import rx.Subscriber; /** * Utility class for the Action interfaces. @@ -26,6 +25,58 @@ private Actions() { throw new IllegalStateException("No instances!"); } + public static final EmptyAction empty() { + return EMPTY_ACTION; + } + + private static final EmptyAction EMPTY_ACTION = new EmptyAction(); + + private static final class EmptyAction implements Action0, Action1, Action2, Action3, Action4, Action5, Action6, Action7, Action8, Action9, ActionN { + @Override + public void call() { + } + + @Override + public void call(Object t1) { + } + + @Override + public void call(Object t1, Object t2) { + } + + @Override + public void call(Object t1, Object t2, Object t3) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8, Object t9) { + } + + @Override + public void call(Object... args) { + } + } + /** * Extracts a method reference to the observer's onNext method * in the form of an Action1. diff --git a/settings.gradle b/settings.gradle index 22dd94ec82..5be4134cef 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,5 +9,6 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-apache-http', \ 'rxjava-contrib:rxjava-string', \ +'rxjava-contrib:rxjava-debug', \ 'rxjava-contrib:rxjava-async-util', \ 'rxjava-contrib:rxjava-computation-expressions'