Skip to content

Commit

Permalink
Merge pull request #1182 from amazari/rxjava-javafx
Browse files Browse the repository at this point in the history
Merge pull request #1182
  • Loading branch information
benjchristensen committed May 9, 2014
2 parents cb60dd4 + 716de90 commit 6c336e0
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 0 deletions.
30 changes: 30 additions & 0 deletions rxjava-contrib/rxjava-javafx/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apply plugin: 'osgi'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile project(':rxjava-core')
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}

javadoc {
options {
doclet = "org.benjchristensen.doclet.DocletExclude"
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
windowTitle = "RxJava Javadoc ${project.version}"
}
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
}

jar {
manifest {
name = 'rxjava-javafx'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package rx.javafx.sources;

import javafx.event.Event;
import javafx.event.EventHandler;
import javafx.event.EventType;
import javafx.scene.Node;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.JavaFxScheduler;
import rx.subscriptions.JavaFxSubscriptions;

public class NodeEventSource {
/**
* @see rx.observables.JavaFxObservable#fromNodeEvents
*/
public static <T extends Event> Observable<T> fromNodeEvents(final Node source, final EventType<T> eventType) {

return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
final EventHandler<T> handler = new EventHandler<T>() {
@Override
public void handle(T t) {
subscriber.onNext(t);
}
};

source.addEventHandler(eventType, handler);

subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(new Action0() {
@Override
public void call() {
source.removeEventHandler(eventType, handler);
}
}));
}

}).subscribeOn(JavaFxScheduler.getInstance());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package rx.javafx.sources;

import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.JavaFxSubscriptions;

public class ObservableValueSource {

/**
* @see rx.observables.JavaFxObservable#fromObservableValue
*/
public static <T> Observable<T> fromObservableValue(final ObservableValue<T> fxObservable) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
subscriber.onNext(fxObservable.getValue());

final ChangeListener<T> listener = new ChangeListener<T>() {
@Override
public void changed(final ObservableValue<? extends T> observableValue, final T prev, final T current) {
subscriber.onNext(current);
}
};

fxObservable.addListener(listener);

subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(new Action0() {
@Override
public void call() {
fxObservable.removeListener(listener);
}
}));

}
});
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;


import javafx.beans.value.ObservableValue;
import javafx.event.Event;
import javafx.event.EventType;
import javafx.scene.Node;
import rx.Observable;
import rx.javafx.sources.NodeEventSource;
import rx.javafx.sources.ObservableValueSource;


public enum JavaFxObservable {
; // no instances


/**
* Creates an observable corresponding to javafx ui events.
*
* @param node The target of the UI events.
* @param eventType The type of the observed UI events
* @return An Observable of UI events, appropriately typed
*/
public static <T extends Event> Observable<T> fromNodeEvents(final Node node, final EventType<T> eventType) {
return NodeEventSource.fromNodeEvents(node, eventType);
}

/**
* Create an rx Observable from a javafx ObservableValue
*
* @param fxObservable the observed ObservableValue
* @param <T> the type of the observed value
* @return an Observable emitting values as the wrapped ObservableValue changes
*/
public static <T> Observable<T> fromObservableValue(final ObservableValue<T> fxObservable) {
return ObservableValueSource.fromObservableValue(fxObservable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;

import javafx.animation.KeyFrame;
import javafx.animation.Timeline;
import javafx.application.Platform;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.util.Duration;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.TimeUnit;

/**
* Executes work on the JavaFx UI thread.
* This scheduler should only be used with actions that execute quickly.
*/
public final class JavaFxScheduler extends Scheduler {
private static final JavaFxScheduler INSTANCE = new JavaFxScheduler();

/* package for unit test */JavaFxScheduler() {
}

public static JavaFxScheduler getInstance() {
return INSTANCE;
}

private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) {
if (delay < 0 || delay > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
}
}

@Override
public Worker createWorker() {
return new InnerJavaFxScheduler();
}

private static class InnerJavaFxScheduler extends Worker {

private final CompositeSubscription innerSubscription = new CompositeSubscription();

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
long delay = unit.toMillis(delayTime);
assertThatTheDelayIsValidForTheJavaFxTimer(delay);
final BooleanSubscription s = BooleanSubscription.create();

final Timeline timeline = new Timeline(new KeyFrame(Duration.millis(delay), new EventHandler<ActionEvent>() {

@Override
public void handle(ActionEvent event) {
if (innerSubscription.isUnsubscribed() || s.isUnsubscribed()) {
return;
}
action.call();
innerSubscription.remove(s);
}
}));

timeline.setCycleCount(1);
timeline.play();

innerSubscription.add(s);

// wrap for returning so it also removes it from the 'innerSubscription'
return Subscriptions.create(new Action0() {

@Override
public void call() {
timeline.stop();
s.unsubscribe();
innerSubscription.remove(s);
}

});
}

@Override
public Subscription schedule(final Action0 action) {
final BooleanSubscription s = BooleanSubscription.create();
Platform.runLater(new Runnable() {
@Override
public void run() {
if (innerSubscription.isUnsubscribed() || s.isUnsubscribed()) {
return;
}
action.call();
innerSubscription.remove(s);
}
});

innerSubscription.add(s);
// wrap for returning so it also removes it from the 'innerSubscription'
return Subscriptions.create(new Action0() {

@Override
public void call() {
s.unsubscribe();
innerSubscription.remove(s);
}

});
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;


import javafx.application.Platform;
import rx.Scheduler.Worker;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.JavaFxScheduler;

public final class JavaFxSubscriptions {

private JavaFxSubscriptions() {
// no instance
}

/**
* Create an Subscription that always runs <code>unsubscribe</code> in the event dispatch thread.
*
* @param unsubscribe the action to be performed in the ui thread at un-subscription
* @return an Subscription that always runs <code>unsubscribe</code> in the event dispatch thread.
*/
public static Subscription unsubscribeInEventDispatchThread(final Action0 unsubscribe) {
return Subscriptions.create(new Action0() {
@Override
public void call() {
if (Platform.isFxApplicationThread()) {
unsubscribe.call();
} else {
final Worker inner = JavaFxScheduler.getInstance().createWorker();
inner.schedule(new Action0() {
@Override
public void call() {
unsubscribe.call();
inner.unsubscribe();
}
});
}
}
});
}
}
Loading

0 comments on commit 6c336e0

Please sign in to comment.