Skip to content

Commit

Permalink
Merge pull request #852 from benjchristensen/rxjava-debug
Browse files Browse the repository at this point in the history
rxjava-debug
  • Loading branch information
benjchristensen committed Feb 11, 2014
2 parents 485c22a + a2162f2 commit 93541f8
Show file tree
Hide file tree
Showing 13 changed files with 641 additions and 5 deletions.
30 changes: 30 additions & 0 deletions rxjava-contrib/rxjava-debug/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apply plugin: 'osgi'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile project(':rxjava-core')
testCompile project(":rxjava-core").sourceSets.test.output
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}

javadoc {
options {
doclet = "org.benjchristensen.doclet.DocletExclude"
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
windowTitle = "RxJava Javadoc ${project.version}"
}
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
}

jar {
manifest {
name = 'rxjava-debug'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package rx.operators;

import rx.Observer;
import rx.Subscriber;
import rx.plugins.DebugNotification;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public final class DebugSubscriber<T> extends Subscriber<T> {
private final Func1<T, T> onNextHook;
final Action1<DebugNotification> events;
final Observer<? super T> o;
Operator<T, ?> from = null;
Operator<?, T> to = null;

public DebugSubscriber(
Func1<T, T> onNextHook,
Action1<DebugNotification> _events,
Subscriber<? super T> _o,
Operator<T, ?> _out,
Operator<?, T> _in) {
super(_o);
this.events = _events;
this.o = _o;
this.onNextHook = onNextHook;
this.from = _out;
this.to = _in;
this.add(new DebugSubscription<T>(this));
}

@Override
public void onCompleted() {
events.call(DebugNotification.createOnCompleted(o, from, to));
o.onCompleted();
}

@Override
public void onError(Throwable e) {
events.call(DebugNotification.createOnError(o, from, e, to));
o.onError(e);
}

@Override
public void onNext(T t) {
events.call(DebugNotification.createOnNext(o, from, t, to));
o.onNext(onNextHook.call(t));
}

public Operator<T, ?> getFrom() {
return from;
}

public void setFrom(Operator<T, ?> op) {
this.from = op;
}

public Operator<?, T> getTo() {
return to;
}

public void setTo(Operator<?, T> op) {
this.to = op;
}

public Observer<? super T> getActual() {
return o;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rx.operators;

import rx.Subscription;
import rx.plugins.DebugNotification;

final class DebugSubscription<T> implements Subscription {
private final DebugSubscriber<T> debugObserver;

DebugSubscription(DebugSubscriber<T> debugObserver) {
this.debugObserver = debugObserver;
}

@Override
public void unsubscribe() {
debugObserver.events.call(DebugNotification.<T> createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to));
debugObserver.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return debugObserver.isUnsubscribed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package rx.plugins;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.operators.DebugSubscriber;
import rx.operators.Operator;
import rx.util.functions.Action1;
import rx.util.functions.Actions;
import rx.util.functions.Func1;
import rx.util.functions.Functions;

/**
* Implements hooks into the {@link Observable} chain to emit a detailed account of all the events
* that happened.
*
* @author gscampbell
*/
public class DebugHook extends RxJavaObservableExecutionHook {
private final Func1 onNextHook;
private final Action1<DebugNotification> events;

/**
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
* {@link RxJavaPlugins} registerObservableExecutionHook(hook) method.
*
* @param onNextDataHook
* all of the onNext values are passed through this function to allow for
* manipulation of the values
* @param events
* This action is invoked as each notification is generated
*/
public DebugHook(Func1 onNextDataHook, Action1<DebugNotification> events) {
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
this.events = events == null ? Actions.empty() : events;
}

@Override
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
events.call(DebugNotification.createSubscribe(o, f));
f.call(wrapOutbound(null, o));
}
};
}

@Override
public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInstance, Subscription subscription) {
return subscription;
}

@Override
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
f.call(wrapInbound(null, o));
}
};
}

@Override
public <T, R> Operator<R, T> onLift(final Operator<R, T> bind) {
return new Operator<R, T>() {
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return wrapInbound(bind, bind.call(wrapOutbound(bind, o)));
}
};
}

@Override
public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
return s;
}

@SuppressWarnings("unchecked")
private <R> Subscriber<? super R> wrapOutbound(Operator<R, ?> bind, Subscriber<? super R> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<R>) o).setFrom(bind);
return o;
}
return new DebugSubscriber<R>(onNextHook, events, o, bind, null);
}

@SuppressWarnings("unchecked")
private <T> Subscriber<? super T> wrapInbound(Operator<?, T> bind, Subscriber<? super T> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<T>) o).setTo(bind);
return o;
}
return new DebugSubscriber<T>(onNextHook, events, o, null, bind);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package rx.plugins;

import rx.Notification;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.observers.SafeSubscriber;
import rx.operators.DebugSubscriber;
import rx.operators.Operator;
import rx.plugins.DebugNotification.Kind;

public class DebugNotification<T> {
public static enum Kind {
OnNext, OnError, OnCompleted, Subscribe, Unsubscribe
}

private final OnSubscribe<T> source;
private final Operator<T, ?> from;
private final Kind kind;
private final Notification<T> notification;
private final Operator<?, T> to;
private final long nanoTime;
private final long threadId;
private Observer o;

public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
Operator<?, T> to = null;
Operator<T, ?> from = null;
if (o instanceof DebugSubscriber) {
to = ((DebugSubscriber<T>) o).getTo();
from = ((DebugSubscriber<T>) o).getFrom();
o = ((DebugSubscriber) o).getActual();
}
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
}

public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<T, ?> from, T t, Operator<?, T> to) {
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
}

public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<T, ?> from, Throwable e, Operator<?, T> to) {
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
}

public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
}

public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
}

private DebugNotification(Observer o, Operator<T, ?> from, Kind kind, Notification<T> notification, Operator<?, T> to, OnSubscribe<T> source) {
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
this.from = from;
this.kind = kind;
this.notification = notification;
this.to = to;
this.source = source;
this.nanoTime = System.nanoTime();
this.threadId = Thread.currentThread().getId();
}

public Operator<T, ?> getFrom() {
return from;
}

public Notification<T> getNotification() {
return notification;
}

public Operator<?, T> getTo() {
return to;
}

public long getNanoTime() {
return nanoTime;
}

public long getThreadId() {
return threadId;
}

public Kind getKind() {
return kind;
}

@Override
public String toString() {
final StringBuilder s = new StringBuilder("{");
s.append(" \"nano\": ").append(nanoTime);
s.append(", \"thread\": ").append(threadId);
s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\"");
s.append(", \"type\": \"").append(kind).append("\"");
if (notification != null) {
if (notification.hasValue())
s.append(", \"value\": \"").append(notification.getValue()).append("\"");
if (notification.hasThrowable())
s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
}
if (source != null)
s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\"");
if (from != null)
s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\"");
if (to != null)
s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\"");
s.append("}");
return s.toString();
}
}
Loading

0 comments on commit 93541f8

Please sign in to comment.