diff --git a/build.gradle b/build.gradle index c7378d469af..440cac0acc4 100644 --- a/build.gradle +++ b/build.gradle @@ -63,3 +63,7 @@ subprojects { } } +project(':rxjava-core') { + sourceSets.test.java.srcDir 'src/test/java' +} + diff --git a/gradle.properties b/gradle.properties index bf54d765c83..fa6cdb407b1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.10.2 +version=0.11.0-SNAPSHOT diff --git a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.clj b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt similarity index 100% rename from language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.clj rename to language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/rx_examples.txt diff --git a/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.clj b/language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt similarity index 100% rename from language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.clj rename to language-adaptors/rxjava-clojure/src/examples/clojure/rx/lang/clojure/examples/video_example.txt diff --git a/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj new file mode 100644 index 00000000000..bcdebdc3bb1 --- /dev/null +++ b/language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/DummyClojureClass.clj @@ -0,0 +1,6 @@ +(ns rx.lang.clojure.DummyClojureClass) + +(defn hello-world [username] + (println (format "Hello, %s" username))) + +(hello-world "world") diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyActionWrapper.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyActionWrapper.java new file mode 100644 index 00000000000..24394e1db72 --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyActionWrapper.java @@ -0,0 +1,61 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.groovy; + +import groovy.lang.Closure; +import rx.util.functions.Action; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Action2; +import rx.util.functions.Action3; + +/** + * Concrete wrapper that accepts a {@link Closure} and produces any needed Rx {@link Action}. + * + * @param + * @param + * @param + * @param + */ +public class GroovyActionWrapper implements Action, Action0, Action1, Action2, Action3 { + + private final Closure closure; + + public GroovyActionWrapper(Closure closure) { + this.closure = closure; + } + + @Override + public void call() { + closure.call(); + } + + @Override + public void call(T1 t1) { + closure.call(t1); + } + + @Override + public void call(T1 t1, T2 t2) { + closure.call(t1, t2); + } + + @Override + public void call(T1 t1, T2 t2, T3 t3) { + closure.call(t1, t2, t3); + } + +} \ No newline at end of file diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyFunctionWrapper.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyFunctionWrapper.java new file mode 100644 index 00000000000..6d4d34ef8e9 --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyFunctionWrapper.java @@ -0,0 +1,68 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.groovy; + +import groovy.lang.Closure; +import rx.util.functions.Func0; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Func3; +import rx.util.functions.Func4; +import rx.util.functions.Function; + +/** + * Concrete wrapper that accepts a {@link Closure} and produces any needed Rx {@link Function}. + * + * @param + * @param + * @param + * @param + * @param + */ +public class GroovyFunctionWrapper implements Func0, Func1, Func2, Func3, Func4 { + + private final Closure closure; + + + public GroovyFunctionWrapper(Closure closure) { + this.closure = closure; + } + + @Override + public R call() { + return (R) closure.call(); + } + + @Override + public R call(T1 t1) { + return (R) closure.call(t1); + } + + @Override + public R call(T1 t1, T2 t2) { + return (R) closure.call(t1, t2); + } + + @Override + public R call(T1 t1, T2 t2, T3 t3) { + return (R) closure.call(t1, t2, t3); + } + + @Override + public R call(T1 t1, T2 t2, T3 t3, T4 t4) { + return (R) closure.call(t1, t2, t3, t4); + } +} \ No newline at end of file diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java new file mode 100644 index 00000000000..91350cb56c1 --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyExtensionModule.java @@ -0,0 +1,179 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.groovy; + +import groovy.lang.Closure; +import groovy.lang.GroovySystem; +import groovy.lang.MetaMethod; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.codehaus.groovy.reflection.CachedClass; +import org.codehaus.groovy.reflection.ReflectionCache; +import org.codehaus.groovy.runtime.m12n.ExtensionModule; +import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl; + +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.util.functions.Action; +import rx.util.functions.Function; + +/** + * ExtensionModule that adds extension methods to support groovy.lang.Closure + * anywhere rx.util.functions.Function/Action is used in classes defined in CLASS_TO_EXTEND. + * + * It is specifically intended for providing extension methods on Observable. + */ +public class RxGroovyExtensionModule extends ExtensionModule { + + @SuppressWarnings("rawtypes") + private final static Class[] CLASS_TO_EXTEND = new Class[] { Observable.class, BlockingObservable.class }; + + public RxGroovyExtensionModule() { + super("RxGroovyExtensionModule", "1.0"); + } + + /** + * Keeping this code around a little while as it was hard to figure out ... and I'm still messing with it while debugging. + * + * Once the rest of this ExtensionModule stuff is working I'll delete this method. + * + * This is used for manually initializing rather than going via the org.codehaus.groovy.runtime.ExtensionModule properties file. + */ + public static void initializeManuallyForTesting() { + System.out.println("initialize"); + MetaClassRegistryImpl mcRegistry = ((MetaClassRegistryImpl) GroovySystem.getMetaClassRegistry()); + // RxGroovyExtensionModule em = new RxGroovyExtensionModule(); + + Properties p = new Properties(); + p.setProperty("moduleFactory", "rx.lang.groovy.RxGroovyPropertiesModuleFactory"); + Map> metaMethods = new HashMap>(); + mcRegistry.registerExtensionModuleFromProperties(p, RxGroovyExtensionModule.class.getClassLoader(), metaMethods); + + for (ExtensionModule m : mcRegistry.getModuleRegistry().getModules()) { + System.out.println("Module: " + m.getName()); + } + + for (CachedClass cc : metaMethods.keySet()) { + System.out.println("Adding MetaMethods to CachedClass: " + cc); + cc.addNewMopMethods(metaMethods.get(cc)); + } + } + + @SuppressWarnings("rawtypes") + @Override + public List getMetaMethods() { + // System.out.println("**** RxGroovyExtensionModule => Initializing and returning MetaMethods."); + List methods = new ArrayList(); + + for (Class classToExtend : CLASS_TO_EXTEND) { + for (final Method m : classToExtend.getMethods()) { + for (Class c : m.getParameterTypes()) { + if (Function.class.isAssignableFrom(c)) { + methods.add(createMetaMethod(m)); + // break out of parameter-type loop + break; + } + } + } + } + + return methods; + } + + private MetaMethod createMetaMethod(final Method m) { + return new MetaMethod() { + + @Override + public int getModifiers() { + return m.getModifiers(); + } + + @Override + public String getName() { + return m.getName(); + } + + @SuppressWarnings("rawtypes") + @Override + public Class getReturnType() { + return m.getReturnType(); + } + + @Override + public CachedClass getDeclaringClass() { + return ReflectionCache.getCachedClass(m.getDeclaringClass()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Object invoke(Object object, Object[] arguments) { + // System.out.println("***** RxGroovyExtensionModule => invoked [" + getName() + "]: " + object + " args: " + arguments[0]); + try { + Object[] newArgs = new Object[arguments.length]; + for (int i = 0; i < arguments.length; i++) { + final Object o = arguments[i]; + if (o instanceof Closure) { + if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) { + newArgs[i] = new GroovyActionWrapper((Closure) o); + } else { + newArgs[i] = new GroovyFunctionWrapper((Closure) o); + } + + } else { + newArgs[i] = o; + } + } + return m.invoke(object, newArgs); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (IllegalArgumentException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof RuntimeException) { + // re-throw whatever was thrown to us + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + + @SuppressWarnings("rawtypes") + @Override + public CachedClass[] getParameterTypes() { + Class[] pts = m.getParameterTypes(); + CachedClass[] cc = new CachedClass[pts.length]; + for (int i = 0; i < pts.length; i++) { + if (Function.class.isAssignableFrom(pts[i])) { + // function type to be replaced by closure + cc[i] = ReflectionCache.getCachedClass(Closure.class); + } else { + // non-function type + cc[i] = ReflectionCache.getCachedClass(pts[i]); + } + } + return cc; + } + }; + } +} diff --git a/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyPropertiesModuleFactory.java b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyPropertiesModuleFactory.java new file mode 100644 index 00000000000..149bca56213 --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/RxGroovyPropertiesModuleFactory.java @@ -0,0 +1,37 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.groovy; + +import java.util.Properties; + +import org.codehaus.groovy.runtime.m12n.ExtensionModule; +import org.codehaus.groovy.runtime.m12n.PropertiesModuleFactory; + +/** + * Factory for {@link RxGroovyExtensionModule} to add extension methods. + *

+ * This is loaded from /META-INF/services/org.codehaus.groovy.runtime.ExtensionModule + *

+ * The property is defined as: moduleFactory=rx.lang.groovy.RxGroovyPropertiesModuleFactory + */ +public class RxGroovyPropertiesModuleFactory extends PropertiesModuleFactory { + + @Override + public ExtensionModule newModule(Properties properties, ClassLoader classLoader) { + return new RxGroovyExtensionModule(); + } + +} diff --git a/language-adaptors/rxjava-groovy/src/main/resources/META-INF/services/org.codehaus.groovy.runtime.ExtensionModule b/language-adaptors/rxjava-groovy/src/main/resources/META-INF/services/org.codehaus.groovy.runtime.ExtensionModule new file mode 100644 index 00000000000..975068e80c7 --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/main/resources/META-INF/services/org.codehaus.groovy.runtime.ExtensionModule @@ -0,0 +1 @@ +moduleFactory=rx.lang.groovy.RxGroovyPropertiesModuleFactory \ No newline at end of file diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index dd36af10880..e0fdfdcfc33 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -58,7 +58,7 @@ def class ObservableTests { @Test public void testFilter() { - Observable.filter(Observable.from(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).filter({it >= 2}).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(1)).received(2); verify(a, times(1)).received(3); @@ -82,7 +82,7 @@ def class ObservableTests { @Test public void testMap2() { - Observable.map(Observable.from(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).map({'hello_' + it}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received("hello_" + 1); verify(a, times(1)).received("hello_" + 2); verify(a, times(1)).received("hello_" + 3); @@ -90,7 +90,7 @@ def class ObservableTests { @Test public void testMaterialize() { - Observable.materialize(Observable.from(1, 2, 3)).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).materialize().subscribe({ result -> a.received(result)}); // we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted verify(a, times(4)).received(any(Notification.class)); verify(a, times(0)).error(any(Exception.class)); @@ -162,7 +162,7 @@ def class ObservableTests { @Test public void testSkipTake() { - Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).skip(1).take(1).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -170,7 +170,7 @@ def class ObservableTests { @Test public void testSkip() { - Observable.skip(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).skip(2).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(0)).received(2); verify(a, times(1)).received(3); @@ -178,7 +178,7 @@ def class ObservableTests { @Test public void testTake() { - Observable.take(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).take(2).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -192,7 +192,7 @@ def class ObservableTests { @Test public void testTakeWhileViaGroovy() { - Observable.takeWhile(Observable.from(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).takeWhile( { x -> x < 3}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -200,7 +200,7 @@ def class ObservableTests { @Test public void testTakeWhileWithIndexViaGroovy() { - Observable.takeWhileWithIndex(Observable.from(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)}); + Observable.from(1, 2, 3).takeWhileWithIndex({ x, i -> i < 2}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -212,24 +212,12 @@ def class ObservableTests { verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } - @Test - public void testToSortedListStatic() { - Observable.toSortedList(Observable.from(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)}); - verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); - } - @Test public void testToSortedListWithFunction() { new TestFactory().getNumbers().toSortedList({a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } - @Test - public void testToSortedListWithFunctionStatic() { - Observable.toSortedList(Observable.from(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); - verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); - } - @Test public void testForEach() { Observable.create(new AsyncObservable()).toBlockingObservable().forEach({ result -> a.received(result)}); diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java index ced759f6f58..2ce298c5903 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java @@ -38,7 +38,6 @@ import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; -import rx.util.functions.Func0; import rx.util.functions.Func2; /** @@ -187,14 +186,12 @@ public void testPeriodicScheduling() throws Exception { final CountDownLatch latch = new CountDownLatch(4); final Action0 innerAction = mock(Action0.class); - final Action0 unsubscribe = mock(Action0.class); - final Func0 action = new Func0() { + final Action0 action = new Action0() { @Override - public Subscription call() { + public void call() { try { innerAction.call(); assertTrue(SwingUtilities.isEventDispatchThread()); - return Subscriptions.create(unsubscribe); } finally { latch.countDown(); } @@ -210,7 +207,6 @@ public Subscription call() { sub.unsubscribe(); waitForEmptyEventQueue(); verify(innerAction, times(4)).call(); - verify(unsubscribe, times(4)).call(); } @Test diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java index 174c529d2df..b2806381919 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java @@ -26,7 +26,6 @@ import javax.swing.AbstractButton; import rx.Observable; -import static rx.Observable.filter; import rx.swing.sources.AbstractButtonSource; import rx.swing.sources.ComponentEventSource; import rx.swing.sources.KeyEventSource; @@ -68,7 +67,7 @@ public static Observable fromKeyEvents(Component component) { * @return Observable of key events. */ public static Observable fromKeyEvents(Component component, final Set keyCodes) { - return filter(fromKeyEvents(component), new Func1() { + return fromKeyEvents(component).filter(new Func1() { @Override public Boolean call(KeyEvent event) { return keyCodes.contains(event.getKeyCode()); diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java index 3716b599f96..291e0202aa1 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java @@ -85,7 +85,7 @@ public void call() { * @see SwingObservable.fromKeyEvents(Component, Set) */ public static Observable> currentlyPressedKeysOf(Component component) { - return Observable.>scan(fromKeyEventsOf(component), new HashSet(), new Func2, KeyEvent, Set>() { + return fromKeyEventsOf(component).>scan(new HashSet(), new Func2, KeyEvent, Set>() { @Override public Set call(Set pressedKeys, KeyEvent event) { Set afterEvent = new HashSet(pressedKeys); diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ae3042ba1bb..2b9ba46fcbd 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,33 +15,18 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import rx.concurrency.Schedulers; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; -import rx.operators.OperationOnExceptionResumeNextViaObservable; -import rx.operators.SafeObservableSubscription; -import rx.operators.SafeObserver; import rx.operators.OperationAll; import rx.operators.OperationBuffer; import rx.operators.OperationCache; @@ -61,6 +46,7 @@ import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; +import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; @@ -76,15 +62,15 @@ import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationWhere; import rx.operators.OperationZip; +import rx.operators.SafeObservableSubscription; +import rx.operators.SafeObserver; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; -import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.BufferClosing; import rx.util.BufferOpening; @@ -115,12 +101,12 @@ *

* For more information see the RxJava * Wiki - * + * * @param */ public class Observable { -//TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable") + //TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable") private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); @@ -131,7 +117,7 @@ public class Observable { *

* NOTE: Use {@link #create(Func1)} to create an Observable instead of this method unless you * specifically have a need for inheritance. - * + * * @param onSubscribe * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. */ @@ -147,16 +133,14 @@ protected Observable() { /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to * receive items and notifications from the Observable. - * + * *

A typical implementation of {@code subscribe} does the following: *

- * It stores a reference to the Observer in a collection object, such as a - * {@code List} object. + * It stores a reference to the Observer in a collection object, such as a {@code List} object. *

* It returns a reference to the {@link Subscription} interface. This enables Observers to * unsubscribe, that is, to stop receiving items and notifications before the Observable stops - * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} - * method. + * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method. *

* An Observable<T> instance is responsible for accepting all subscriptions * and notifying all Observers. Unless the documentation for a particular @@ -165,13 +149,13 @@ protected Observable() { *

* For more information see the * RxJava Wiki - * - * @param observer the observer + * + * @param observer + * the observer * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items * before the Observable has finished sending them * @throws IllegalArgumentException - * if the {@link Observer} provided as the argument to {@code subscribe()} is - * {@code null} + * if the {@link Observer} provided as the argument to {@code subscribe()} is {@code null} */ public Subscription subscribe(Observer observer) { // allow the hook to intercept and/or decorate @@ -226,26 +210,24 @@ public Subscription subscribe(Observer observer) { /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to * receive items and notifications from the Observable. - * + * *

A typical implementation of {@code subscribe} does the following: *

- * It stores a reference to the Observer in a collection object, such as a - * {@code List} object. + * It stores a reference to the Observer in a collection object, such as a {@code List} object. *

* It returns a reference to the {@link Subscription} interface. This enables Observers to * unsubscribe, that is, to stop receiving items and notifications before the Observable stops - * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} - * method. + * sending them, which also invokes the Observer's {@link Observer#onCompleted onCompleted} method. *

* An {@code Observable} instance is responsible for accepting all subscriptions - * and notifying all Observers. Unless the documentation for a particular - * {@code Observable} implementation indicates otherwise, Observers should make no + * and notifying all Observers. Unless the documentation for a particular {@code Observable} implementation indicates otherwise, Observers should make no * assumptions about the order in which multiple Observers will receive their notifications. *

* For more information see the * RxJava Wiki - * - * @param observer the observer + * + * @param observer + * the observer * @param scheduler * the {@link Scheduler} on which Observers subscribe to the Observable * @return a {@link Subscription} reference with which Observers can stop receiving items and @@ -281,13 +263,13 @@ public Subscription subscribe(final Map callbacks) { /** * Wrapping since raw functions provided by the user are being invoked. - * + * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @Override - public void onCompleted() { + public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { Functions.from(onComplete).call(); @@ -306,11 +288,11 @@ public void onError(Throwable e) { } @Override - public void onNext(Object args) { + public void onNext(Object args) { onNext.call(args); } - }); + }); } public Subscription subscribe(final Map callbacks, Scheduler scheduler) { @@ -324,7 +306,7 @@ public Subscription subscribe(final Action1 onNext) { /** * Wrapping since raw functions provided by the user are being invoked. - * + * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @@ -362,7 +344,7 @@ public Subscription subscribe(final Action1 onNext, final Action1 /** * Wrapping since raw functions provided by the user are being invoked. - * + * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @@ -403,7 +385,7 @@ public Subscription subscribe(final Action1 onNext, final Action1 /** * Wrapping since raw functions provided by the user are being invoked. - * + * * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ return protectivelyWrapAndSubscribe(new Observer() { @@ -434,7 +416,7 @@ public Subscription subscribe(final Action1 onNext, final Action1 /** * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to * push results into the specified subject. - * + * * @param subject * the {@link Subject} for the {@link ConnectableObservable} to push source items * into @@ -444,12 +426,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 * push results into the specified {@link Subject} */ public ConnectableObservable multicast(Subject subject) { - return multicast(this, subject); + return OperationMulticast.multicast(this, subject); } /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. - * + * * @param e */ private void handleError(Throwable e) { @@ -459,9 +441,9 @@ private void handleError(Throwable e) { /** * An Observable that never sends any information to an {@link Observer}. - * + * * This Observable is useful primarily for testing purposes. - * + * * @param * the type of item emitted by the Observable */ @@ -479,9 +461,8 @@ public Subscription call(Observer t1) { } /** - * an Observable that invokes {@link Observer#onError onError} when the {@link Observer} - * subscribes to it. - * + * an Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it. + * * @param * the type of item emitted by the Observable */ @@ -492,7 +473,7 @@ public ThrowObservable(final Throwable exception) { /** * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method. - * + * * @param observer * an {@link Observer} of this Observable * @return a reference to the subscription @@ -509,229 +490,12 @@ public Subscription call(Observer observer) { } /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers. The current buffer is emitted and replaced with a new buffer when the - * Observable produced by the specified {@link Func0} produces a {@link BufferClosing} object. The - * {@link Func0} will then be used to create a new Observable to listen for the end of the next buffer. - * - * @param source - * The source {@link Observable} which produces values. - * @param bufferClosingSelector - * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. - * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer - * is emitted and replaced with a new one. - * @return - * An {@link Observable} which produces connected non-overlapping buffers, which are emitted - * when the current {@link Observable} created with the {@link Func0} argument produces a - * {@link BufferClosing} object. - */ - public static Observable> buffer(Observable source, Func0> bufferClosingSelector) { - return create(OperationBuffer.buffer(source, bufferClosingSelector)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces buffers. - * Buffers are created when the specified "bufferOpenings" Observable produces a {@link BufferOpening} object. - * Additionally the {@link Func0} argument is used to create an Observable which produces {@link BufferClosing} - * objects. When this Observable produces such an object, the associated buffer is emitted. - * - * @param source - * The source {@link Observable} which produces values. - * @param bufferOpenings - * The {@link Observable} which when it produces a {@link BufferOpening} object, will cause - * another buffer to be created. - * @param bufferClosingSelector - * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. - * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer - * is emitted. - * @return - * An {@link Observable} which produces buffers which are created and emitted when the specified - * {@link Observable}s publish certain objects. - */ - public static Observable> buffer(Observable source, Observable bufferOpenings, Func1> bufferClosingSelector) { - return create(OperationBuffer.buffer(source, bufferOpenings, bufferClosingSelector)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each containing "count" elements. When the source Observable completes or - * encounters an error, the current buffer is emitted, and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param count - * The maximum size of each buffer before it should be emitted. - * @return - * An {@link Observable} which produces connected non-overlapping buffers containing at most - * "count" produced values. - */ - public static Observable> buffer(Observable source, int count) { - return create(OperationBuffer.buffer(source, count)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces buffers every - * "skip" values, each containing "count" elements. When the source Observable completes or encounters an error, - * the current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param count - * The maximum size of each buffer before it should be emitted. - * @param skip - * How many produced values need to be skipped before starting a new buffer. Note that when "skip" and - * "count" are equals that this is the same operation as {@link Observable#buffer(Observable, int)}. - * @return - * An {@link Observable} which produces buffers every "skipped" values containing at most - * "count" produced values. - */ - public static Observable> buffer(Observable source, int count, int skip) { - return create(OperationBuffer.buffer(source, count, skip)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source - * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted, and - * replaced with a new buffer. - * @param unit - * The unit of time which applies to the "timespan" argument. - * @return - * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. - */ - public static Observable> buffer(Observable source, long timespan, TimeUnit unit) { - return create(OperationBuffer.buffer(source, timespan, unit)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument. When the source - * Observable completes or encounters an error, the current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted, and - * replaced with a new buffer. - * @param unit - * The unit of time which applies to the "timespan" argument. - * @param scheduler - * The {@link Scheduler} to use when determining the end and start of a buffer. - * @return - * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. - */ - public static Observable> buffer(Observable source, long timespan, TimeUnit unit, Scheduler scheduler) { - return create(OperationBuffer.buffer(source, timespan, unit, scheduler)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size - * specified by the "count" argument (which ever is reached first). When the source Observable completes - * or encounters an error, the current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted, and - * replaced with a new buffer. - * @param unit - * The unit of time which applies to the "timespan" argument. - * @param count - * The maximum size of each buffer before it should be emitted. - * @return - * An {@link Observable} which produces connected non-overlapping buffers which are emitted after - * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). - */ - public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count) { - return create(OperationBuffer.buffer(source, timespan, unit, count)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable produces connected - * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size - * specified by the "count" argument (which ever is reached first). When the source Observable completes - * or encounters an error, the current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted, and - * replaced with a new buffer. - * @param unit - * The unit of time which applies to the "timespan" argument. - * @param count - * The maximum size of each buffer before it should be emitted. - * @param scheduler - * The {@link Scheduler} to use when determining the end and start of a buffer. - * @return - * An {@link Observable} which produces connected non-overlapping buffers which are emitted after - * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). - */ - public static Observable> buffer(Observable source, long timespan, TimeUnit unit, int count, Scheduler scheduler) { - return create(OperationBuffer.buffer(source, timespan, unit, count, scheduler)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer - * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan - * specified by the "timespan" argument. When the source Observable completes or encounters an error, the - * current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted. - * @param timeshift - * The period of time after which a new buffer will be created. - * @param unit - * The unit of time which applies to the "timespan" and "timeshift" argument. - * @return - * An {@link Observable} which produces new buffers periodically, and these are emitted after - * a fixed timespan has elapsed. - */ - public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { - return create(OperationBuffer.buffer(source, timespan, timeshift, unit)); - } - - /** - * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer - * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan - * specified by the "timespan" argument. When the source Observable completes or encounters an error, the - * current buffer is emitted and the event is propagated. - * - * @param source - * The source {@link Observable} which produces values. - * @param timespan - * The period of time each buffer is collecting values before it should be emitted. - * @param timeshift - * The period of time after which a new buffer will be created. - * @param unit - * The unit of time which applies to the "timespan" and "timeshift" argument. - * @param scheduler - * The {@link Scheduler} to use when determining the end and start of a buffer. - * @return - * An {@link Observable} which produces new buffers periodically, and these are emitted after - * a fixed timespan has elapsed. - */ - public static Observable> buffer(Observable source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { - return create(OperationBuffer.buffer(source, timespan, timeshift, unit, scheduler)); - } - - /** - * Creates an Observable that will execute the given function when an {@link Observer} - * subscribes to it. + * Creates an Observable that will execute the given function when an {@link Observer} subscribes to it. *

* *

* Write the function you pass to create so that it behaves as an Observable: It - * should invoke the Observer's {@link Observer#onNext onNext}, - * {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods + * should invoke the Observer's {@link Observer#onNext onNext}, {@link Observer#onError onError}, and {@link Observer#onCompleted onCompleted} methods * appropriately. *

* A well-formed Observable must invoke either the Observer's onCompleted method @@ -739,12 +503,11 @@ public static Observable> buffer(Observable source, long timespan *

* See Rx Design Guidelines (PDF) * for detailed information. - * + * * @param * the type of the items that this Observable emits * @param func - * a function that accepts an {@code Observer}, invokes its - * {@code onNext}, {@code onError}, and {@code onCompleted} methods + * a function that accepts an {@code Observer}, invokes its {@code onNext}, {@code onError}, and {@code onCompleted} methods * as appropriate, and returns a {@link Subscription} to allow the Observer to * canceling the subscription * @return an Observable that, when an {@link Observer} subscribes to it, will execute the given @@ -759,180 +522,102 @@ public static Observable create(Func1, Subscription> func) { * its {@link Observer#onCompleted onCompleted} method. *

* - * + * * @param * the type of the items (ostensibly) emitted by the Observable * @return an Observable that returns no data to the {@link Observer} and immediately invokes * the {@link Observer}'s {@link Observer#onCompleted() onCompleted} method */ public static Observable empty() { - return toObservable(new ArrayList()); + return from(new ArrayList()); } /** - * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} - * method when the Observer subscribes to it + * Returns an Observable that invokes an {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it *

* - * + * * @param exception * the particular error to report * @param * the type of the items (ostensibly) emitted by the Observable - * @return an Observable that invokes the {@link Observer}'s - * {@link Observer#onError onError} method when the Observer subscribes to it + * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it */ public static Observable error(Throwable exception) { return new ThrowObservable(exception); } - /** - * Filters an Observable by discarding any items it emits that do not satisfy some predicate. - *

- * - * - * @param that - * the Observable to filter - * @param predicate - * a function that evaluates the items emitted by the source Observable, returning - * {@code true} if they pass the filter - * @return an Observable that emits only those items emitted by the source Observable for which the - * predicate evaluates to {@code true} - */ - public static Observable filter(Observable that, Func1 predicate) { - return create(OperationFilter.filter(that, predicate)); - } - - /** - * Filters an Observable by discarding any items it emits that do not satisfy some predicate - *

- * - * - * - * @param that - * the Observable to filter - * @param predicate - * a function that evaluates an item emitted by the source Observable, and - * returns {@code true} if it passes the filter - * @return an Observable that emits only those items emitted by the source Observable for which - * the predicate evaluates to {@code true} - */ - public static Observable where(Observable that, Func1 predicate) { - return create(OperationWhere.where(that, predicate)); - } - /** * Converts an {@link Iterable} sequence into an Observable. *

* - * - *

Implementation note: the entire iterable sequence will be immediately emitted each time an - * {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, + * + *

Implementation note: the entire iterable sequence will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, * it in not possible to unsubscribe from the sequence before it completes. - * + * * @param iterable * the source {@link Iterable} sequence * @param * the type of items in the {@link Iterable} sequence and the type of items to be * emitted by the resulting Observable * @return an Observable that emits each item in the source {@link Iterable} sequence - * @see #toObservable(Iterable) */ public static Observable from(Iterable iterable) { - return toObservable(iterable); + return create(OperationToObservableIterable.toObservableIterable(iterable)); } /** * Converts an Array into an Observable. *

* - * - *

Implementation note: the entire array will be immediately emitted each time an - * {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, + * + *

Implementation note: the entire array will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, * it in not possible to unsubscribe from the sequence before it completes. - * + * * @param items * the source Array * @param * the type of items in the Array, and the type of items to be emitted by the * resulting Observable * @return an Observable that emits each item in the source Array - * @see #toObservable(Object...) */ public static Observable from(T... items) { - return toObservable(items); + return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items))); } /** * Generates an Observable that emits a sequence of integers within a specified range. *

* - * - *

Implementation note: the entire range will be immediately emitted each time an - * {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, + * + *

Implementation note: the entire range will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned, * it in not possible to unsubscribe from the sequence before it completes. - * + * * @param start * the value of the first integer in the sequence * @param count * the number of sequential integers to generate - * + * * @return an Observable that emits a range of sequential integers - * + * * @see Observable.Range Method (Int32, Int32) */ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } - /** - * Asynchronously subscribes and unsubscribes Observers on the specified {@link Scheduler}. - *

- * - * - * @param source - * the source Observable - * @param scheduler - * the {@link Scheduler} to perform subscription and unsubscription actions on - * @param - * the type of the items emitted by the Observable - * @return the source Observable modified so that its subscriptions and unsubscriptions happen - * on the specified {@link Scheduler} - */ - public static Observable subscribeOn(Observable source, Scheduler scheduler) { - return create(OperationSubscribeOn.subscribeOn(source, scheduler)); - } - - /** - * Asynchronously notify Observers on the specified {@link Scheduler}. - *

- * - * - * @param source - * the source Observable - * @param scheduler - * the {@link Scheduler} to notify Observers on - * @param - * the type of the items emitted by the Observable - * @return the source Observable modified so that its Observers are notified on the specified - * {@link Scheduler} - */ - public static Observable observeOn(Observable source, Scheduler scheduler) { - return create(OperationObserveOn.observeOn(source, scheduler)); - } - /** * Returns an Observable that calls an Observable factory to create its Observable for each * new Observer that subscribes. That is, for each subscriber, the actuall Observable is determined * by the factory function. - * + * *

* *

* The defer operator allows you to defer or delay emitting items from an Observable until such * time as an Observer subscribes to the Observable. This allows an {@link Observer} to easily * obtain updates or a refreshed version of the sequence. - * + * * @param observableFactory * the Observable factory function to invoke for each {@link Observer} that * subscribes to the resulting Observable @@ -957,7 +642,7 @@ public static Observable defer(Func0> observableFactory) { * from() will convert an {@link Iterable} object into an Observable that emits * each of the items in the Iterable, one at a time, while the just() method * converts an Iterable into an Observable that emits the entire Iterable as a single item. - * + * * @param value * the item to pass to the {@link Observer}'s {@link Observer#onNext onNext} method * @param @@ -968,91 +653,7 @@ public static Observable just(T value) { List list = new ArrayList(); list.add(value); - return toObservable(list); - } - - /** - * Returns an Observable that applies a function of your choosing to each item emitted by an - * Observable and emits the result. - *

- * - * - * @param sequence - * the source Observable - * @param func - * a function to apply to each item emitted by the source Observable - * @param - * the type of items emitted by the the source Observable - * @param - * the type of items to be emitted by the resulting Observable - * @return an Observable that emits the items from the source Observable as transformed by the - * given function - */ - public static Observable map(Observable sequence, Func1 func) { - return create(OperationMap.map(sequence, func)); - } - - /** - * Creates a new Observable by applying a function that you supply to each item emitted by - * the source Observable, where that function returns an Observable, and then merging those - * resulting Observables and emitting the results of this merger. - *

- * - *

- * Note: {@code mapMany} and {@code flatMap} are equivalent. - * - * @param sequence - * the source Observable - * @param func - * a function that, when applied to an item emitted by the source Observable, - * returns an Observable - * @param - * the type of items emitted by the source Observable - * @param - * the type of items emitted by the Observables that are returned from - * {@code func} - * @return an Observable that emits the result of applying the transformation function to each - * item emitted by the source Observable and merging the results of the Observables - * obtained from this transformation - * @see #flatMap(Observable, Func1) - */ - public static Observable mapMany(Observable sequence, Func1> func) { - return create(OperationMap.mapMany(sequence, func)); - } - - /** - * Turns all of the notifications from a source Observable into {@link Observer#onNext onNext} - * emissions, and marks them with their original notification types within {@link Notification} - * objects. - *

- * - * - * @param sequence - * the Observable you want to materialize in this way - * @return an Observable that emits items that are the result of materializing the - * notifications of the source Observable. - * @see MSDN: Observable.Materialize - */ - public static Observable> materialize(final Observable sequence) { - return create(OperationMaterialize.materialize(sequence)); - } - - /** - * Reverses the effect of {@link #materialize materialize} by transforming the - * {@link Notification} objects emitted by a source Observable into the items or notifications - * they represent. - *

- * - * - * @param sequence - * an Observable that emits {@link Notification} objects that represent the items and - * notifications emitted by an Observable - * @return an Observable that emits the items and notifications embedded in the - * {@link Notification} objects emitted by the source Observable - * @see MSDN: Observable.Dematerialize - */ - public static Observable dematerialize(final Observable> sequence) { - return create(OperationDematerialize.dematerialize(sequence)); + return from(list); } /** @@ -1062,11 +663,10 @@ public static Observable dematerialize(final Observable> *

* You can combine the items emitted by multiple Observables so that they act like a single * Observable, by using the merge method. - * + * * @param source * a list of Observables - * @return an Observable that emits items that are the result of flattening the - * {@code source} list of Observables + * @return an Observable that emits items that are the result of flattening the {@code source} list of Observables * @see MSDN: Observable.Merge */ public static Observable merge(List> source) { @@ -1081,7 +681,7 @@ public static Observable merge(List> source) { *

* You can combine the items emitted by multiple Observables so that they act like a single * Observable, by using the {@code merge} method. - * + * * @param source * an Observable that emits Observables * @return an Observable that emits items that are the result of flattening the items emitted @@ -1099,7 +699,7 @@ public static Observable merge(Observable> source) { *

* You can combine items emitted by multiple Observables so that they act like a single * Observable, by using the {@code merge} method. - * + * * @param source * a series of Observables * @return an Observable that emits items that are the result of flattening the items emitted @@ -1110,34 +710,12 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } - /** - * Returns an Observable that emits the items from the {@code source} Observable until - * the {@code other} Observable emits an item. - *

- * - * - * @param source - * the source Observable - * @param other - * the Observable whose first emitted item will cause {@code takeUntil} to stop - * emitting items from the {@code source} Observable - * @param - * the type of items emitted by {@code source} - * @param - * the type of items emitted by {@code other} - * @return an Observable that emits the items emitted by {@code source} until such time as - * {@code other} emits its first item - */ - public static Observable takeUntil(final Observable source, final Observable other) { - return OperationTakeUntil.takeUntil(source, other); - } - /** * Returns an Observable that emits the items emitted by two or more Observables, one after the * other. *

* - * + * * @param source * a series of Observables * @return an Observable that emits items that are the result of combining the items emitted by @@ -1148,100 +726,6 @@ public static Observable concat(Observable... source) { return create(OperationConcat.concat(source)); } - /** - * Returns an Observable that emits the same items as the source Observable, and then calls - * the given Action after the Observable completes. - *

- * - * - * @param source - * an Observable - * @param action - * an {@link Action0} to be invoked when the source Observable completes - * or errors - * @return an Observable that emits the same items as the source, then invokes the action - * @see MSDN: - * Observable.Finally Method - */ - public static Observable finallyDo(Observable source, Action0 action) { - return create(OperationFinally.finallyDo(source, action)); - } - - /** - * Creates a new Observable by applying a function that you supply to each item emitted by - * the source Observable, where that function returns an Observable, and then merging those - * resulting Observables and emitting the results of this merger. - *

- * - *

- * Note: {@code mapMany} and {@code flatMap} are equivalent. - * - * @param sequence - * the source Observable - * @param func - * a function that, when applied to each item emitted by the source Observable, - * generates an Observable - * @param - * the type of items emitted by the source Observable - * @param - * the type of items emitted by the Observables that are returned from - * {@code func} - * @return an Observable that emits the result of applying the transformation function to each - * item emitted by the source Observable and merging the results of the Observables - * obtained from this transformation - * @see #mapMany(Observable, Func1) - */ - public static Observable flatMap(Observable sequence, Func1> func) { - return mapMany(sequence, func); - } - - /** - *

- * - * - * @param source - * an Observable whose items you want to group - * @param keySelector - * a function that extracts the key for each item omitted by the source Observable - * @param elementSelector - * a function to map each item emitted by the source Observable to an item emitted - * by a {@link GroupedObservable} - * @param - * the key type - * @param - * the type of items emitted by the source Observable - * @param - * the type of items to be emitted by the resulting {@link GroupedObservable}s - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value - */ - public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { - return create(OperationGroupBy.groupBy(source, keySelector, elementSelector)); - } - - /** - * Groups the items emitted by an Observable according to a specified criterion, and emits these - * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. - *

- * - * - * @param source - * an Observable whose items you want to group - * @param keySelector - * a function that extracts the key for each item emitted by the source Observable - * @param - * the key type - * @param - * the type of items to be emitted by the resulting {@link GroupedObservable}s - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value - */ - public static Observable> groupBy(Observable source, final Func1 keySelector) { - return create(OperationGroupBy.groupBy(source, keySelector)); - } - /** * This behaves like {@link #merge(java.util.List)} except that if any of the merged Observables * notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will @@ -1250,13 +734,12 @@ public static Observable> groupBy(Observable s *

* *

- * Even if multiple merged Observables send {@code onError} notifications, - * {@code mergeDelayError} will only invoke the {@code onError} method of its + * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its * Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. - * + * * @param source * a list of Observables * @return an Observable that emits items that are the result of flattening the items emitted by @@ -1275,13 +758,12 @@ public static Observable mergeDelayError(List> source) { *

* *

- * Even if multiple merged Observables send {@code onError} notifications, - * {@code mergeDelayError} will only invoke the {@code onError} method of its + * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its * Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. - * + * * @param source * an Observable that emits Observables * @return an Observable that emits items that are the result of flattening the items emitted by @@ -1300,13 +782,12 @@ public static Observable mergeDelayError(Observable> source *

* *

- * Even if multiple merged Observables send {@code onError} notifications, - * {@code mergeDelayError} will only invoke the {@code onError} method of its + * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its * Observers once. *

* This method allows an Observer to receive all successfully emitted items from all of the * source Observables without being interrupted by an error notification from one of them. - * + * * @param source * a series of Observables * @return an Observable that emits items that are the result of flattening the items emitted by @@ -1323,7 +804,7 @@ public static Observable mergeDelayError(Observable... source) { * *

* This Observable is useful primarily for testing purposes. - * + * * @param * the type of items (not) emitted by the Observable * @return an Observable that never sends any items or notifications to an {@link Observer} @@ -1333,350 +814,19 @@ public static Observable never() { } /** - * Instruct an Observable to pass control to another Observable (the return value of a function) - * rather than invoking {@link Observer#onError onError} if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the - * expected item to its Observer, the Observable invokes its {@link Observer}'s - * {@code onError} method, and then quits without invoking any more of its Observer's - * methods. The {@code onErrorResumeNext} method changes this behavior. If you pass a - * function that returns an Observable ({@code resumeFunction}) to - * {@code onErrorResumeNext}, if the source Observable encounters an error, instead of - * invoking its Observer's {@code onError} function, it will instead relinquish control to - * this new Observable, which will invoke the Observer's {@link Observer#onNext onNext} method - * if it is able to do so. In such a case, because no Observable necessarily invokes - * {@code onError}, the Observer may never know that an error happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - * - * @param that - * the source Observable - * @param resumeFunction - * a function that returns an Observable that will take over if the source Observable - * encounters an error - * @return an Observable, identical to the source Observable with its behavior modified as described - */ - public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { - return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); - } - - /** - * Instruct an Observable to pass control to another Observable rather than invoking - * {@link Observer#onError onError} if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the - * expected item to its Observer, the Observable invokes its {@link Observer}'s - * {@code onError} method, and then quits without invoking any more of its Observer's - * methods. The {@code onErrorResumeNext} method changes this behavior. If you pass an - * Observable ({@code resumeSequence}) to {@code onErrorResumeNext}, if the original - * Observable encounters an error, instead of invoking its Observer's onError - * method, it will instead relinquish control to this new Observable, which will invoke the - * Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, - * because no Observable necessarily invokes {@code onError}, the Observer may never know - * that an error happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - * - * @param that - * the source Observable - * @param resumeSequence - * a Observable that will take over if the source Observable encounters an error - * @return an Observable, identical to the source Observable with its behavior modified as described - */ - public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) { - return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); - } - - /** - * Instruct an Observable to emit a particular item to its Observer's {@code onNext} - * function rather than invoking {@link Observer#onError onError} if it encounters an error. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the - * expected item to its {@link Observer}, the Observable invokes its Observer's - * {@code onError} method, and then quits without invoking any more of its Observer's - * methods. The {@code onErrorReturn} method changes this behavior. If you pass a function - * ({@code resumeFunction}) to {@code onErrorReturn}, if the source Observable - * encounters an error, instead of invoking its Observer's {@code onError} method, it will - * instead pass the return value of {@code resumeFunction} to the Observer's - * {@link Observer#onNext onNext} method. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - * - * @param that - * the source Observable - * @param resumeFunction - * a function that returns an item that will be passed into an {@link Observer}'s - * {@link Observer#onNext onNext} method if the Observable encounters an error that - * would otherwise cause it to invoke {@link Observer#onError onError} - * @return an Observable, identical to the source Observable with its behavior modified as described - */ - public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { - return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); - } - - /** - * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error of type {@link java.lang.Exception}. - *

- * This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through. - *

- * - *

- * By default, when an Observable encounters an error that prevents it from emitting the - * expected item to its Observer, the Observable invokes its {@link Observer}'s {@code onError} method, and then quits without invoking any more of its Observer's - * methods. The {@code onErrorResumeNext} method changes this behavior. If you pass an - * Observable ({@code resumeSequence}) to {@code onErrorResumeNext}, if the original - * Observable encounters an error, instead of invoking its Observer's onError - * method, it will instead relinquish control to this new Observable, which will invoke the - * Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, - * because no Observable necessarily invokes {@code onError}, the Observer may never know - * that an error happened. - *

- * You can use this to prevent errors from propagating or to supply fallback data should errors - * be encountered. - * - * @param that - * the source Observable - * @param resumeSequence - * a Observable that will take over if the source Observable encounters an error - * @return an Observable, identical to the source Observable with its behavior modified as described - */ - public static Observable onExceptionResumeNext(final Observable that, final Observable resumeSequence) { - return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(that, resumeSequence)); - } - - /** - * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying - * Observable that will replay all of its items and notifications to any future - * {@link Observer}. - *

- * - * @param that - * the source Observable - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer}s - */ - public static ConnectableObservable replay(final Observable that) { - return OperationMulticast.multicast(that, ReplaySubject. create()); - } - - /** - * This method has similar behavior to {@link #replay} except that this auto-subscribes to - * the source Observable rather than returning a {@link ConnectableObservable}. - *

- * - *

- * This is useful when you want an Observable to cache responses and you can't control the - * subscribe/unsubscribe behavior of all the {@link Observer}s. - *

- * NOTE: You sacrifice the ability to unsubscribe from the origin when you use the - * cache() operator so be careful not to use this operator on Observables that - * emit an infinite or very large number of items that will use up memory. - * - * @return an Observable that when first subscribed to, caches all of the items and - * notifications it emits so it can replay them for subsequent subscribers. - */ - public static Observable cache(final Observable that) { - return create(OperationCache.cache(that)); - } - - /** - * Returns a {@link ConnectableObservable}, which waits until its - * {@link ConnectableObservable#connect} method is called before it begins emitting items to - * those {@link Observer}s that have subscribed to it. - *

- * - * - * @param that - * the source Observable - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer} - */ - public static ConnectableObservable publish(final Observable that) { - return OperationMulticast.multicast(that, PublishSubject. create()); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, and emits the final result from the final call to your function as - * its sole item. - *

- * - *

- * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," - * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, - * has an {@code inject} method that does a similar operation on lists. - * - * @param - * the type of item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source - * Observable, the result of which will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of applying the - * accumulator function to the sequence of items emitted by the source Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static Observable reduce(Observable sequence, Func2 accumulator) { - return takeLast(create(OperationScan.scan(sequence, accumulator)), 1); - } - - /** - *

- * - * - * @see #reduce(Observable, Func2) - */ - public static Observable aggregate(Observable sequence, Func2 accumulator) { - return reduce(sequence, accumulator); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, emitting the final result from the final call to your function as - * its sole item. - *

- * - *

- * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," - * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, - * has an {@code inject} method that does a similar operation on lists. - * - * @param - * the type of item emitted by the source Observable - * @param - * the type returned by the accumulator function, and the type of the seed - * @param sequence - * the source Observable - * @param initialValue - * a seed to pass in to the first execution of the accumulator function - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source - * Observable, the result of which will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of applying the - * accumulator function to the sequence of items emitted by the source Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) - */ - public static Observable reduce(Observable sequence, R initialValue, Func2 accumulator) { - return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); - } - - /** - *

- * - * - * @see #reduce(Observable, Object, Func2) - */ - public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) { - return reduce(sequence, initialValue, accumulator); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, emitting the result of each of these iterations. - *

- * - * - * @param - * the type of item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source - * Observable, the result of which will be emitted and used in the next accumulator - * call - * @return an Observable that emits items that are the result of accumulating the items from - * the source Observable - * @see MSDN: Observable.Scan - */ - public static Observable scan(Observable sequence, Func2 accumulator) { - return create(OperationScan.scan(sequence, accumulator)); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by the source Observable into the same function, and so on until all items have been emitted - * by the source Observable, emitting the result of each of these iterations. - *

- * - *

- * Note that when you pass a seed to {@code scan()}, that seed will be the first item - * emitted by the resulting Observable. - * - * @param - * the type of item emitted by the source Observable - * @param - * the type returned by the accumulator function, and the type of the seed - * @param sequence - * the source Observable - * @param initialValue - * the initial (seed) accumulator value - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source - * Observable, the result of which will be emitted and used in the next accumulator - * call - * @return an Observable that emits items that are the result of accumulating the items emitted - * by the source Observable - * @see MSDN: Observable.Scan - */ - public static Observable scan(Observable sequence, R initialValue, Func2 accumulator) { - return create(OperationScan.scan(sequence, initialValue, accumulator)); - } - - /** - * source Observable satisfy a condition. - *

- * - * - * @param sequence - * an Observable whose emitted items you are evaluating - * @param predicate - * a function that evaluates each emitted item and returns a Boolean - * @param - * the type of items emitted by the source Observable - * @return an Observable that emits {@code true} if all of the items emitted by the source - * Observable satisfy the predicate; otherwise, {@code false} - */ - public static Observable all(final Observable sequence, final Func1 predicate) { - return create(OperationAll.all(sequence, predicate)); - } - - /** - * Observable and emits the remaining items. - *

- * + * Given an Observable that emits Observables, creates a single Observable that + * emits the items emitted by the most recently published of those Observables. *

- * You can ignore the first {@code num} items emitted by an Observable and attend only to - * those items that come after, by modifying the Observable with the {@code skip} method. - * - * @param items - * the source Observable - * @param num - * the number of items to skip - * @return an Observable that emits the same items emitted by the source Observable, except for - * the first {@code num} items - * @see MSDN: Observable.Skip Method + * + * + * @param sequenceOfSequences + * the source Observable that emits Observables + * @return an Observable that emits only the items emitted by the most recently published + * Observable */ - public static Observable skip(final Observable items, int num) { - return create(OperationSkip.skip(items, num)); + public static Observable switchDo(Observable> sequenceOfSequences) { + // TODO should this static remain? I have left it because it is an Observable + return create(OperationSwitch.switchDo(sequenceOfSequences)); } /** @@ -1684,14 +834,19 @@ public static Observable skip(final Observable items, int num) { * emits the items emitted by the most recently published of those Observables. *

* - * + * * @param sequenceOfSequences * the source Observable that emits Observables * @return an Observable that emits only the items emitted by the most recently published * Observable + * @throws ClassCastException + * if sequence not of type {@code Observable} */ - public static Observable switchDo(Observable> sequenceOfSequences) { - return create(OperationSwitch.switchDo(sequenceOfSequences)); + @SuppressWarnings("unchecked") + public Observable switchDo() { + // TODO can we come up with a better name than this? It should be 'switch' but that is reserved. + // Perhaps 'switchOnNext'? + return create(OperationSwitch.switchDo((Observable>) this)); } /** @@ -1700,14 +855,10 @@ public static Observable switchDo(Observable> sequenceOfSeq *

* *

- * A well-behaved Observable does not interleave its invocations of the - * {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and - * {@link Observer#onError onError} methods of its {@link Observer}s; it invokes - * {@code onCompleted} or {@code onError} only once; and it never invokes - * {@code onNext} after invoking either {@code onCompleted} or {@code onError}. - * {@code synchronize} enforces this, and the Observable it returns invokes - * {@code onNext} and {@code onCompleted} or {@code onError} synchronously. - * + * A well-behaved Observable does not interleave its invocations of the {@link Observer#onNext onNext}, {@link Observer#onCompleted onCompleted}, and {@link Observer#onError onError} methods of + * its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}. + * {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously. + * * @param observable * the source Observable * @param @@ -1719,177 +870,17 @@ public static Observable synchronize(Observable observable) { return create(OperationSynchronize.synchronize(observable)); } - /** - * Returns an Observable that emits the first {@code num} items emitted by the source - * Observable. - *

- * - *

- * This method returns an Observable that will invoke a subscribing {@link Observer}'s - * {@link Observer#onNext onNext} method a maximum of {@code num} times before invoking - * {@link Observer#onCompleted onCompleted}. - * - * @param items - * the source Observable - * @param num - * the number of items to emit from the start of the sequence emitted by the source - * Observable - * @return an Observable that emits only the first {@code num} items emitted by the source - * Observable - */ - public static Observable take(final Observable items, final int num) { - return create(OperationTake.take(items, num)); - } - - /** - * Returns an Observable that emits the last {@code count} items emitted by the source - * Observable. - *

- * - * - * @param items - * the source Observable - * @param count - * the number of items to emit from the end of the sequence emitted by the source - * Observable - * @return an Observable that emits only the last count items emitted by the source - * Observable - */ - public static Observable takeLast(final Observable items, final int count) { - return create(OperationTakeLast.takeLast(items, count)); - } - - /** - * Returns an Observable that emits the items emitted by a source Observable so long as a given - * predicate, operating on the items emitted, remains true. - *

- * - * - * @param items - * the source Observable - * @param predicate - * a function to test each item emitted by the source Observable for a condition - * @return an Observable that emits items from the source Observable so long as the predicate - * continues to return {@code true} for each item, then completes - */ - public static Observable takeWhile(final Observable items, Func1 predicate) { - return create(OperationTakeWhile.takeWhile(items, predicate)); - } - - /** - * Returns an Observable that emits the items emitted by a source Observable so long as a given - * predicate remains true, where the predicate can operate on both the item and its index - * relative to the complete sequence. - *

- * - * - * @param items - * the source Observable - * @param predicate - * a function to test each item emitted by the source Observable for a condition; - * the second parameter of the function represents the index of the source item - * @return an Observable that emits items from the source Observable so long as the predicate - * continues to return {@code true} for each item, then completes - */ - public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); - } - /** * Wraps each item emitted by a source Observable in a {@link Timestamped} object. *

* - * + * * @return an Observable that emits timestamped items from the source Observable */ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } - /** - * Returns an Observable that emits a single item, a list composed of all the items emitted by - * the source Observable. - *

- * - *

- * Normally, an Observable that emits multiple items will do so by invoking its - * {@link Observer}'s {@link Observer#onNext onNext} method once for each such item. {@code toList} - * allows you can change this behavior, instructing the Observable to compose a List of all of the - * items and then invoke the Observer's {@code onNext} function once, passing the entire list. - *

- * Be careful not to use this operator on Observables that emit an infinite or very large - * number of items, as all items will be held in memory and you do not have the option to - * unsubscribe. - * - * @param that - * the source Observable - * @return an Observable that emits a single item: a {@code List} containing all of the - * items emitted by the source Observable - */ - public static Observable> toList(final Observable that) { - return create(OperationToObservableList.toObservableList(that)); - } - - /** - * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items into the specified {@link Subject}. - * - * @param source - * the source Observable whose emitted items will be pushed into the specified - * {@link Subject} - * @param subject - * the {@link Subject} to push source items into - * @param - * the type of items emitted by the source Observable - * @param - * the type of the {@link Subject} - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * push items into the specified {@link Subject} - */ - public static ConnectableObservable multicast(Observable source, final Subject subject) { - return OperationMulticast.multicast(source, subject); - } - - /** - * Converts an {@link Iterable} sequence into an Observable. - *

- * - *

- * You can convert any object that supports the Iterable interface into an Observable that - * emits each item in the Iterable, by passing the Iterable into the toObservable - * method. - * - * @param iterable - * the source {@link Iterable} sequence - * @param - * the type of items in the {@link Iterable} sequence and the type of items to be - * emitted by the resulting Observable - * @return an Observable that emits each item in the source {@link Iterable} sequence - */ - public static Observable toObservable(Iterable iterable) { - return create(OperationToObservableIterable.toObservableIterable(iterable)); - } - - /** - * Converts a {@link Future} into an Observable. - * - *

- * - *

- * Important note: This Observable is blocking; you cannot unsubscribe from it. - * - * @param future - * the source {@link Future} - * @param - * the type of of object that the {@link Future} returns, and also the type of the - * item emitted by the resulting Observable - * @return an Observable that emits the item from the source {@link Future} - * @deprecated Replaced by {@link #from(Future)} - */ - public static Observable toObservable(Future future) { - return create(OperationToObservableFuture.toObservableFuture(future)); - } - /** * Converts a {@link Future} into an Observable. *

@@ -1900,7 +891,7 @@ public static Observable toObservable(Future future) { * object into the {@code from} method. *

* Important note: This Observable is blocking; you cannot unsubscribe from it. - * + * * @param future * the source {@link Future} * @param @@ -1913,24 +904,26 @@ public static Observable from(Future future) { } /** - * Converts a {@link Future} into an Observable with timeout. + * Converts a {@link Future} into an Observable. *

- * Important note: This Observable is blocking; you cannot unsubscribe from it. - * + * + *

+ * You can convert any object that supports the {@link Future} interface into an Observable that + * emits the return value of the {@link Future#get} method of that object, by passing the + * object into the {@code from} method. + *

+ * * @param future * the source {@link Future} - * @param timeout - * the maximum time to wait - * @param unit - * the {@link TimeUnit} of the time argument + * @param scheduler + * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as {@link Schedulers#threadPoolForIO()} that can block and wait on the future. * @param * the type of object that the {@link Future} returns, and also the type of item to * be emitted by the resulting Observable - * @return an Observable that emits the item from the source {@link Future} - * @deprecated Replaced by {@link #from(Future, long, TimeUnit)} + * @return an Observable that emits the item from the source Future */ - public static Observable toObservable(Future future, long timeout, TimeUnit unit) { - return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); + public static Observable from(Future future, Scheduler scheduler) { + return create(OperationToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler); } /** @@ -1943,7 +936,7 @@ public static Observable toObservable(Future future, long timeout, Tim * object into the {@code from} method. *

* Important note: This Observable is blocking; you cannot unsubscribe from it. - * + * * @param future * the source {@link Future} * @param timeout @@ -1959,70 +952,16 @@ public static Observable from(Future future, long timeout, TimeUnit un return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } - /** - * Converts an array sequence into an Observable. - * - * @param items - * the source array - * @param - * the type of items in the array, and also the type of items emitted by the - * resulting Observable - * @return an Observable that emits each item in the source array - * @deprecated Use {@link #from(Object...)} - */ - public static Observable toObservable(T... items) { - return toObservable(Arrays.asList(items)); - } - - /** - * Return an Observable that emits a single list of the items emitted by the source Observable, in sorted - * order (each item emitted by the source Observable must implement {@link Comparable} with - * respect to all other items emitted by the source Observable). - *

- * - * - * @param sequence - * the source Observable - * @throws ClassCastException - * if any emitted item does not implement {@link Comparable} with respect to all - * other emitted items - * @return an Observable that emits a single,sorted list of the items from the source Observable - */ - public static Observable> toSortedList(Observable sequence) { - return create(OperationToObservableSortedList.toSortedList(sequence)); - } - - /** - * Return an Observable that emits a single list of the items emitted by the source Observable, sorted - * by the given comparison function. - *

- * - * - * @param sequence - * the source Observable - * @param sortFunction - * a function that compares two items emitted by the source Observable and returns - * an Integer that indicates their sort order - * @return an Observable that emits a single, sorted list of the items from the source Observable - */ - public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { - return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); - } - /** *

* + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0} and the first item emitted by {@code w1}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by {@code w0} and the second item emitted by {@code w1}; and so forth. *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the - * new Observable will be the result of the function applied to the first item emitted by - * {@code w0} and the first item emitted by {@code w1}; the second item emitted by - * the new Observable will be the result of the function applied to the second item emitted by - * {@code w0} and the second item emitted by {@code w1}; and so forth. - *

- * The resulting {@code Observable} returned from {@code zip} will invoke - * {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. - * + * * @param w0 * one source Observable * @param w1 @@ -2042,7 +981,7 @@ public static Observable zip(Observable w0, Observable w1 * emitted by two source Observables are equal. *

* - * + * * @param first * one Observable to compare * @param second @@ -2067,7 +1006,7 @@ public Boolean call(T first, T second) { * function. *

* - * + * * @param first * one Observable to compare * @param second @@ -2088,18 +1027,14 @@ public static Observable sequenceEqual(Observable first, Observa * combinations of three items emitted, in sequence, by three other Observables. *

* + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, and the first item emitted by {@code w2}; the second + * item emitted by the new Observable will be the result of the + * function applied to the second item emitted by {@code w0}, the second item emitted by {@code w1}, and the second item emitted by {@code w2}; and so forth. *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the - * new Observable will be the result of the function applied to the first item emitted by - * {@code w0}, the first item emitted by {@code w1}, and the first item emitted by - * {@code w2}; the second item emitted by the new Observable will be the result of the - * function applied to the second item emitted by {@code w0}, the second item emitted by - * {@code w1}, and the second item emitted by {@code w2}; and so forth. - *

- * The resulting {@code Observable} returned from {@code zip} will invoke - * {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. - * + * * @param w0 * one source Observable * @param w1 @@ -2120,18 +1055,15 @@ public static Observable zip(Observable w0, Observable * - *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the - * new Observable will be the result of the function applied to the first item emitted by - * {@code w0}, the first item emitted by {@code w1}, the first item emitted by - * {@code w2}, and the first item emitted by {@code w3}; the second item emitted by + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by * the new Observable will be the result of the function applied to the second item emitted by * each of those Observables; and so forth. *

- * The resulting {@code Observable} returned from {@code zip} will invoke - * {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. - * + * * @param w0 * one source Observable * @param w1 @@ -2150,34 +1082,64 @@ public static Observable zip(Observable w0, Observabl } /** - * Creates an Observable which produces buffers of collected values. + * Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables + * each time an event is received from one of the source observables, where the aggregation is defined by the given function. + *

+ * * + * @param w0 + * The first source observable. + * @param w1 + * The second source observable. + * @param combineFunction + * The aggregation function used to combine the source observable values. + * @return An Observable that combines the source Observables with the given combine function + */ + public static Observable combineLatest(Observable w0, Observable w1, Func2 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction)); + } + + /** + * @see #combineLatest(Observable, Observable, Func2) + */ + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction)); + } + + /** + * @see #combineLatest(Observable, Observable, Func2) + */ + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction)); + } + + /** + * Creates an Observable which produces buffers of collected values. + * *

This Observable produces connected non-overlapping buffers. The current buffer is - * emitted and replaced with a new buffer when the Observable produced by the specified - * {@link Func0} produces a {@link BufferClosing} object. The * {@link Func0} will then + * emitted and replaced with a new buffer when the Observable produced by the specified {@link Func0} produces a {@link BufferClosing} object. The * {@link Func0} will then * be used to create a new Observable to listen for the end of the next buffer. - * + * * @param bufferClosingSelector * The {@link Func0} which is used to produce an {@link Observable} for every buffer created. * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer * is emitted and replaced with a new one. * @return - * An {@link Observable} which produces connected non-overlapping buffers, which are emitted - * when the current {@link Observable} created with the {@link Func0} argument produces a - * {@link BufferClosing} object. + * An {@link Observable} which produces connected non-overlapping buffers, which are emitted + * when the current {@link Observable} created with the {@link Func0} argument produces a {@link BufferClosing} object. */ public Observable> buffer(Func0> bufferClosingSelector) { - return buffer(this, bufferClosingSelector); + return create(OperationBuffer.buffer(this, bufferClosingSelector)); } /** * Creates an Observable which produces buffers of collected values. - * + * *

This Observable produces buffers. Buffers are created when the specified "bufferOpenings" * Observable produces a {@link BufferOpening} object. Additionally the {@link Func0} argument * is used to create an Observable which produces {@link BufferClosing} objects. When this * Observable produces such an object, the associated buffer is emitted. - * + * * @param bufferOpenings * The {@link Observable} which, when it produces a {@link BufferOpening} object, will cause * another buffer to be created. @@ -2186,76 +1148,75 @@ public Observable> buffer(Func0> bufferClosing * When this {@link Observable} produces a {@link BufferClosing} object, the associated buffer * is emitted. * @return - * An {@link Observable} which produces buffers which are created and emitted when the specified - * {@link Observable}s publish certain objects. + * An {@link Observable} which produces buffers which are created and emitted when the specified {@link Observable}s publish certain objects. */ public Observable> buffer(Observable bufferOpenings, Func1> bufferClosingSelector) { - return buffer(this, bufferOpenings, bufferClosingSelector); + return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector)); } /** * Creates an Observable which produces buffers of collected values. - * + * *

This Observable produces connected non-overlapping buffers, each containing "count" * elements. When the source Observable completes or encounters an error, the current * buffer is emitted, and the event is propagated. - * + * * @param count * The maximum size of each buffer before it should be emitted. * @return - * An {@link Observable} which produces connected non-overlapping buffers containing at most - * "count" produced values. + * An {@link Observable} which produces connected non-overlapping buffers containing at most + * "count" produced values. */ public Observable> buffer(int count) { - return buffer(this, count); + return create(OperationBuffer.buffer(this, count)); } /** * Creates an Observable which produces buffers of collected values. - * + * *

This Observable produces buffers every "skip" values, each containing "count" * elements. When the source Observable completes or encounters an error, the current * buffer is emitted, and the event is propagated. - * + * * @param count * The maximum size of each buffer before it should be emitted. * @param skip * How many produced values need to be skipped before starting a new buffer. Note that when "skip" and * "count" are equals that this is the same operation as {@link Observable#buffer(Observable, int)}. * @return - * An {@link Observable} which produces buffers every "skipped" values containing at most - * "count" produced values. + * An {@link Observable} which produces buffers every "skipped" values containing at most + * "count" produced values. */ public Observable> buffer(int count, int skip) { - return buffer(this, count, skip); + return create(OperationBuffer.buffer(this, count, skip)); } /** * Creates an Observable which produces buffers of collected values. - * + * *

This Observable produces connected non-overlapping buffers, each of a fixed duration * specified by the "timespan" argument. When the source Observable completes or encounters * an error, the current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted, and * replaced with a new buffer. * @param unit * The unit of time which applies to the "timespan" argument. * @return - * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. */ public Observable> buffer(long timespan, TimeUnit unit) { - return buffer(this, timespan, unit); + return create(OperationBuffer.buffer(this, timespan, unit)); } /** * Creates an Observable which produces buffers of collected values. - * + * *

This Observable produces connected non-overlapping buffers, each of a fixed duration * specified by the "timespan" argument. When the source Observable completes or encounters * an error, the current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted, and * replaced with a new buffer. @@ -2264,10 +1225,10 @@ public Observable> buffer(long timespan, TimeUnit unit) { * @param scheduler * The {@link Scheduler} to use when determining the end and start of a buffer. * @return - * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. + * An {@link Observable} which produces connected non-overlapping buffers with a fixed duration. */ public Observable> buffer(long timespan, TimeUnit unit, Scheduler scheduler) { - return buffer(this, timespan, unit, scheduler); + return create(OperationBuffer.buffer(this, timespan, unit, scheduler)); } /** @@ -2275,7 +1236,7 @@ public Observable> buffer(long timespan, TimeUnit unit, Scheduler schedu * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size * specified by the "count" argument (which ever is reached first). When the source Observable completes * or encounters an error, the current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted, and * replaced with a new buffer. @@ -2284,11 +1245,11 @@ public Observable> buffer(long timespan, TimeUnit unit, Scheduler schedu * @param count * The maximum size of each buffer before it should be emitted. * @return - * An {@link Observable} which produces connected non-overlapping buffers which are emitted after - * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). */ public Observable> buffer(long timespan, TimeUnit unit, int count) { - return buffer(this, timespan, unit, count); + return create(OperationBuffer.buffer(this, timespan, unit, count)); } /** @@ -2296,7 +1257,7 @@ public Observable> buffer(long timespan, TimeUnit unit, int count) { * non-overlapping buffers, each of a fixed duration specified by the "timespan" argument or a maximum size * specified by the "count" argument (which ever is reached first). When the source Observable completes * or encounters an error, the current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted, and * replaced with a new buffer. @@ -2307,11 +1268,11 @@ public Observable> buffer(long timespan, TimeUnit unit, int count) { * @param scheduler * The {@link Scheduler} to use when determining the end and start of a buffer. * @return - * An {@link Observable} which produces connected non-overlapping buffers which are emitted after - * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). + * An {@link Observable} which produces connected non-overlapping buffers which are emitted after + * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first). */ public Observable> buffer(long timespan, TimeUnit unit, int count, Scheduler scheduler) { - return buffer(this, timespan, unit, count, scheduler); + return create(OperationBuffer.buffer(this, timespan, unit, count, scheduler)); } /** @@ -2319,7 +1280,7 @@ public Observable> buffer(long timespan, TimeUnit unit, int count, Sched * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan * specified by the "timespan" argument. When the source Observable completes or encounters an error, the * current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted. * @param timeshift @@ -2327,11 +1288,11 @@ public Observable> buffer(long timespan, TimeUnit unit, int count, Sched * @param unit * The unit of time which applies to the "timespan" and "timeshift" argument. * @return - * An {@link Observable} which produces new buffers periodically, and these are emitted after - * a fixed timespan has elapsed. + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. */ public Observable> buffer(long timespan, long timeshift, TimeUnit unit) { - return buffer(this, timespan, timeshift, unit); + return create(OperationBuffer.buffer(this, timespan, timeshift, unit)); } /** @@ -2339,7 +1300,7 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit) * periodically, which is determined by the "timeshift" argument. Each buffer is emitted after a fixed timespan * specified by the "timespan" argument. When the source Observable completes or encounters an error, the * current buffer is emitted and the event is propagated. - * + * * @param timespan * The period of time each buffer is collecting values before it should be emitted. * @param timeshift @@ -2349,28 +1310,26 @@ public Observable> buffer(long timespan, long timeshift, TimeUnit unit) * @param scheduler * The {@link Scheduler} to use when determining the end and start of a buffer. * @return - * An {@link Observable} which produces new buffers periodically, and these are emitted after - * a fixed timespan has elapsed. + * An {@link Observable} which produces new buffers periodically, and these are emitted after + * a fixed timespan has elapsed. */ public Observable> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { - return buffer(this, timespan, timeshift, unit, scheduler); + return create(OperationBuffer.buffer(this, timespan, timeshift, unit, scheduler)); } /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of four items emitted, in sequence, by four other Observables. - *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the + *

{@code zip} applies this function in strict sequence, so the first item emitted by the * new Observable will be the result of the function applied to the first item emitted by * all of the Observalbes; the second item emitted by the new Observable will be the result of * the function applied to the second item emitted by each of those Observables; and so forth. *

- * The resulting {@code Observable} returned from {@code zip} will invoke - * {@code onNext} as many times as the number of {@code onNext} invokations of the + * The resulting {@code Observable} returned from {@code zip} will invoke {@code onNext} as many times as the number of {@code onNext} invokations of the * source Observable that emits the fewest items. *

* - * + * * @param ws * An Observable of source Observables * @param reduceFunction @@ -2390,18 +1349,16 @@ public Observable call(List> wsList) { /** * Returns an Observable that emits the results of a function of your choosing applied to * combinations of four items emitted, in sequence, by four other Observables. - *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the + *

{@code zip} applies this function in strict sequence, so the first item emitted by the * new Observable will be the result of the function applied to the first item emitted by * all of the Observalbes; the second item emitted by the new Observable will be the result of * the function applied to the second item emitted by each of those Observables; and so forth. *

- * The resulting {@code Observable} returned from {@code zip} will invoke - * {@code onNext} as many times as the number of {@code onNext} invokations of the + * The resulting {@code Observable} returned from {@code zip} will invoke {@code onNext} as many times as the number of {@code onNext} invokations of the * source Observable that emits the fewest items. *

* - * + * * @param ws * A collection of source Observables * @param reduceFunction @@ -2413,30 +1370,27 @@ public static Observable zip(Collection> ws, FuncN reduc return create(OperationZip.zip(ws, reduceFunction)); } - /** + /** *

* - * + * * @param predicate - * a function that evaluates the items emitted by the source Observable, returning - * {@code true} if they pass the filter + * a function that evaluates the items emitted by the source Observable, returning {@code true} if they pass the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as {@code true} */ public Observable filter(Func1 predicate) { - return filter(this, predicate); + return create(OperationFilter.filter(this, predicate)); } /** - * Registers an {@link Action0} to be called when this Observable invokes - * {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. + * Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}. *

* - * + * * @param action * an {@link Action0} to be invoked when the source Observable finishes - * @return an Observable that emits the same items as the source Observable, then invokes the - * {@link Action0} + * @return an Observable that emits the same items as the source Observable, then invokes the {@link Action0} * @see MSDN: Observable.Finally Method */ public Observable finallyDo(Action0 action) { @@ -2451,7 +1405,7 @@ public Observable finallyDo(Action0 action) { * *

* Note: {@code mapMany} and {@code flatMap} are equivalent. - * + * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable @@ -2467,16 +1421,15 @@ public Observable flatMap(Func1> func) { /** *

* - * + * * @param predicate - * a function that evaluates an item emitted by the source Observable, returning - * {@code true} if it passes the filter + * a function that evaluates an item emitted by the source Observable, returning {@code true} if it passes the filter * @return an Observable that emits only those items in the original Observable that the filter * evaluates as {@code true} * @see #filter(Func1) */ public Observable where(Func1 predicate) { - return where(this, predicate); + return filter(predicate); } /** @@ -2484,14 +1437,14 @@ public Observable where(Func1 predicate) { * Observable and emits the result. *

* - * + * * @param func * a function to apply to each item emitted by the Observable * @return an Observable that emits the items from the source Observable, transformed by the * given function */ public Observable map(Func1 func) { - return map(this, func); + return create(OperationMap.map(this, func)); } /** @@ -2502,7 +1455,7 @@ public Observable map(Func1 func) { * *

* Note: mapMany and flatMap are equivalent. - * + * * @param func * a function that, when applied to an item emitted by the source Observable, returns * an Observable @@ -2512,50 +1465,48 @@ public Observable map(Func1 func) { * @see #flatMap(Func1) */ public Observable mapMany(Func1> func) { - return mapMany(this, func); + return create(OperationMap.mapMany(this, func)); } /** - * Turns all of the notifications from a source Observable into {@link Observer#onNext onNext} - * emissions, and marks them with their original notification types within {@link Notification} - * objects. + * Turns all of the notifications from a source Observable into {@link Observer#onNext onNext} emissions, and marks them with their original notification types within {@link Notification} objects. *

* - * + * * @return an Observable whose items are the result of materializing the items and * notifications of the source Observable * @see MSDN: Observable.materialize */ public Observable> materialize() { - return materialize(this); + return create(OperationMaterialize.materialize(this)); } /** * Asynchronously subscribes and unsubscribes Observers on the specified {@link Scheduler}. *

* - * + * * @param scheduler * the {@link Scheduler} to perform subscription and unsubscription actions on * @return the source Observable modified so that its subscriptions and unsubscriptions happen * on the specified {@link Scheduler} */ public Observable subscribeOn(Scheduler scheduler) { - return subscribeOn(this, scheduler); + return create(OperationSubscribeOn.subscribeOn(this, scheduler)); } /** * Asynchronously notify {@link Observer}s on the specified {@link Scheduler}. *

* - * + * * @param scheduler * the {@link Scheduler} to notify {@link Observer}s on * @return the source Observable modified so that its {@link Observer}s are notified on the * specified {@link Scheduler} */ public Observable observeOn(Scheduler scheduler) { - return observeOn(this, scheduler); + return create(OperationObserveOn.observeOn(this, scheduler)); } /** @@ -2564,21 +1515,19 @@ public Observable observeOn(Scheduler scheduler) { * or notifications they represent. *

* - * - * @return an Observable that emits the items and notifications embedded in the - * {@link Notification} objects emitted by the source Observable + * + * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects emitted by the source Observable * @see MSDN: Observable.dematerialize * @throws Throwable * if the source Observable is not of type {@code Observable>}. */ @SuppressWarnings("unchecked") public Observable dematerialize() { - return dematerialize((Observable>) this); + return create(OperationDematerialize.dematerialize((Observable>) this)); } /** - * Instruct an Observable to pass control to another Observable rather than invoking - * {@link Observer#onError onError} if it encounters an error. + * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error. *

* *

@@ -2589,26 +1538,24 @@ public Observable dematerialize() { * function that returns an Observable (resumeFunction) to * onErrorResumeNext, if the original Observable encounters an error, instead of * invoking its Observer's onError method, it will instead relinquish control to - * the Observable returned from resumeFunction, which will invoke the Observer's - * {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no + * the Observable returned from resumeFunction, which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no * Observable necessarily invokes onError, the Observer may never know that an * error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. - * + * * @param resumeFunction * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Func1> resumeFunction) { - return onErrorResumeNext(this, resumeFunction); + return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction)); } /** - * Instruct an Observable to pass control to another Observable rather than invoking - * {@link Observer#onError onError} if it encounters an error. + * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error. *

* *

@@ -2619,26 +1566,24 @@ public Observable onErrorResumeNext(final Func1> res * another Observable (resumeSequence) to an Observable's * onErrorResumeNext method, if the original Observable encounters an error, * instead of invoking its Observer's onError method, it will instead relinquish - * control to resumeSequence which will invoke the Observer's - * {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no + * control to resumeSequence which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no * Observable necessarily invokes onError, the Observer may never know that an * error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. - * + * * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the original Observable, with appropriately modified behavior */ public Observable onErrorResumeNext(final Observable resumeSequence) { - return onErrorResumeNext(this, resumeSequence); + return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(this, resumeSequence)); } - + /** - * Instruct an Observable to pass control to another Observable rather than invoking - * {@link Observer#onError onError} if it encounters an error of type {@link java.lang.Exception}. + * Instruct an Observable to pass control to another Observable rather than invoking {@link Observer#onError onError} if it encounters an error of type {@link java.lang.Exception}. *

* This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} or {@link java.lang.Error} but lets those continue through. *

@@ -2651,21 +1596,20 @@ public Observable onErrorResumeNext(final Observable resumeSequence) { * another Observable (resumeSequence) to an Observable's * onErrorResumeNext method, if the original Observable encounters an error, * instead of invoking its Observer's onError method, it will instead relinquish - * control to resumeSequence which will invoke the Observer's - * {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no + * control to resumeSequence which will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, because no * Observable necessarily invokes onError, the Observer may never know that an * error happened. *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. - * + * * @param resumeSequence * a function that returns an Observable that will take over if the source Observable * encounters an error * @return the original Observable, with appropriately modified behavior */ public Observable onExceptionResumeNext(final Observable resumeSequence) { - return onExceptionResumeNext(this, resumeSequence); + return create(OperationOnExceptionResumeNextViaObservable.onExceptionResumeNextViaObservable(this, resumeSequence)); } /** @@ -2685,14 +1629,14 @@ public Observable onExceptionResumeNext(final Observable resumeSequence) { *

* You can use this to prevent errors from propagating or to supply fallback data should errors * be encountered. - * + * * @param resumeFunction * a function that returns an item that the new Observable will emit if the source * Observable encounters an error * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(Func1 resumeFunction) { - return onErrorReturn(this, resumeFunction); + return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction)); } /** @@ -2707,7 +1651,7 @@ public Observable onErrorReturn(Func1 resumeFunction) { * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an inject method that does a similar operation on lists. - * + * * @param accumulator * An accumulator function to be invoked on each item emitted by the source * Observable, whose result will be used in the next accumulator call @@ -2717,21 +1661,20 @@ public Observable onErrorReturn(Func1 resumeFunction) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return reduce(this, accumulator); + return create(OperationScan.scan(this, accumulator)).takeLast(1); } /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying - * Observable that will replay all of its items and notifications to any future - * {@link Observer}. + * Observable that will replay all of its items and notifications to any future {@link Observer}. *

* - * + * * @return a {@link ConnectableObservable} that upon connection causes the source Observable to * emit items to its {@link Observer}s */ public ConnectableObservable replay() { - return replay(this); + return OperationMulticast.multicast(this, ReplaySubject. create()); } /** @@ -2746,37 +1689,36 @@ public ConnectableObservable replay() { * NOTE: You sacrifice the ability to unsubscribe from the origin when you use the * cache() operator so be careful not to use this operator on Observables that * emit an infinite or very large number of items that will use up memory. - * + * * @return an Observable that when first subscribed to, caches all of its notifications for * the benefit of subsequent subscribers. */ public Observable cache() { - return cache(this); + return create(OperationCache.cache(this)); } /** - * Returns a {@link ConnectableObservable}, which waits until its - * {@link ConnectableObservable#connect connect} method is called before it begins emitting + * Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting * items to those {@link Observer}s that have subscribed to it. *

* - * + * * @return a {@link ConnectableObservable} that upon connection causes the source Observable to * emit items to its {@link Observer}s */ public ConnectableObservable publish() { - return publish(this); + return OperationMulticast.multicast(this, PublishSubject. create()); } /** * Synonymous with reduce(). *

* - * + * * @see #reduce(Func2) */ public Observable aggregate(Func2 accumulator) { - return aggregate(this, accumulator); + return reduce(accumulator); } /** @@ -2791,7 +1733,7 @@ public Observable aggregate(Func2 accumulator) { * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold," * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, * has an inject method that does a similar operation on lists. - * + * * @param initialValue * the initial (seed) accumulator value * @param accumulator @@ -2803,18 +1745,18 @@ public Observable aggregate(Func2 accumulator) { * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(R initialValue, Func2 accumulator) { - return reduce(this, initialValue, accumulator); + return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1); } /** * Synonymous with reduce(). *

* - * + * * @see #reduce(Object, Func2) */ public Observable aggregate(R initialValue, Func2 accumulator) { - return aggregate(this, initialValue, accumulator); + return reduce(initialValue, accumulator); } /** @@ -2829,16 +1771,15 @@ public Observable aggregate(R initialValue, Func2 accumulator) { *

* Note that when you pass a seed to scan() the resulting Observable will emit * that seed as its first emitted item. - * + * * @param accumulator * an accumulator function to be invoked on each item emitted by the source - * Observable, whose result will be emitted to {@link Observer}s via - * {@link Observer#onNext onNext} and used in the next accumulator call. + * Observable, whose result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the next accumulator call. * @return an Observable that emits the results of each call to the accumulator function * @see MSDN: Observable.Scan */ public Observable scan(Func2 accumulator) { - return scan(this, accumulator); + return create(OperationScan.scan(this, accumulator)); } /** @@ -2846,7 +1787,7 @@ public Observable scan(Func2 accumulator) { * Observable at a specified time interval. *

* - * + * * @param period * the sampling rate * @param unit @@ -2863,7 +1804,7 @@ public Observable sample(long period, TimeUnit unit) { * Observable at a specified time interval. *

* - * + * * @param period * the sampling rate * @param unit @@ -2889,19 +1830,17 @@ public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { *

* Note that when you pass a seed to scan() the resulting Observable will emit * that seed as its first emitted item. - * + * * @param initialValue * the initial (seed) accumulator value * @param accumulator * an accumulator function to be invoked on each item emitted by the source - * Observable, whose result will be emitted to {@link Observer}s via - * {@link Observer#onNext onNext} and used in the next accumulator call. + * Observable, whose result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the next accumulator call. * @return an Observable that emits the results of each call to the accumulator function - * @see MSDN: - * Observable.Scan + * @see MSDN: Observable.Scan */ public Observable scan(R initialValue, Func2 accumulator) { - return scan(this, initialValue, accumulator); + return create(OperationScan.scan(this, initialValue, accumulator)); } /** @@ -2909,14 +1848,14 @@ public Observable scan(R initialValue, Func2 accumulator) { * the source Observable satisfy a condition. *

* - * + * * @param predicate * a function that evaluates an item and returns a Boolean * @return an Observable that emits true if all items emitted by the source * Observable satisfy the predicate; otherwise, false */ public Observable all(Func1 predicate) { - return all(this, predicate); + return create(OperationAll.all(this, predicate)); } /** @@ -2927,14 +1866,14 @@ public Observable all(Func1 predicate) { *

* You can ignore the first num items emitted by an Observable and attend only to * those items that come after, by modifying the Observable with the skip method. - * + * * @param num * the number of items to skip * @return an Observable that is identical to the source Observable except that it does not * emit the first num items that the source emits */ public Observable skip(int num) { - return skip(this, num); + return create(OperationSkip.skip(this, num)); } /** @@ -2943,10 +1882,9 @@ public Observable skip(int num) { *

* *

- * This method returns an Observable that will invoke a subscribing {@link Observer}'s - * {@link Observer#onNext onNext} function a maximum of num times before invoking + * This method returns an Observable that will invoke a subscribing {@link Observer}'s {@link Observer#onNext onNext} function a maximum of num times before invoking * {@link Observer#onCompleted onCompleted}. - * + * * @param num * the number of items to take * @return an Observable that emits only the first num items from the source @@ -2954,7 +1892,7 @@ public Observable skip(int num) { * fewer than num items */ public Observable take(final int num) { - return take(this, num); + return create(OperationTake.take(this, num)); } /** @@ -2962,7 +1900,7 @@ public Observable take(final int num) { * specified condition is true. *

* - * + * * @param predicate * a function that evaluates an item emitted by the source Observable and returns a * Boolean @@ -2970,7 +1908,7 @@ public Observable take(final int num) { * satisfies the condition defined by predicate */ public Observable takeWhile(final Func1 predicate) { - return takeWhile(this, predicate); + return create(OperationTakeWhile.takeWhile(this, predicate)); } /** @@ -2979,7 +1917,7 @@ public Observable takeWhile(final Func1 predicate) { * relative to the complete sequence. *

* - * + * * @param predicate * a function to test each item emitted by the source Observable for a condition; * the second parameter of the function represents the index of the source item @@ -2987,7 +1925,7 @@ public Observable takeWhile(final Func1 predicate) { * continues to return true for each item, then completes */ public Observable takeWhileWithIndex(final Func2 predicate) { - return takeWhileWithIndex(this, predicate); + return create(OperationTakeWhile.takeWhileWithIndex(this, predicate)); } /** @@ -2995,7 +1933,7 @@ public Observable takeWhileWithIndex(final Func2 predica * Observable. *

* - * + * * @param count * the number of items to emit from the end of the sequence emitted by the source * Observable @@ -3003,7 +1941,7 @@ public Observable takeWhileWithIndex(final Func2 predica * Observable */ public Observable takeLast(final int count) { - return takeLast(this, count); + return create(OperationTakeLast.takeLast(this, count)); } /** @@ -3011,7 +1949,7 @@ public Observable takeLast(final int count) { * other Observable emits an item. *

* - * + * * @param other * the Observable whose first emitted item will cause takeUntil to stop * emitting items from the source Observable @@ -3021,7 +1959,7 @@ public Observable takeLast(final int count) { * other emits its first item */ public Observable takeUntil(Observable other) { - return takeUntil(this, other); + return OperationTakeUntil.takeUntil(this, other); } /** @@ -3030,21 +1968,19 @@ public Observable takeUntil(Observable other) { *

* *

- * Normally, an Observable that returns multiple items will do so by invoking its - * {@link Observer}'s {@link Observer#onNext onNext} method for each such item. You can change + * Normally, an Observable that returns multiple items will do so by invoking its {@link Observer}'s {@link Observer#onNext onNext} method for each such item. You can change * this behavior, instructing the Observable to compose a list of all of these items and then to * invoke the Observer's onNext function once, passing it the entire list, by - * calling the Observable's toList method prior to calling its {@link #subscribe} - * method. + * calling the Observable's toList method prior to calling its {@link #subscribe} method. *

* Be careful not to use this operator on Observables that emit infinite or very large numbers * of items, as you do not have the option to unsubscribe. - * + * * @return an Observable that emits a single item: a List containing all of the items emitted by * the source Observable. */ public Observable> toList() { - return toList(this); + return create(OperationToObservableList.toObservableList(this)); } /** @@ -3053,14 +1989,14 @@ public Observable> toList() { * all other items in the sequence). *

* - * + * * @throws ClassCastException * if any item emitted by the Observable does not implement {@link Comparable} with * respect to all other items emitted by the Observable * @return an Observable that emits the items from the source Observable in sorted order */ public Observable> toSortedList() { - return toSortedList(this); + return create(OperationToObservableSortedList.toSortedList(this)); } /** @@ -3068,21 +2004,21 @@ public Observable> toSortedList() { * order based on a specified comparison function *

* - * + * * @param sortFunction * a function that compares two items emitted by the source Observable and returns * an Integer that indicates their sort order * @return an Observable that emits the items from the source Observable in sorted order */ public Observable> toSortedList(Func2 sortFunction) { - return toSortedList(this, sortFunction); + return create(OperationToObservableSortedList.toSortedList(this, sortFunction)); } /** * Emit a specified set of items before beginning to emit items from the source Observable. *

* - * + * * @param values * the items you want the modified Observable to emit first * @return an Observable that exhibits the modified behavior @@ -3097,7 +2033,7 @@ public Observable startWith(T... values) { * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. *

* - * + * * @param keySelector * a function that extracts the key from an item * @param elementSelector @@ -3111,7 +2047,7 @@ public Observable startWith(T... values) { * share that key value */ public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) { - return groupBy(this, keySelector, elementSelector); + return create(OperationGroupBy.groupBy(this, keySelector, elementSelector)); } /** @@ -3119,7 +2055,7 @@ public Observable> groupBy(final Func1 keyS * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. *

* - * + * * @param keySelector * a function that extracts the key for each item * @param @@ -3129,15 +2065,14 @@ public Observable> groupBy(final Func1 keyS * share that key value */ public Observable> groupBy(final Func1 keySelector) { - return groupBy(this, keySelector); + return create(OperationGroupBy.groupBy(this, keySelector)); } /** * Converts an Observable into a {@link BlockingObservable} (an Observable with blocking * operators). - * - * @see Blocking - * Observable Operators + * + * @see Blocking Observable Operators */ public BlockingObservable toBlockingObservable() { return BlockingObservable.from(this); @@ -3147,9 +2082,9 @@ public BlockingObservable toBlockingObservable() { * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" - * + * * NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface. - * + * * @param f * @return {@code true} if the given function is an internal implementation, and {@code false} otherwise. */ @@ -3165,517 +2100,4 @@ private boolean isInternalImplementation(Object o) { return p != null && p.getName().startsWith("rx.operators"); } - public static class UnitTest { - - @Mock - Observer w; - - @Before - public void before() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testCreate() { - - Observable observable = create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer Observer) { - Observer.onNext("one"); - Observer.onNext("two"); - Observer.onNext("three"); - Observer.onCompleted(); - return Subscriptions.empty(); - } - - }); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - observable.subscribe(aObserver); - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testReduce() { - Observable Observable = toObservable(1, 2, 3, 4); - reduce(Observable, new Func2() { - - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - - }).subscribe(w); - // we should be called only once - verify(w, times(1)).onNext(anyInt()); - verify(w).onNext(10); - } - - @Test - public void testReduceWithInitialValue() { - Observable Observable = toObservable(1, 2, 3, 4); - reduce(Observable, 50, new Func2() { - - @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; - } - - }).subscribe(w); - // we should be called only once - verify(w, times(1)).onNext(anyInt()); - verify(w).onNext(60); - } - - @Test - public void testSequenceEqual() { - Observable first = toObservable(1, 2, 3); - Observable second = toObservable(1, 2, 4); - @SuppressWarnings("unchecked") - Observer result = mock(Observer.class); - sequenceEqual(first, second).subscribe(result); - verify(result, times(2)).onNext(true); - verify(result, times(1)).onNext(false); - } - - @Test - public void testOnSubscribeFails() { - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - final RuntimeException re = new RuntimeException("bad impl"); - Observable o = Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer t1) { - throw re; - } - - }); - o.subscribe(observer); - verify(observer, times(0)).onNext(anyString()); - verify(observer, times(0)).onCompleted(); - verify(observer, times(1)).onError(re); - } - - @Test - public void testMaterializeDematerializeChaining() { - Observable obs = Observable.just(1); - Observable chained = obs.materialize().dematerialize(); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - chained.subscribe(observer); - - verify(observer, times(1)).onNext(1); - verify(observer, times(1)).onCompleted(); - verify(observer, times(0)).onError(any(Throwable.class)); - } - - /** - * The error from the user provided Observer is not handled by the subscribe method try/catch. - * - * It is handled by the AtomicObserver that wraps the provided Observer. - * - * Result: Passes (if AtomicObserver functionality exists) - */ - @Test - public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); - Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(final Observer observer) { - final BooleanSubscription s = new BooleanSubscription(); - new Thread(new Runnable() { - - @Override - public void run() { - try { - if (!s.isUnsubscribed()) { - observer.onNext("1"); - observer.onNext("2"); - observer.onNext("three"); - observer.onNext("4"); - observer.onCompleted(); - } - } finally { - latch.countDown(); - } - } - }).start(); - return s; - } - }).subscribe(new Observer() { - @Override - public void onCompleted() { - System.out.println("completed"); - } - - @Override - public void onError(Throwable e) { - error.set(e); - System.out.println("error"); - e.printStackTrace(); - } - - @Override - public void onNext(String v) { - int num = Integer.parseInt(v); - System.out.println(num); - // doSomething(num); - count.incrementAndGet(); - } - - }); - - // wait for async sequence to complete - latch.await(); - - assertEquals(2, count.get()); - assertNotNull(error.get()); - if (!(error.get() instanceof NumberFormatException)) { - fail("It should be a NumberFormatException"); - } - } - - /** - * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous - * - * Result: Passes - */ - @Test - public void testCustomObservableWithErrorInObserverSynchronous() { - final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); - Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - observer.onNext("1"); - observer.onNext("2"); - observer.onNext("three"); - observer.onNext("4"); - observer.onCompleted(); - return Subscriptions.empty(); - } - }).subscribe(new Observer() { - - @Override - public void onCompleted() { - System.out.println("completed"); - } - - @Override - public void onError(Throwable e) { - error.set(e); - System.out.println("error"); - e.printStackTrace(); - } - - @Override - public void onNext(String v) { - int num = Integer.parseInt(v); - System.out.println(num); - // doSomething(num); - count.incrementAndGet(); - } - - }); - assertEquals(2, count.get()); - assertNotNull(error.get()); - if (!(error.get() instanceof NumberFormatException)) { - fail("It should be a NumberFormatException"); - } - } - - /** - * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous - * - * - * Result: Passes - */ - @Test - public void testCustomObservableWithErrorInObservableSynchronous() { - final AtomicInteger count = new AtomicInteger(); - final AtomicReference error = new AtomicReference(); - Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - observer.onNext("1"); - observer.onNext("2"); - throw new NumberFormatException(); - } - }).subscribe(new Observer() { - - @Override - public void onCompleted() { - System.out.println("completed"); - } - - @Override - public void onError(Throwable e) { - error.set(e); - System.out.println("error"); - e.printStackTrace(); - } - - @Override - public void onNext(String v) { - System.out.println(v); - count.incrementAndGet(); - } - - }); - assertEquals(2, count.get()); - assertNotNull(error.get()); - if (!(error.get() instanceof NumberFormatException)) { - fail("It should be a NumberFormatException"); - } - } - - @Test - public void testPublish() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - ConnectableObservable o = Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(final Observer observer) { - final BooleanSubscription subscription = new BooleanSubscription(); - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - return subscription; - } - }).publish(); - - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - Subscription s = o.connect(); - try { - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } finally { - s.unsubscribe(); - } - } - - @Test - public void testReplay() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - ConnectableObservable o = Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(final Observer observer) { - final BooleanSubscription subscription = new BooleanSubscription(); - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - return subscription; - } - }).replay(); - - // we connect immediately and it will emit the value - Subscription s = o.connect(); - try { - - // we then expect the following 2 subscriptions to get that same value - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } finally { - s.unsubscribe(); - } - } - - @Test - public void testCache() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - Observable o = Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(final Observer observer) { - final BooleanSubscription subscription = new BooleanSubscription(); - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - return subscription; - } - }).cache(); - - // we then expect the following 2 subscriptions to get that same value - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - latch.countDown(); - } - }); - - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } - - /** - * https://github.com/Netflix/RxJava/issues/198 - * - * Rx Design Guidelines 5.2 - * - * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be - * to rethrow the exception on the thread that the message comes out from the Observable. - * The OnCompleted behavior in this case is to do nothing." - */ - @Test - public void testErrorThrownWithoutErrorHandlerSynchronous() { - try { - error(new RuntimeException("failure")).subscribe(new Action1() { - - @Override - public void call(Object t1) { - // won't get anything - } - - }); - fail("expected exception"); - } catch (Throwable e) { - assertEquals("failure", e.getMessage()); - } - } - - /** - * https://github.com/Netflix/RxJava/issues/198 - * - * Rx Design Guidelines 5.2 - * - * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be - * to rethrow the exception on the thread that the message comes out from the Observable. - * The OnCompleted behavior in this case is to do nothing." - * - * @throws InterruptedException - */ - @Test - public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference exception = new AtomicReference(); - Observable.create(new Func1, Subscription>() { - - @Override - public Subscription call(final Observer observer) { - new Thread(new Runnable() { - - @Override - public void run() { - try { - observer.onError(new Error("failure")); - } catch (Throwable e) { - // without an onError handler it has to just throw on whatever thread invokes it - exception.set(e); - } - latch.countDown(); - } - }).start(); - return Subscriptions.empty(); - } - }).subscribe(new Action1() { - - @Override - public void call(String t1) { - - } - - }); - // wait for exception - latch.await(3000, TimeUnit.MILLISECONDS); - assertNotNull(exception.get()); - assertEquals("failure", exception.get().getMessage()); - } - } - } diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 22735209cdc..bcfd48bf681 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -151,39 +151,6 @@ public Subscription schedule(T state, Func2 acti } } - /** - * Schedules a cancelable action to be executed. - * - * @param action - * Action to schedule. - * @return a subscription to be able to unsubscribe from action. - */ - public Subscription schedule(final Func1 action) { - return schedule(null, new Func2() { - - @Override - public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(scheduler); - } - }); - } - - /** - * Schedules a cancelable action to be executed. - * - * @param action - * action - * @return a subscription to be able to unsubscribe from action. - */ - public Subscription schedule(final Func0 action) { - return schedule(null, new Func2() { - - @Override - public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(); - } - }); - } /** * Schedules an action to be executed. @@ -203,27 +170,6 @@ public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @Suppr }); } - /** - * Schedules a cancelable action to be executed in delayTime. - * - * @param action - * Action to schedule. - * @param delayTime - * Time the action is to be delayed before executing. - * @param unit - * Time unit of the delay time. - * @return a subscription to be able to unsubscribe from action. - */ - public Subscription schedule(final Func1 action, long delayTime, TimeUnit unit) { - return schedule(null, new Func2() { - - @Override - public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(scheduler); - } - }, delayTime, unit); - } - /** * Schedules an action to be executed in delayTime. * @@ -242,66 +188,6 @@ public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @Suppr }, delayTime, unit); } - /** - * Schedules a cancelable action to be executed in delayTime. - * - * @param action - * action - * @return a subscription to be able to unsubscribe from action. - */ - public Subscription schedule(final Func0 action, long delayTime, TimeUnit unit) { - return schedule(null, new Func2() { - - @Override - public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(); - } - }, delayTime, unit); - } - - /** - * Schedules a cancelable action to be executed periodically. - * - * @param action - * The action to execute periodically. - * @param initialDelay - * Time to wait before executing the action for the first time. - * @param period - * The time interval to wait each time in between executing the action. - * @param unit - * The time unit the interval above is given in. - * @return A subscription to be able to unsubscribe from action. - */ - public Subscription schedulePeriodically(final Func1 action, long initialDelay, long period, TimeUnit unit) { - return schedulePeriodically(null, new Func2() { - @Override - public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(scheduler); - } - }, initialDelay, period, unit); - } - - /** - * Schedules a cancelable action to be executed periodically. - * - * @param action - * The action to execute periodically. - * @param initialDelay - * Time to wait before executing the action for the first time. - * @param period - * The time interval to wait each time in between executing the action. - * @param unit - * The time unit the interval above is given in. - * @return A subscription to be able to unsubscribe from action. - */ - public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { - return schedulePeriodically(null, new Func2() { - @Override - public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { - return action.call(); - } - }, initialDelay, period, unit); - } /** * Schedules an action to be executed periodically. diff --git a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java index 3d5c2d83929..14ac8748b6a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java @@ -92,7 +92,7 @@ public static class UnitTest { @SuppressWarnings("unchecked") public void testDematerialize1() { Observable> notifications = Observable.from(1, 2).materialize(); - Observable dematerialize = Observable.dematerialize(notifications); + Observable dematerialize = notifications.dematerialize(); Observer aObserver = mock(Observer.class); dematerialize.subscribe(aObserver); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index 8c0342a37ff..5aa6ff6f681 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -16,6 +16,8 @@ package rx.operators; import org.junit.Test; +import org.mockito.Matchers; + import rx.Observable; import rx.Observer; import rx.Scheduler; @@ -24,6 +26,7 @@ import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; +import rx.util.functions.Func2; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -50,9 +53,9 @@ public SubscribeOn(Observable source, Scheduler scheduler) { @Override public Subscription call(final Observer observer) { - return scheduler.schedule(new Func0() { + return scheduler.schedule(null, new Func2() { @Override - public Subscription call() { + public Subscription call(Scheduler s, T t) { return new ScheduledSubscription(source.subscribe(observer), scheduler); } }); @@ -91,7 +94,7 @@ public void testSubscribeOn() { Observer observer = mock(Observer.class); Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); - verify(scheduler, times(1)).schedule(any(Func0.class)); + verify(scheduler, times(1)).schedule(isNull(), any(Func2.class)); subscription.unsubscribe(); verify(scheduler, times(1)).schedule(any(Action0.class)); verifyNoMoreInteractions(scheduler); diff --git a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java index f3c1b210679..4b56e7dda1b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java @@ -49,10 +49,10 @@ public class OperationToIterator { * the type of source. * @return the iterator that could be used to iterate over the elements of the observable. */ - public static Iterator toIterator(Observable that) { + public static Iterator toIterator(Observable source) { final BlockingQueue> notifications = new LinkedBlockingQueue>(); - Observable.materialize(that).subscribe(new Observer>() { + source.materialize().subscribe(new Observer>() { @Override public void onCompleted() { // ignore diff --git a/rxjava-core/src/main/java/rx/operators/OperationWhere.java b/rxjava-core/src/main/java/rx/operators/OperationWhere.java deleted file mode 100644 index d82b6d829b8..00000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationWhere.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import org.junit.Test; -import org.mockito.Mockito; - -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.util.functions.Func1; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -/** - * Filters an Observable by discarding any items it emits that do not meet some test. - *

- * - */ -public final class OperationWhere { - - public static Func1, Subscription> where(Observable that, Func1 predicate) { - return OperationFilter.filter(that, predicate); - } - - public static class UnitTest { - - @Test - public void testWhere() { - Observable w = Observable.from("one", "two", "three"); - Observable observable = Observable.create(where(w, new Func1() { - - @Override - public Boolean call(String t1) { - return t1.equals("two"); - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - observable.subscribe(aObserver); - verify(aObserver, Mockito.never()).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); - verify(aObserver, times(1)).onCompleted(); - } - } - -} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTester.java b/rxjava-core/src/main/java/rx/operators/OperatorTester.java index 22283673d57..664dd37cc83 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTester.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTester.java @@ -15,25 +15,11 @@ */ package rx.operators; -import static org.junit.Assert.*; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; -import rx.Observable; -import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; -import rx.util.functions.Func0; -import rx.util.functions.Func1; import rx.util.functions.Func2; /** @@ -77,16 +63,6 @@ public Subscription schedule(Action0 action) { return underlying.schedule(action); } - @Override - public Subscription schedule(Func0 action) { - return underlying.schedule(action); - } - - @Override - public Subscription schedule(Func1 action) { - return underlying.schedule(action); - } - @Override public Subscription schedule(T state, Func2 action) { return underlying.schedule(state, action); @@ -97,16 +73,6 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { return underlying.schedule(action, dueTime, unit); } - @Override - public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - return underlying.schedule(action, dueTime, unit); - } - - @Override - public Subscription schedule(Func1 action, long dueTime, TimeUnit unit) { - return underlying.schedule(action, dueTime, unit); - } - @Override public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { return underlying.schedule(state, action, dueTime, unit); @@ -117,16 +83,6 @@ public Subscription schedulePeriodically(Action0 action, long initialDelay, long return underlying.schedulePeriodically(action, initialDelay, period, unit); } - @Override - public Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit) { - return underlying.schedulePeriodically(action, initialDelay, period, unit); - } - - @Override - public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { - return underlying.schedulePeriodically(action, initialDelay, period, unit); - } - @Override public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { return underlying.schedulePeriodically(state, action, initialDelay, period, unit); diff --git a/rxjava-core/src/main/java/rx/util/functions/Action.java b/rxjava-core/src/main/java/rx/util/functions/Action.java new file mode 100644 index 00000000000..c1d43eede6e --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Action.java @@ -0,0 +1,10 @@ +package rx.util.functions; + +/** + * All Action interfaces extend from this. + *

+ * Marker interface to allow instanceof checks. + */ +public interface Action { + +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Action0.java b/rxjava-core/src/main/java/rx/util/functions/Action0.java index 62d57bd5630..7b6e7426992 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Action0.java +++ b/rxjava-core/src/main/java/rx/util/functions/Action0.java @@ -15,6 +15,6 @@ */ package rx.util.functions; -public interface Action0 extends Function { +public interface Action0 extends Function, Action { public void call(); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/functions/Action1.java b/rxjava-core/src/main/java/rx/util/functions/Action1.java index 14fa7ced8ce..e21fd4e38b1 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Action1.java +++ b/rxjava-core/src/main/java/rx/util/functions/Action1.java @@ -15,6 +15,6 @@ */ package rx.util.functions; -public interface Action1 extends Function { +public interface Action1 extends Function, Action { public void call(T1 t1); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/functions/Action2.java b/rxjava-core/src/main/java/rx/util/functions/Action2.java index 8a17875a9ee..76c48d3eafb 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Action2.java +++ b/rxjava-core/src/main/java/rx/util/functions/Action2.java @@ -15,6 +15,6 @@ */ package rx.util.functions; -public interface Action2 extends Function { +public interface Action2 extends Function, Action { public void call(T1 t1, T2 t2); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/functions/Action3.java b/rxjava-core/src/main/java/rx/util/functions/Action3.java index 2b613b621ef..0bb69327929 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Action3.java +++ b/rxjava-core/src/main/java/rx/util/functions/Action3.java @@ -15,6 +15,6 @@ */ package rx.util.functions; -public interface Action3 extends Function { +public interface Action3 extends Function, Action { public void call(T1 t1, T2 t2, T3 t3); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/functions/Function.java b/rxjava-core/src/main/java/rx/util/functions/Function.java index cfe85a221f3..60ad4b52396 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Function.java +++ b/rxjava-core/src/main/java/rx/util/functions/Function.java @@ -3,7 +3,7 @@ /** * All Func and Action interfaces extend from this. *

- * Marker interface to allow isntanceof checks. + * Marker interface to allow instanceof checks. */ public interface Function { diff --git a/rxjava-core/src/test/java/README.md b/rxjava-core/src/test/java/README.md index 74b6c915369..c6a9aea6afa 100644 --- a/rxjava-core/src/test/java/README.md +++ b/rxjava-core/src/test/java/README.md @@ -1,6 +1,4 @@ -This test folder only contains performance and functional/integration style tests. - -The unit tests themselves are embedded as inner classes of the Java code (such as here: [rxjava-core/src/main/java/rx/operators](https://github.com/Netflix/RxJava/tree/master/rxjava-core/src/main/java/rx/operators)). +Not all unit tests are here, many are also embedded as inner classes of the main code (such as here: [rxjava-core/src/main/java/rx/operators](https://github.com/Netflix/RxJava/tree/master/rxjava-core/src/main/java/rx/operators)). * For an explanation of this design choice see Ben J. Christensen's [JUnit Tests as Inner Classes](http://benjchristensen.com/2011/10/23/junit-tests-as-inner-classes/). diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java new file mode 100644 index 00000000000..690d45859ee --- /dev/null +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -0,0 +1,536 @@ +package rx; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import rx.observables.ConnectableObservable; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class ObservableTests { + + @Mock + Observer w; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testCreate() { + + Observable observable = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer Observer) { + Observer.onNext("one"); + Observer.onNext("two"); + Observer.onNext("three"); + Observer.onCompleted(); + return Subscriptions.empty(); + } + + }); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testReduce() { + Observable observable = Observable.from(1, 2, 3, 4); + observable.reduce(new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }).subscribe(w); + // we should be called only once + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(10); + } + + @Test + public void testReduceWithInitialValue() { + Observable observable = Observable.from(1, 2, 3, 4); + observable.reduce(50, new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }).subscribe(w); + // we should be called only once + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(60); + } + + @Test + public void testSequenceEqual() { + Observable first = Observable.from(1, 2, 3); + Observable second = Observable.from(1, 2, 4); + @SuppressWarnings("unchecked") + Observer result = mock(Observer.class); + Observable.sequenceEqual(first, second).subscribe(result); + verify(result, times(2)).onNext(true); + verify(result, times(1)).onNext(false); + } + + @Test + public void testOnSubscribeFails() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final RuntimeException re = new RuntimeException("bad impl"); + Observable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer t1) { + throw re; + } + + }); + o.subscribe(observer); + verify(observer, times(0)).onNext(anyString()); + verify(observer, times(0)).onCompleted(); + verify(observer, times(1)).onError(re); + } + + @Test + public void testMaterializeDematerializeChaining() { + Observable obs = Observable.just(1); + Observable chained = obs.materialize().dematerialize(); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + chained.subscribe(observer); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onCompleted(); + verify(observer, times(0)).onError(any(Throwable.class)); + } + + /** + * The error from the user provided Observer is not handled by the subscribe method try/catch. + * + * It is handled by the AtomicObserver that wraps the provided Observer. + * + * Result: Passes (if AtomicObserver functionality exists) + */ + @Test + public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription s = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + try { + if (!s.isUnsubscribed()) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + } + } finally { + latch.countDown(); + } + } + }).start(); + return s; + } + }).subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Throwable e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + }); + + // wait for async sequence to complete + latch.await(); + + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObserverSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + return Subscriptions.empty(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Throwable e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous + * + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObservableSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + throw new NumberFormatException(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Throwable e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + System.out.println(v); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + @Test + public void testPublish() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).publish(); + + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + Subscription s = o.connect(); + try { + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + + @Test + public void testReplay() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).replay(); + + // we connect immediately and it will emit the value + Subscription s = o.connect(); + try { + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + + /** + * https://github.com/Netflix/RxJava/issues/198 + * + * Rx Design Guidelines 5.2 + * + * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be + * to rethrow the exception on the thread that the message comes out from the Observable. + * The OnCompleted behavior in this case is to do nothing." + */ + @Test + public void testErrorThrownWithoutErrorHandlerSynchronous() { + try { + Observable.error(new RuntimeException("failure")).subscribe(new Action1() { + + @Override + public void call(Object t1) { + // won't get anything + } + + }); + fail("expected exception"); + } catch (Throwable e) { + assertEquals("failure", e.getMessage()); + } + } + + /** + * https://github.com/Netflix/RxJava/issues/198 + * + * Rx Design Guidelines 5.2 + * + * "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be + * to rethrow the exception on the thread that the message comes out from the Observable. + * The OnCompleted behavior in this case is to do nothing." + * + * @throws InterruptedException + */ + @Test + public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference exception = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + new Thread(new Runnable() { + + @Override + public void run() { + try { + observer.onError(new Error("failure")); + } catch (Throwable e) { + // without an onError handler it has to just throw on whatever thread invokes it + exception.set(e); + } + latch.countDown(); + } + }).start(); + return Subscriptions.empty(); + } + }).subscribe(new Action1() { + + @Override + public void call(String t1) { + + } + + }); + // wait for exception + latch.await(3000, TimeUnit.MILLISECONDS); + assertNotNull(exception.get()); + assertEquals("failure", exception.get().getMessage()); + } +} \ No newline at end of file