Skip to content

Commit

Permalink
Merge pull request #57 from protogenes/feature/nullable-observer
Browse files Browse the repository at this point in the history
add nullable Bindings
  • Loading branch information
thomasnield authored Nov 24, 2017
2 parents 8217e75 + dda401c commit 632904a
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 81 deletions.
44 changes: 27 additions & 17 deletions src/main/java/io/reactivex/rxjavafx/observers/BindingObserver.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,28 +19,32 @@
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observables.ConnectableObservable;
import javafx.beans.InvalidationListener;
import javafx.beans.binding.Binding;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.ObservableList;

final class BindingObserver<T, S> implements Observer<T>, ObservableValue<S>, Binding<S> {

final class BindingObserver<T> implements Observer<T>, ObservableValue<T>, Binding<T> {

private final Consumer<Throwable> onError;
private final Function<T, S> unmaskingFunction;
private final Consumer<Throwable> onError;
private final ConnectableObservable<T> obs;
private boolean connected = false;
private Disposable disposable;
private ExpressionHelper<T> helper;
private T value;
private Disposable disposable;
private ExpressionHelper<S> helper;
private S value;

BindingObserver(Consumer<Throwable> onError) {
BindingObserver(Function<T, S> unmaskingFunction, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = null;
}
BindingObserver(ConnectableObservable<T> obs, Consumer<Throwable> onError) {

BindingObserver(Function<T, S> unmaskingFunction, ConnectableObservable<T> obs, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = obs;
}
Expand All @@ -66,17 +70,23 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
value = t;
fireValueChangedEvent();
try {
value = unmaskingFunction.apply(t);
fireValueChangedEvent();
} catch (Exception e) {
onError(e);
}
}

@Override
public T getValue() {
public S getValue() {
if (!connected && obs != null) {
obs.connect();
connected = true;
}
return value;
}

@Override
public boolean isValid() {
return true;
Expand Down Expand Up @@ -111,7 +121,7 @@ public void addListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void addListener(ChangeListener<? super T> listener) {
public void addListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.addListener(helper, this, listener);
}

Expand All @@ -127,13 +137,13 @@ public void removeListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void removeListener(ChangeListener<? super T> listener) {
public void removeListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.removeListener(helper, listener);
}

/**
* Notify the currently registered observers of a value change.
*
* <p>
* This implementation will ignore all adds and removes of observers that
* are done while a notification is processed. The changes take effect in
* the following call to fireValueChangedEvent.
Expand Down
66 changes: 38 additions & 28 deletions src/main/java/io/reactivex/rxjavafx/observers/BindingSubscriber.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,6 +18,7 @@
import com.sun.javafx.binding.ExpressionHelper;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import javafx.beans.InvalidationListener;
import javafx.beans.binding.Binding;
import javafx.beans.value.ChangeListener;
Expand All @@ -26,23 +27,32 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class BindingSubscriber<T, S> implements Subscriber<T>, ObservableValue<S>, Binding<S> {

final class BindingSubscriber<T> implements Subscriber<T>, ObservableValue<T>, Binding<T> {

private final Consumer<Throwable> onError;
private final ConnectableFlowable<T> flowable;
private final Function<T, S> unmaskingFunction;
private final Consumer<Throwable> onError;
private final ConnectableFlowable<T> obs;
private boolean connected = false;
private Subscription subscription;
private ExpressionHelper<T> helper;
private T value;
private Subscription subscription;
private ExpressionHelper<S> helper;
private S value;

BindingSubscriber(Consumer<Throwable> onError) {
this.flowable = null;
BindingSubscriber(Function<T, S> unmaskingFunction, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = null;
}
BindingSubscriber(ConnectableFlowable<T> flowable, Consumer<Throwable> onError) {
this.flowable = flowable;

BindingSubscriber(Function<T, S> unmaskingFunction, ConnectableFlowable<T> obs, Consumer<Throwable> onError) {
this.unmaskingFunction = unmaskingFunction;
this.onError = onError;
this.obs = obs;
}

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(Long.MAX_VALUE);
}

@Override
Expand All @@ -59,25 +69,25 @@ public void onError(Throwable e) {
}
}

@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
value = t;
fireValueChangedEvent();
try {
value = unmaskingFunction.apply(t);
fireValueChangedEvent();
} catch (Exception e) {
onError(e);
}
}

@Override
public T getValue() {
if (!connected && flowable != null) {
flowable.connect();
public S getValue() {
if (!connected && obs != null) {
obs.connect();
connected = true;
}
return value;
}

@Override
public boolean isValid() {
return true;
Expand Down Expand Up @@ -112,7 +122,7 @@ public void addListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void addListener(ChangeListener<? super T> listener) {
public void addListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.addListener(helper, this, listener);
}

Expand All @@ -128,13 +138,13 @@ public void removeListener(InvalidationListener listener) {
* {@inheritDoc}
*/
@Override
public void removeListener(ChangeListener<? super T> listener) {
public void removeListener(ChangeListener<? super S> listener) {
helper = ExpressionHelper.removeListener(helper, listener);
}

/**
* Notify the currently registered observers of a value change.
*
* <p>
* This implementation will ignore all adds and removes of observers that
* are done while a notification is processed. The changes take effect in
* the following call to fireValueChangedEvent.
Expand Down
105 changes: 96 additions & 9 deletions src/main/java/io/reactivex/rxjavafx/observers/JavaFxObserver.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2017 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,42 +18,129 @@
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.rxjavafx.observables.JavaFxObservable;
import javafx.beans.binding.Binding;
import javafx.beans.value.ObservableValue;

import java.util.Optional;

public enum JavaFxObserver {
;//no instances

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toBinding(Observable<T> obs) {
BindingObserver<T> bindingObserver = new BindingObserver<>(e -> {});
return toBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toBinding(Observable<T> obs, Consumer<Throwable> onErrorAction) {
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t, onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toNullBinding(Observable<T> obs, T nullSentinel) {
return toNullBinding(obs, nullSentinel, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toNullBinding(Observable<T> obs, T nullSentinel, Consumer<Throwable> onErrorAction) {
if (nullSentinel == null) {
throw new NullPointerException("The null value sentinel must not be null.");
}
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toBinding(Observable<T> obs, Consumer<Throwable> onErrorAction ) {
BindingObserver<T> bindingObserver = new BindingObserver<>(onErrorAction);
public static <T> Binding<T> toNullableBinding(Observable<Optional<T>> obs) {
return toNullableBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toNullableBinding(Observable<Optional<T>> obs, Consumer<Throwable> onErrorAction) {
BindingObserver<Optional<T>, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), onErrorAction);
obs.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs) {
return toLazyBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs, Consumer<Throwable> onErrorAction) {
ConnectableObservable<T> published = obs.publish();
BindingObserver<T> bindingObserver = new BindingObserver<>(published, e -> {});
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t, published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toLazyBinding(Observable<T> obs, Consumer<Throwable> onErrorAction ) {
public static <T> Binding<T> toLazyNullBinding(Observable<T> obs, T nullSentinel) {
return toLazyNullBinding(obs, nullSentinel, JavaFxObserver::onError);
}

/**
* Turns an Observable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#valuesOf(ObservableValue, Object)} and emits null when the sentinel is encountered.
*/
public static <T> Binding<T> toLazyNullBinding(Observable<T> obs, T nullSentinel, Consumer<Throwable> onErrorAction) {
if (nullSentinel == null) {
throw new NullPointerException("The null value sentinel must not be null.");
}
ConnectableObservable<T> published = obs.publish();
BindingObserver<T> bindingObserver = new BindingObserver<>(published,onErrorAction);
BindingObserver<T, T> bindingObserver = new BindingObserver<>(t -> t == nullSentinel ? null : t, published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toLazyNullableBinding(Observable<Optional<T>> obs) {
return toLazyNullableBinding(obs, JavaFxObserver::onError);
}

/**
* Turns an Observable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
* This variant unmasks a nullable value as in {@link JavaFxObservable#nullableValuesOf(ObservableValue)} and emits null when the value is not present.
*/
public static <T> Binding<T> toLazyNullableBinding(Observable<Optional<T>> obs, Consumer<Throwable> onErrorAction) {
ConnectableObservable<Optional<T>> published = obs.publish();
BindingObserver<Optional<T>, T> bindingObserver = new BindingObserver<>(o -> o.orElse(null), published, onErrorAction);
published.subscribe(bindingObserver);
return bindingObserver;
}

private static void onError(Throwable t) {
// nothing
}
}
Loading

0 comments on commit 632904a

Please sign in to comment.