Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New operation Finally (issue #43) #196

Merged
merged 7 commits into from
Apr 3, 2013
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand Down Expand Up @@ -1182,6 +1183,18 @@ public static <T> Observable<T> concat(Observable<T>... source) {
return _create(OperationConcat.concat(source));
}

/**
* Emits the same objects as the given Observable, calling the given action
* when it calls <code>onComplete</code> or <code>onError</code>.
* @param source an observable
* @param action an action to be called when the source completes or errors.
* @return an Observable that emits the same objects, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public static <T> Observable<T> finally0(Observable source, Action0 action) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we can't have finally as the method name as it is reserved, but finally0 seems awkward. Other than Action0 (because it has different arities) what is the reasoning behind using 0 at the end? I don't think finally will ever have different arities, and if it did it would just have overloads with more arguments.

We likely want to keep the prefix finally since that's what people will be looking for coming from other Rx implementations so perhaps something like finallyDo?

return _create(OperationFinally.finally0(source, action));
}

/**
* Groups the elements of an observable and selects the resulting elements by using a specified function.
*
Expand Down Expand Up @@ -2413,6 +2426,17 @@ public Observable<T> filter(Func1<T, Boolean> predicate) {
return filter(this, predicate);
}

/**
* Registers an action to be called when this observable calls
* <code>onComplete</code> or <code>onError</code>.
* @param action an action to be called when this observable completes or errors.
* @return an Observable that emits the same objects as this observable, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public Observable<T> finally0(Action0 action) {
return _create(OperationFinally.finally0(this, action));
}

/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
Expand Down
132 changes: 132 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationFinally.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import org.junit.Before;
import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

public final class OperationFinally {

/**
* Call a given action when a sequence completes (with or without an
* exception). The returned observable is exactly as threadsafe as the
* source observable; in particular, any situation allowing the source to
* call onComplete or onError multiple times allows the returned observable
* to call the final action multiple times.
* <p/>
* Note that "finally" is a Java reserved word and cannot be an identifier,
* so we use "finally0".
*
* @param sequence An observable sequence of elements
* @param action An action to be taken when the sequence is complete or throws an exception
* @return An observable sequence with the same elements as the input.
* After the last element is consumed (and {@link Observer#onCompleted} has been called),
* or after an exception is thrown (and {@link Observer#onError} has been called),
* the given action will be called.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN Observable.Finally method</a>
*/
public static <T> Func1<Observer<T>, Subscription> finally0(final Observable<T> sequence, final Action0 action) {
return new Func1<Observer<T>, Subscription>() {
@Override
public Subscription call(Observer<T> observer) {
return new Finally<T>(sequence, action).call(observer);
}
};
}

private static class Finally<T> implements Func1<Observer<T>, Subscription> {
private final Observable<T> sequence;
private final Action0 finalAction;
private Subscription s;

Finally(final Observable<T> sequence, Action0 finalAction) {
this.sequence = sequence;
this.finalAction = finalAction;
}

private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription();

private final Subscription actualSubscription = new Subscription() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to create this object, that's what AtomicObservableSubscription.wrap already does. It checks for null and only invokes if it received a subscription (meaning it executed asynchronously).

@Override
public void unsubscribe() {
if (null != s)
s.unsubscribe();
}
};

public Subscription call(Observer<T> observer) {
s = sequence.subscribe(new FinallyObserver(observer));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off the top of my head I think this would become:

return atomicSubscription.wrap(sequence.subscribe(new FinallyObserver(observer)));

We don't need to store the reference to s separately and have 2 layers of wrapping.

return Subscription.wrap(actualSubscription);
}

private class FinallyObserver implements Observer<T> {
private final Observer<T> observer;

FinallyObserver(Observer<T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
observer.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If onCompleted throws an exception the action would not be executed.

finalAction.call();
}

@Override
public void onError(Exception e) {
observer.onError(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If onError throws an exception the action would not be executed.

finalAction.call();
}

@Override
public void onNext(T args) {
observer.onNext(args);
}
}
}

public static class UnitTest {
private Action0 aAction0;
private Observer<String> aObserver;
@Before
public void before() {
aAction0 = mock(Action0.class);
aObserver = mock(Observer.class);
}
private void checkActionCalled(Observable<String> input) {
Observable.create(finally0(input, aAction0)).subscribe(aObserver);
verify(aAction0, times(1)).call();
}
@Test
public void testFinallyCalledOnComplete() {
checkActionCalled(Observable.toObservable(new String[] {"1", "2", "3"}));
}
@Test
public void testFinallyCalledOnError() {
checkActionCalled(Observable.<String>error(new RuntimeException("expected")));
}
}
}