diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy index 509b7b0ca54..a54241b1607 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy @@ -19,7 +19,7 @@ import org.junit.Test import rx.Observable import rx.Scheduler -import rx.concurrency.Schedulers +import rx.schedulers.Schedulers import rx.util.functions.Func1 class TestParallel { diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala index 2e53f1afe9a..a48b8acb51c 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala.examples import scala.concurrent.duration.DurationInt diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala index cc380c463c2..f36dc6b6dd9 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala /** diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala index 8b4edde4fa7..ead8054279f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala @@ -204,11 +204,11 @@ trait Scheduler { private [scala] object Scheduler { def apply(scheduler: rx.Scheduler): Scheduler = scheduler match { - case s: rx.concurrency.CurrentThreadScheduler => new CurrentThreadScheduler(s) - case s: rx.concurrency.ExecutorScheduler => new ExecutorScheduler(s) - case s: rx.concurrency.ImmediateScheduler => new ImmediateScheduler(s) - case s: rx.concurrency.NewThreadScheduler => new NewThreadScheduler(s) - case s: rx.concurrency.TestScheduler => new TestScheduler(s) + case s: rx.schedulers.CurrentThreadScheduler => new CurrentThreadScheduler(s) + case s: rx.schedulers.ExecutorScheduler => new ExecutorScheduler(s) + case s: rx.schedulers.ImmediateScheduler => new ImmediateScheduler(s) + case s: rx.schedulers.NewThreadScheduler => new NewThreadScheduler(s) + case s: rx.schedulers.TestScheduler => new TestScheduler(s) case s: rx.Scheduler => new Scheduler{ val asJavaScheduler = s } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala index f3a7898a30c..688b1e86caa 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala @@ -23,7 +23,7 @@ object CurrentThreadScheduler { * Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes. */ def apply(): CurrentThreadScheduler = { - new CurrentThreadScheduler(rx.concurrency.Schedulers.currentThread()) + new CurrentThreadScheduler(rx.schedulers.Schedulers.currentThread()) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala index d718c58b55f..72287939e8d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala @@ -26,7 +26,7 @@ object ExecutorScheduler { * Note that this does not support scheduled actions with a delay. */ def apply(executor: Executor): ExecutorScheduler = { - new ExecutorScheduler(rx.concurrency.Schedulers.executor(executor)) + new ExecutorScheduler(rx.schedulers.Schedulers.executor(executor)) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala index 7a066a39314..cc22456afaa 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala @@ -23,7 +23,7 @@ object ImmediateScheduler { * Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread. */ def apply(): ImmediateScheduler = { - new ImmediateScheduler(rx.concurrency.Schedulers.immediate()) + new ImmediateScheduler(rx.schedulers.Schedulers.immediate()) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala index 674205ba07f..19550359811 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala @@ -23,7 +23,7 @@ object NewThreadScheduler { * Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work. */ def apply(): NewThreadScheduler = { - new NewThreadScheduler(rx.concurrency.Schedulers.newThread()) + new NewThreadScheduler(rx.schedulers.Schedulers.newThread()) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala index 20c4dc3544c..8f64bb4f84a 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala @@ -24,7 +24,7 @@ object ScheduledExecutorServiceScheduler { * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.ScheduledExecutorService`. */ def apply(executor: ScheduledExecutorService): ScheduledExecutorServiceScheduler = { - new ScheduledExecutorServiceScheduler(rx.concurrency.Schedulers.executor(executor)) + new ScheduledExecutorServiceScheduler(rx.schedulers.Schedulers.executor(executor)) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala index f8df7610b87..3d853f87e91 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala @@ -23,7 +23,7 @@ import rx.lang.scala.Scheduler */ object TestScheduler { def apply(): TestScheduler = { - new TestScheduler(new rx.concurrency.TestScheduler()) + new TestScheduler(new rx.schedulers.TestScheduler()) } } @@ -64,7 +64,7 @@ object TestScheduler { * } * }}} */ -class TestScheduler private[scala] (val asJavaScheduler: rx.concurrency.TestScheduler) extends Scheduler { +class TestScheduler private[scala] (val asJavaScheduler: rx.schedulers.TestScheduler) extends Scheduler { def advanceTimeBy(time: Duration) { asJavaScheduler.advanceTimeBy(time.length, time.unit) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala index b9e0791befa..38e321bc047 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala @@ -29,7 +29,7 @@ object ThreadPoolForComputationScheduler { * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.schedulers.ThreadPoolForIOScheduler]] instead. */ def apply(): ThreadPoolForComputationScheduler = { - new ThreadPoolForComputationScheduler(rx.concurrency.Schedulers.threadPoolForComputation()) + new ThreadPoolForComputationScheduler(rx.schedulers.Schedulers.threadPoolForComputation()) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala index 63b7e6b6596..e3fef920805 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala @@ -29,7 +29,7 @@ object ThreadPoolForIOScheduler { * Do not perform computational work on this scheduler. Use [[rx.lang.scala.schedulers.ThreadPoolForComputationScheduler]] instead. */ def apply(): ThreadPoolForIOScheduler = { - new ThreadPoolForIOScheduler(rx.concurrency.Schedulers.threadPoolForIO()) + new ThreadPoolForIOScheduler(rx.schedulers.Schedulers.threadPoolForIO()) } } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 41bbdca3479..d4778d7d5e5 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala import java.util.Calendar diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala index e2822405b24..6669db2f77f 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala import scala.language.postfixOps import org.junit.Assert._ diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala index 759ca8b7ecc..ea05faa9426 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 0b755833da7..39f863b4dd5 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala import scala.concurrent.{Future, Await} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala index 50773d55820..f8d72e3520f 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala import org.junit.{Assert, Test} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala index 71ea4d4a6ef..b53d4fe2654 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.lang.scala diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/AndroidSchedulers.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/AndroidSchedulers.java index 0b238b1644d..f0e42f0d6dc 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/AndroidSchedulers.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/AndroidSchedulers.java @@ -15,37 +15,25 @@ */ package rx.android.concurrency; -import android.os.Handler; -import android.os.Looper; import rx.Scheduler; +import android.os.Handler; /** - * Schedulers that have Android specific functionality + * Deprecated. Package changed from rx.android.concurrency to rx.android.schedulers. + * + * @deprecated Use {@link rx.android.schedulers.AndroidSchedulers} instead. This will be removed before 1.0 release. */ +@Deprecated public class AndroidSchedulers { - private static final Scheduler MAIN_THREAD_SCHEDULER = - new HandlerThreadScheduler(new Handler(Looper.getMainLooper())); - - private AndroidSchedulers(){ - - } - - /** - * {@link Scheduler} which uses the provided {@link Handler} to execute an action - * @param handler The handler that will be used when executing the action - * @return A handler based scheduler - */ + @Deprecated public static Scheduler handlerThread(final Handler handler) { - return new HandlerThreadScheduler(handler); + return rx.android.schedulers.AndroidSchedulers.handlerThread(handler); } - /** - * {@link Scheduler} which will execute an action on the main Android UI thread. - * - * @return A Main {@link Looper} based scheduler - */ + @Deprecated public static Scheduler mainThread() { - return MAIN_THREAD_SCHEDULER; + return rx.android.schedulers.AndroidSchedulers.mainThread(); } + } diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java index ae01d17c1aa..7c117ccf44d 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java @@ -17,116 +17,16 @@ import android.os.Handler; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.robolectric.RobolectricTestRunner; -import org.robolectric.annotation.Config; - -import rx.Scheduler; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; -import rx.util.functions.Func2; - -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - /** - * Schedules actions to run on an Android Handler thread. + * Deprecated. Package changed from rx.android.concurrency to rx.android.schedulers. + * + * @deprecated Use {@link rx.android.schedulers.HandlerThreadScheduler} instead. This will be removed before 1.0 release. */ -public class HandlerThreadScheduler extends Scheduler { - - private final Handler handler; +@Deprecated +public class HandlerThreadScheduler extends rx.android.schedulers.HandlerThreadScheduler { - /** - * Constructs a {@link HandlerThreadScheduler} using the given {@link Handler} - * @param handler {@link Handler} to use when scheduling actions - */ public HandlerThreadScheduler(Handler handler) { - this.handler = handler; + super(handler); } - /** - * Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} - * with a delay of zero milliseconds. - * - * See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} - */ - @Override - public Subscription schedule(final T state, final Func2 action) { - return schedule(state, action, 0L, TimeUnit.MILLISECONDS); - } - - /** - * Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action. - * @param state - * State to pass into the action. - * @param action - * Action to schedule. - * @param delayTime - * Time the action is to be delayed before executing. - * @param unit - * Time unit of the delay time. - * @return A Subscription from which one can unsubscribe from. - */ - @Override - public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - final Scheduler _scheduler = this; - handler.postDelayed(new Runnable() { - @Override - public void run() { - subscription.wrap(action.call(_scheduler, state)); - } - }, unit.toMillis(delayTime)); - return subscription; - } - - @RunWith(RobolectricTestRunner.class) - @Config(manifest=Config.NONE) - public static final class UnitTest { - - @Test - public void shouldScheduleImmediateActionOnHandlerThread() { - final Handler handler = mock(Handler.class); - final Object state = new Object(); - @SuppressWarnings("unchecked") - final Func2 action = mock(Func2.class); - - Scheduler scheduler = new HandlerThreadScheduler(handler); - scheduler.schedule(state, action); - - // verify that we post to the given Handler - ArgumentCaptor runnable = ArgumentCaptor.forClass(Runnable.class); - verify(handler).postDelayed(runnable.capture(), eq(0L)); - - // verify that the given handler delegates to our action - runnable.getValue().run(); - verify(action).call(scheduler, state); - } - - @Test - public void shouldScheduleDelayedActionOnHandlerThread() { - final Handler handler = mock(Handler.class); - final Object state = new Object(); - @SuppressWarnings("unchecked") - final Func2 action = mock(Func2.class); - - Scheduler scheduler = new HandlerThreadScheduler(handler); - scheduler.schedule(state, action, 1L, TimeUnit.SECONDS); - - // verify that we post to the given Handler - ArgumentCaptor runnable = ArgumentCaptor.forClass(Runnable.class); - verify(handler).postDelayed(runnable.capture(), eq(1000L)); - - // verify that the given handler delegates to our action - runnable.getValue().run(); - verify(action).call(scheduler, state); - } - } } - - diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/AndroidSchedulers.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/AndroidSchedulers.java new file mode 100644 index 00000000000..5120c8e848e --- /dev/null +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/AndroidSchedulers.java @@ -0,0 +1,51 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.android.schedulers; + +import android.os.Handler; +import android.os.Looper; +import rx.Scheduler; + +/** + * Schedulers that have Android specific functionality + */ +public class AndroidSchedulers { + + private static final Scheduler MAIN_THREAD_SCHEDULER = + new HandlerThreadScheduler(new Handler(Looper.getMainLooper())); + + private AndroidSchedulers(){ + + } + + /** + * {@link Scheduler} which uses the provided {@link Handler} to execute an action + * @param handler The handler that will be used when executing the action + * @return A handler based scheduler + */ + public static Scheduler handlerThread(final Handler handler) { + return new HandlerThreadScheduler(handler); + } + + /** + * {@link Scheduler} which will execute an action on the main Android UI thread. + * + * @return A Main {@link Looper} based scheduler + */ + public static Scheduler mainThread() { + return MAIN_THREAD_SCHEDULER; + } +} diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java new file mode 100644 index 00000000000..8535dda2955 --- /dev/null +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/android/schedulers/HandlerThreadScheduler.java @@ -0,0 +1,132 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.android.schedulers; + +import android.os.Handler; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.robolectric.RobolectricTestRunner; +import org.robolectric.annotation.Config; + +import rx.Scheduler; +import rx.Subscription; +import rx.operators.SafeObservableSubscription; +import rx.util.functions.Func2; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Schedules actions to run on an Android Handler thread. + */ +public class HandlerThreadScheduler extends Scheduler { + + private final Handler handler; + + /** + * Constructs a {@link HandlerThreadScheduler} using the given {@link Handler} + * @param handler {@link Handler} to use when scheduling actions + */ + public HandlerThreadScheduler(Handler handler) { + this.handler = handler; + } + + /** + * Calls {@link HandlerThreadScheduler#schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} + * with a delay of zero milliseconds. + * + * See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)} + */ + @Override + public Subscription schedule(final T state, final Func2 action) { + return schedule(state, action, 0L, TimeUnit.MILLISECONDS); + } + + /** + * Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action. + * @param state + * State to pass into the action. + * @param action + * Action to schedule. + * @param delayTime + * Time the action is to be delayed before executing. + * @param unit + * Time unit of the delay time. + * @return A Subscription from which one can unsubscribe from. + */ + @Override + public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + final Scheduler _scheduler = this; + handler.postDelayed(new Runnable() { + @Override + public void run() { + subscription.wrap(action.call(_scheduler, state)); + } + }, unit.toMillis(delayTime)); + return subscription; + } + + @RunWith(RobolectricTestRunner.class) + @Config(manifest=Config.NONE) + public static final class UnitTest { + + @Test + public void shouldScheduleImmediateActionOnHandlerThread() { + final Handler handler = mock(Handler.class); + final Object state = new Object(); + @SuppressWarnings("unchecked") + final Func2 action = mock(Func2.class); + + Scheduler scheduler = new HandlerThreadScheduler(handler); + scheduler.schedule(state, action); + + // verify that we post to the given Handler + ArgumentCaptor runnable = ArgumentCaptor.forClass(Runnable.class); + verify(handler).postDelayed(runnable.capture(), eq(0L)); + + // verify that the given handler delegates to our action + runnable.getValue().run(); + verify(action).call(scheduler, state); + } + + @Test + public void shouldScheduleDelayedActionOnHandlerThread() { + final Handler handler = mock(Handler.class); + final Object state = new Object(); + @SuppressWarnings("unchecked") + final Func2 action = mock(Func2.class); + + Scheduler scheduler = new HandlerThreadScheduler(handler); + scheduler.schedule(state, action, 1L, TimeUnit.SECONDS); + + // verify that we post to the given Handler + ArgumentCaptor runnable = ArgumentCaptor.forClass(Runnable.class); + verify(handler).postDelayed(runnable.capture(), eq(1000L)); + + // verify that the given handler delegates to our action + runnable.getValue().run(); + verify(action).call(scheduler, state); + } + } +} + + diff --git a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java index cfce38f8bb9..06b22dae5cb 100644 --- a/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java +++ b/rxjava-contrib/rxjava-android/src/main/java/rx/operators/OperationObserveFromAndroidComponent.java @@ -31,12 +31,12 @@ import org.mockito.MockitoAnnotations; import org.robolectric.RobolectricTestRunner; import org.robolectric.annotation.Config; + import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.android.concurrency.AndroidSchedulers; +import rx.android.schedulers.AndroidSchedulers; import rx.subjects.PublishSubject; - import android.app.Activity; import android.app.Fragment; import android.os.Looper; diff --git a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java index dfe7ca3e68f..3bdb58a8779 100644 --- a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java +++ b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.observables; import java.nio.ByteBuffer; diff --git a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java index 8ced455f63e..86a686958c3 100644 --- a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java +++ b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.observables; import static org.junit.Assert.*; diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java index f2d10d69b08..c6be1c55ece 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,260 +15,44 @@ */ package rx.concurrency; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import java.awt.EventQueue; -import java.awt.event.ActionEvent; -import java.awt.event.ActionListener; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.swing.SwingUtilities; -import javax.swing.Timer; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.InOrder; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Func2; /** - * Executes work on the Swing UI thread. - * This scheduler should only be used with actions that execute quickly. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.SwingScheduler} instead. This will be removed before 1.0 release. */ -public final class SwingScheduler extends Scheduler { - private static final SwingScheduler INSTANCE = new SwingScheduler(); +@Deprecated +public class SwingScheduler extends Scheduler { + + private final static SwingScheduler INSTANCE = new SwingScheduler(); public static SwingScheduler getInstance() { return INSTANCE; } + private final rx.schedulers.SwingScheduler actual; + private SwingScheduler() { + actual = rx.schedulers.SwingScheduler.getInstance(); } @Override - public Subscription schedule(final T state, final Func2 action) { - final AtomicReference sub = new AtomicReference(); - EventQueue.invokeLater(new Runnable() { - @Override - public void run() { - sub.set(action.call(SwingScheduler.this, state)); - } - }); - return Subscriptions.create(new Action0() { - @Override - public void call() { - Subscription subscription = sub.get(); - if (subscription != null) { - subscription.unsubscribe(); - } - } - }); + public Subscription schedule(T state, Func2 action) { + return actual.schedule(state, action); } @Override - public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { - final AtomicReference sub = new AtomicReference(); - long delay = unit.toMillis(dueTime); - assertThatTheDelayIsValidForTheSwingTimer(delay); - - class ExecuteOnceAction implements ActionListener { - private Timer timer; - - private void setTimer(Timer timer) { - this.timer = timer; - } - - @Override - public void actionPerformed(ActionEvent e) { - timer.stop(); - sub.set(action.call(SwingScheduler.this, state)); - } - } - - ExecuteOnceAction executeOnce = new ExecuteOnceAction(); - final Timer timer = new Timer((int) delay, executeOnce); - executeOnce.setTimer(timer); - timer.start(); - - return Subscriptions.create(new Action0() { - @Override - public void call() { - timer.stop(); - - Subscription subscription = sub.get(); - if (subscription != null) { - subscription.unsubscribe(); - } - } - }); + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + return actual.schedule(state, action, delayTime, unit); } - @Override public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { - final AtomicReference timer = new AtomicReference(); - - final long delay = unit.toMillis(period); - assertThatTheDelayIsValidForTheSwingTimer(delay); - - final CompositeSubscription subscriptions = new CompositeSubscription(); - final Func2 initialAction = new Func2() { - @Override - public Subscription call(final Scheduler scheduler, final T state0) { - // start timer for periodic execution, collect subscriptions - timer.set(new Timer((int) delay, new ActionListener() { - @Override - public void actionPerformed(ActionEvent e) { - subscriptions.add(action.call(scheduler, state0)); - } - })); - timer.get().start(); - - return action.call(scheduler, state0); - } - }; - subscriptions.add(schedule(state, initialAction, initialDelay, unit)); - - subscriptions.add(Subscriptions.create(new Action0() { - @Override - public void call() { - // in addition to all the individual unsubscriptions, stop the timer on unsubscribing - Timer maybeTimer = timer.get(); - if (maybeTimer != null) { - maybeTimer.stop(); - } - } - })); - - return subscriptions; - } - - private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) { - if (delay < 0 || delay > Integer.MAX_VALUE) { - throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE)); - } + return actual.schedulePeriodically(state, action, initialDelay, period, unit); } - public static class UnitTest { - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Test - public void testInvalidDelayValues() { - final SwingScheduler scheduler = new SwingScheduler(); - final Action0 action = mock(Action0.class); - - exception.expect(IllegalArgumentException.class); - scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS); - - exception.expect(IllegalArgumentException.class); - scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS); - - exception.expect(IllegalArgumentException.class); - scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS); - - exception.expect(IllegalArgumentException.class); - scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS); - } - - @Test - public void testPeriodicScheduling() throws Exception { - final SwingScheduler scheduler = new SwingScheduler(); - - final CountDownLatch latch = new CountDownLatch(4); - - final Action0 innerAction = mock(Action0.class); - final Action0 action = new Action0() { - @Override - public void call() { - try { - innerAction.call(); - assertTrue(SwingUtilities.isEventDispatchThread()); - } finally { - latch.countDown(); - } - } - }; - - Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS); - - if (!latch.await(5000, TimeUnit.MILLISECONDS)) { - fail("timed out waiting for tasks to execute"); - } - - sub.unsubscribe(); - waitForEmptyEventQueue(); - verify(innerAction, times(4)).call(); - } - - @Test - public void testNestedActions() throws Exception { - final SwingScheduler scheduler = new SwingScheduler(); - - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); - - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); - - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); - - final Action0 firstAction = new Action0() { - @Override - public void call() { - assertTrue(SwingUtilities.isEventDispatchThread()); - firstStepStart.call(); - firstStepEnd.call(); - } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - assertTrue(SwingUtilities.isEventDispatchThread()); - secondStepStart.call(); - scheduler.schedule(firstAction); - secondStepEnd.call(); - } - }; - final Action0 thirdAction = new Action0() { - @Override - public void call() { - assertTrue(SwingUtilities.isEventDispatchThread()); - thirdStepStart.call(); - scheduler.schedule(secondAction); - thirdStepEnd.call(); - } - }; - - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); - - scheduler.schedule(thirdAction); - waitForEmptyEventQueue(); - - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); - } - - private static void waitForEmptyEventQueue() throws Exception { - EventQueue.invokeAndWait(new Runnable() { - @Override - public void run() { - // nothing to do, we're just waiting here for the event queue to be emptied - } - }); - } - } } diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java new file mode 100644 index 00000000000..7ce2c4e2d17 --- /dev/null +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/schedulers/SwingScheduler.java @@ -0,0 +1,274 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.awt.EventQueue; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.swing.SwingUtilities; +import javax.swing.Timer; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.InOrder; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func2; + +/** + * Executes work on the Swing UI thread. + * This scheduler should only be used with actions that execute quickly. + */ +public final class SwingScheduler extends Scheduler { + private static final SwingScheduler INSTANCE = new SwingScheduler(); + + public static SwingScheduler getInstance() { + return INSTANCE; + } + + private SwingScheduler() { + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + final AtomicReference sub = new AtomicReference(); + EventQueue.invokeLater(new Runnable() { + @Override + public void run() { + sub.set(action.call(SwingScheduler.this, state)); + } + }); + return Subscriptions.create(new Action0() { + @Override + public void call() { + Subscription subscription = sub.get(); + if (subscription != null) { + subscription.unsubscribe(); + } + } + }); + } + + @Override + public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { + final AtomicReference sub = new AtomicReference(); + long delay = unit.toMillis(dueTime); + assertThatTheDelayIsValidForTheSwingTimer(delay); + + class ExecuteOnceAction implements ActionListener { + private Timer timer; + + private void setTimer(Timer timer) { + this.timer = timer; + } + + @Override + public void actionPerformed(ActionEvent e) { + timer.stop(); + sub.set(action.call(SwingScheduler.this, state)); + } + } + + ExecuteOnceAction executeOnce = new ExecuteOnceAction(); + final Timer timer = new Timer((int) delay, executeOnce); + executeOnce.setTimer(timer); + timer.start(); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + timer.stop(); + + Subscription subscription = sub.get(); + if (subscription != null) { + subscription.unsubscribe(); + } + } + }); + } + + @Override + public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + final AtomicReference timer = new AtomicReference(); + + final long delay = unit.toMillis(period); + assertThatTheDelayIsValidForTheSwingTimer(delay); + + final CompositeSubscription subscriptions = new CompositeSubscription(); + final Func2 initialAction = new Func2() { + @Override + public Subscription call(final Scheduler scheduler, final T state0) { + // start timer for periodic execution, collect subscriptions + timer.set(new Timer((int) delay, new ActionListener() { + @Override + public void actionPerformed(ActionEvent e) { + subscriptions.add(action.call(scheduler, state0)); + } + })); + timer.get().start(); + + return action.call(scheduler, state0); + } + }; + subscriptions.add(schedule(state, initialAction, initialDelay, unit)); + + subscriptions.add(Subscriptions.create(new Action0() { + @Override + public void call() { + // in addition to all the individual unsubscriptions, stop the timer on unsubscribing + Timer maybeTimer = timer.get(); + if (maybeTimer != null) { + maybeTimer.stop(); + } + } + })); + + return subscriptions; + } + + private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) { + if (delay < 0 || delay > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE)); + } + } + + public static class UnitTest { + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testInvalidDelayValues() { + final SwingScheduler scheduler = new SwingScheduler(); + final Action0 action = mock(Action0.class); + + exception.expect(IllegalArgumentException.class); + scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS); + + exception.expect(IllegalArgumentException.class); + scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS); + + exception.expect(IllegalArgumentException.class); + scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS); + + exception.expect(IllegalArgumentException.class); + scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS); + } + + @Test + public void testPeriodicScheduling() throws Exception { + final SwingScheduler scheduler = new SwingScheduler(); + + final CountDownLatch latch = new CountDownLatch(4); + + final Action0 innerAction = mock(Action0.class); + final Action0 action = new Action0() { + @Override + public void call() { + try { + innerAction.call(); + assertTrue(SwingUtilities.isEventDispatchThread()); + } finally { + latch.countDown(); + } + } + }; + + Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting for tasks to execute"); + } + + sub.unsubscribe(); + waitForEmptyEventQueue(); + verify(innerAction, times(4)).call(); + } + + @Test + public void testNestedActions() throws Exception { + final SwingScheduler scheduler = new SwingScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + assertTrue(SwingUtilities.isEventDispatchThread()); + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + assertTrue(SwingUtilities.isEventDispatchThread()); + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + assertTrue(SwingUtilities.isEventDispatchThread()); + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + waitForEmptyEventQueue(); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + private static void waitForEmptyEventQueue() throws Exception { + EventQueue.invokeAndWait(new Runnable() { + @Override + public void run() { + // nothing to do, we're just waiting here for the event queue to be emptied + } + }); + } + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 15a4d5bff63..e11f9b70568 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -27,7 +27,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import rx.concurrency.Schedulers; import rx.joins.Pattern2; import rx.joins.Plan0; import rx.observables.BlockingObservable; @@ -109,6 +108,7 @@ import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index 9bf1f6ad91c..fe8ac26e0b4 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,85 +15,40 @@ */ package rx.concurrency; -import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; import rx.Subscription; import rx.util.functions.Func2; /** - * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.CurrentThreadScheduler} instead. This will be removed before 1.0 release. */ +@Deprecated public class CurrentThreadScheduler extends Scheduler { - private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + + private final static CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); public static CurrentThreadScheduler getInstance() { return INSTANCE; } - private static final ThreadLocal> QUEUE = new ThreadLocal>(); + private final rx.schedulers.CurrentThreadScheduler actual; - /* package accessible for unit tests */CurrentThreadScheduler() { + private CurrentThreadScheduler() { + actual = rx.schedulers.CurrentThreadScheduler.getInstance(); } - private final AtomicInteger counter = new AtomicInteger(0); - @Override public Subscription schedule(T state, Func2 action) { - DiscardableAction discardableAction = new DiscardableAction(state, action); - enqueue(discardableAction, now()); - return discardableAction; + return actual.schedule(state, action); } @Override - public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { - long execTime = now() + unit.toMillis(dueTime); - - DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); - enqueue(discardableAction, execTime); - return discardableAction; - } - - private void enqueue(DiscardableAction action, long execTime) { - PriorityQueue queue = QUEUE.get(); - boolean exec = queue == null; - - if (exec) { - queue = new PriorityQueue(); - QUEUE.set(queue); - } - - queue.add(new TimedAction(action, execTime, counter.incrementAndGet())); - - if (exec) { - while (!queue.isEmpty()) { - queue.poll().action.call(this); - } - - QUEUE.set(null); - } + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + return actual.schedule(state, action, delayTime, unit); } - private static class TimedAction implements Comparable { - final DiscardableAction action; - final Long execTime; - final Integer count; // In case if time between enqueueing took less than 1ms - - private TimedAction(DiscardableAction action, Long execTime, Integer count) { - this.action = action; - this.execTime = execTime; - this.count = count; - } - - @Override - public int compareTo(TimedAction that) { - int result = execTime.compareTo(that.execTime); - if (result == 0) { - return count.compareTo(that.count); - } - return result; - } - } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 1e35735c67d..2833a2072e9 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,132 +16,24 @@ package rx.concurrency; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import rx.Scheduler; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func2; /** - * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. - *

- * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.ExecutorScheduler} instead. This will be removed before 1.0 release. */ -public class ExecutorScheduler extends Scheduler { - private final Executor executor; +@Deprecated +public class ExecutorScheduler extends rx.schedulers.ExecutorScheduler { + @Deprecated public ExecutorScheduler(Executor executor) { - this.executor = executor; + super(executor); } + @Deprecated public ExecutorScheduler(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { - if (executor instanceof ScheduledExecutorService) { - final CompositeSubscription subscriptions = new CompositeSubscription(); - - ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Subscription s = action.call(ExecutorScheduler.this, state); - subscriptions.add(s); - } - }, initialDelay, period, unit); - - subscriptions.add(Subscriptions.from(f)); - return subscriptions; - - } else { - return super.schedulePeriodically(state, action, initialDelay, period, unit); - } - } - - @Override - public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - - if (executor instanceof ScheduledExecutorService) { - // we are a ScheduledExecutorService so can do proper scheduling - ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { - @Override - public void run() { - // when the delay has passed we now do the work on the actual scheduler - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); - } else { - // we are not a ScheduledExecutorService so can't directly schedule - if (delayTime == 0) { - // no delay so put on the thread-pool right now - Subscription s = schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } else { - // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService - // to handle the scheduling and once it's ready then execute on this Executor - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - // now execute on the real Executor (by using the other overload that schedules for immediate execution) - Subscription s = _scheduler.schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }, delayTime, unit); - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); - } - } - return subscription; - } - - @Override - public Subscription schedule(T state, Func2 action) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - final Scheduler _scheduler = this; - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - - // work to be done on a thread - Runnable r = new Runnable() { - @Override - public void run() { - Subscription s = discardableAction.call(_scheduler); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - }; - - // submit for immediate execution - if (executor instanceof ExecutorService) { - // we are an ExecutorService so get a Future back that supports unsubscribe - Future f = ((ExecutorService) executor).submit(r); - // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); - } else { - // we are the lowest common denominator so can't unsubscribe once we execute - executor.execute(r); - } - - return subscription; - + super(executor); } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index c5a4cef9d48..f3bdf8aa8fc 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,28 +22,33 @@ import rx.util.functions.Func2; /** - * Executes work immediately on the current thread. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.ImmediateScheduler} instead. This will be removed before 1.0 release. */ -public final class ImmediateScheduler extends Scheduler { - private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); +@Deprecated +public class ImmediateScheduler extends Scheduler { + + private final static ImmediateScheduler INSTANCE = new ImmediateScheduler(); public static ImmediateScheduler getInstance() { return INSTANCE; } - /* package accessible for unit tests */ImmediateScheduler() { + private final rx.schedulers.ImmediateScheduler actual; + + private ImmediateScheduler() { + actual = rx.schedulers.ImmediateScheduler.getInstance(); } @Override public Subscription schedule(T state, Func2 action) { - return action.call(this, state); + return actual.schedule(state, action); } @Override - public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { - // since we are executing immediately on this thread we must cause this thread to sleep - long execTime = now() + unit.toMillis(dueTime); - - return schedule(state, new SleepingAction(action, this, execTime)); + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + return actual.schedule(state, action, delayTime, unit); } + } diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index 036ba621276..a574442ba67 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,122 +15,40 @@ */ package rx.concurrency; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; /** - * Schedules work on a new thread. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.NewThreadScheduler} instead. This will be removed before 1.0 release. */ +@Deprecated public class NewThreadScheduler extends Scheduler { private final static NewThreadScheduler INSTANCE = new NewThreadScheduler(); - private final static AtomicLong count = new AtomicLong(); public static NewThreadScheduler getInstance() { return INSTANCE; } - private NewThreadScheduler() { - - } - - private static class EventLoopScheduler extends Scheduler { - private final ExecutorService executor; - - private EventLoopScheduler() { - executor = Executors.newFixedThreadPool(1, new ThreadFactory() { - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); - } - }); - } - - @Override - public Subscription schedule(T state, Func2 action) { - final DiscardableAction discardableAction = new DiscardableAction(state, action); - // all subscriptions that may need to be unsubscribed - final CompositeSubscription subscription = new CompositeSubscription(discardableAction); - - final Scheduler _scheduler = this; - subscription.add(Subscriptions.from(executor.submit(new Runnable() { - - @Override - public void run() { - Subscription s = discardableAction.call(_scheduler); - subscription.add(s); - } - }))); - - return subscription; - } - - @Override - public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { - // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep - // we will instead schedule the event then launch the thread after the delay has passed - final Scheduler _scheduler = this; - final CompositeSubscription subscription = new CompositeSubscription(); - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - if (!subscription.isUnsubscribed()) { - // when the delay has passed we now do the work on the actual scheduler - Subscription s = _scheduler.schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - } - }, delayTime, unit); - - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); - - return subscription; - } + private final rx.schedulers.NewThreadScheduler actual; + private NewThreadScheduler() { + actual = rx.schedulers.NewThreadScheduler.getInstance(); } @Override public Subscription schedule(T state, Func2 action) { - EventLoopScheduler s = new EventLoopScheduler(); - return s.schedule(state, action); + return actual.schedule(state, action); } @Override - public Subscription schedule(final T state, final Func2 action, long delay, TimeUnit unit) { - // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep - // we will instead schedule the event then launch the thread after the delay has passed - final Scheduler _scheduler = this; - final CompositeSubscription subscription = new CompositeSubscription(); - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - if (!subscription.isUnsubscribed()) { - // when the delay has passed we now do the work on the actual scheduler - Subscription s = _scheduler.schedule(state, action); - // add the subscription to the CompositeSubscription so it is unsubscribed - subscription.add(s); - } - } - }, delay, unit); - - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - subscription.add(Subscriptions.from(f)); - - return subscription; + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + return actual.schedule(state, action, delayTime, unit); } + } diff --git a/rxjava-core/src/main/java/rx/concurrency/README.txt b/rxjava-core/src/main/java/rx/concurrency/README.txt new file mode 100644 index 00000000000..d9048bbbc95 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/README.txt @@ -0,0 +1,3 @@ +This package is deprecated and will be removed prior to a 1.0 release. + +Use rx.schedulers.* instead of rx.concurrency.* \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 1b27b9bf0ae..cdc9e1f6a3f 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,32 +16,26 @@ package rx.concurrency; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; /** - * Static factory methods for creating Schedulers. + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.Schedulers} instead. This will be removed before 1.0 release. */ +@Deprecated public class Schedulers { - private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); - private static final Executor IO_EXECUTOR = createIOExecutor(); - - private Schedulers() { - - } /** * {@link Scheduler} that executes work immediately on the current thread. * * @return {@link ImmediateScheduler} instance */ + @Deprecated public static Scheduler immediate() { - return ImmediateScheduler.getInstance(); + return rx.schedulers.ImmediateScheduler.getInstance(); } /** @@ -49,8 +43,9 @@ public static Scheduler immediate() { * * @return {@link CurrentThreadScheduler} instance */ + @Deprecated public static Scheduler currentThread() { - return CurrentThreadScheduler.getInstance(); + return rx.schedulers.CurrentThreadScheduler.getInstance(); } /** @@ -58,8 +53,9 @@ public static Scheduler currentThread() { * * @return {@link NewThreadScheduler} instance */ + @Deprecated public static Scheduler newThread() { - return NewThreadScheduler.getInstance(); + return rx.schedulers.NewThreadScheduler.getInstance(); } /** @@ -69,8 +65,9 @@ public static Scheduler newThread() { * * @return {@link ExecutorScheduler} instance */ + @Deprecated public static Scheduler executor(Executor executor) { - return new ExecutorScheduler(executor); + return new rx.schedulers.ExecutorScheduler(executor); } /** @@ -78,8 +75,9 @@ public static Scheduler executor(Executor executor) { * * @return {@link ExecutorScheduler} instance */ + @Deprecated public static Scheduler executor(ScheduledExecutorService executor) { - return new ExecutorScheduler(executor); + return new rx.schedulers.ExecutorScheduler(executor); } /** @@ -93,8 +91,9 @@ public static Scheduler executor(ScheduledExecutorService executor) { * * @return {@link ExecutorScheduler} for computation-bound work. */ + @Deprecated public static Scheduler threadPoolForComputation() { - return executor(COMPUTATION_EXECUTOR); + return rx.schedulers.Schedulers.threadPoolForComputation(); } /** @@ -108,36 +107,9 @@ public static Scheduler threadPoolForComputation() { * * @return {@link ExecutorScheduler} for IO-bound work. */ + @Deprecated public static Scheduler threadPoolForIO() { - return executor(IO_EXECUTOR); + return rx.schedulers.Schedulers.threadPoolForIO(); } - private static ScheduledExecutorService createComputationExecutor() { - int cores = Runtime.getRuntime().availableProcessors(); - return Executors.newScheduledThreadPool(cores, new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); - } - - private static Executor createIOExecutor() { - Executor result = Executors.newCachedThreadPool(new ThreadFactory() { - final AtomicLong counter = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); - - return result; - } } diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index 04b8c1a2c53..e402e1b63e9 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,106 +15,15 @@ */ package rx.concurrency; -import java.util.Comparator; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Scheduler; -import rx.Subscription; -import rx.util.functions.Func2; - -public class TestScheduler extends Scheduler { - private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); - - private static class TimedAction { - - private final long time; - private final Func2 action; - private final T state; - private final TestScheduler scheduler; - private final AtomicBoolean isCancelled = new AtomicBoolean(false); - - private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { - this.time = time; - this.action = action; - this.state = state; - this.scheduler = scheduler; - } - - public void cancel() { - isCancelled.set(true); - } - - @Override - public String toString() { - return String.format("TimedAction(time = %d, action = %s)", time, action.toString()); - } - } - - private static class CompareActionsByTime implements Comparator> { - @Override - public int compare(TimedAction action1, TimedAction action2) { - return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); - } - } - - // Storing time in nanoseconds internally. - private long time; - - @Override - public long now() { - return TimeUnit.NANOSECONDS.toMillis(time); - } - - public void advanceTimeBy(long delayTime, TimeUnit unit) { - advanceTimeTo(time + unit.toNanos(delayTime), TimeUnit.NANOSECONDS); - } - - public void advanceTimeTo(long delayTime, TimeUnit unit) { - long targetTime = unit.toNanos(delayTime); - triggerActions(targetTime); - } - - public void triggerActions() { - triggerActions(time); - } - - @SuppressWarnings("unchecked") - private void triggerActions(long targetTimeInNanos) { - while (!queue.isEmpty()) { - TimedAction current = queue.peek(); - if (current.time > targetTimeInNanos) { - break; - } - time = current.time; - queue.remove(); - - // Only execute if the TimedAction has not yet been cancelled - if (!current.isCancelled.get()) { - // because the queue can have wildcards we have to ignore the type T for the state - ((Func2) current.action).call(current.scheduler, current.state); - } - } - time = targetTimeInNanos; - } +/** + * Deprecated. Package changed from rx.concurrency to rx.schedulers. + * + * @deprecated Use {@link rx.schedulers.TestScheduler} instead. This will be removed before 1.0 release. + */ +@Deprecated +public class TestScheduler extends rx.schedulers.TestScheduler { - @Override - public Subscription schedule(T state, Func2 action) { - return schedule(state, action, 0, TimeUnit.MILLISECONDS); + public TestScheduler() { } - @Override - public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); - queue.add(timedAction); - - return new Subscription() { - @Override - public void unsubscribe() { - timedAction.cancel(); - } - }; - } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 72cc1df2113..d7c63df23ec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -24,7 +24,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java index 3dbdd81f202..2e312fee312 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java @@ -23,7 +23,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Action0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java index 1b0aafb5783..871238a0b01 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java @@ -16,8 +16,8 @@ package rx.operators; import rx.Observable; -import rx.Observer; import rx.Observable.OnSubscribeFunc; +import rx.Observer; import rx.Subscription; /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java index 9869862b92b..63a271a6888 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java b/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java index 9c14efd6323..56d009db0a0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupJoin.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 72d35d37de9..e6711bfb9ec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -21,7 +21,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/operators/OperationJoin.java b/rxjava-core/src/main/java/rx/operators/OperationJoin.java index b75b8498b00..f75bf109300 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationJoin.java +++ b/rxjava-core/src/main/java/rx/operators/OperationJoin.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java b/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java index ffc304e12ad..be3c5375cf2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java +++ b/rxjava-core/src/main/java/rx/operators/OperationJoinPatterns.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; @@ -29,11 +30,9 @@ import rx.joins.Pattern1; import rx.joins.Pattern2; import rx.joins.Plan0; -import rx.subjects.PublishSubject; import rx.subscriptions.CompositeSubscription; import rx.util.functions.Action1; import rx.util.functions.Func1; -import rx.util.functions.Func2; /** * Join patterns: And, Then, When. diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 22876a4ced7..fed1cf4bb0f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -24,10 +24,9 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.CurrentThreadScheduler; -import rx.concurrency.ImmediateScheduler; +import rx.schedulers.CurrentThreadScheduler; +import rx.schedulers.ImmediateScheduler; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationParallel.java b/rxjava-core/src/main/java/rx/operators/OperationParallel.java index 5a584ce370e..32ad3181f3e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationParallel.java +++ b/rxjava-core/src/main/java/rx/operators/OperationParallel.java @@ -19,8 +19,8 @@ import rx.Observable; import rx.Scheduler; -import rx.concurrency.Schedulers; import rx.observables.GroupedObservable; +import rx.schedulers.Schedulers; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java b/rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java index cb2dd0bc340..86aa31c6090 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java @@ -19,8 +19,8 @@ import rx.Observable; import rx.Scheduler; -import rx.concurrency.Schedulers; import rx.observables.GroupedObservable; +import rx.schedulers.Schedulers; import rx.util.functions.Func1; public class OperationParallelMerge { diff --git a/rxjava-core/src/main/java/rx/operators/OperationRetry.java b/rxjava-core/src/main/java/rx/operators/OperationRetry.java index 430ef53ce4b..450dfe3ebbf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperationRetry.java @@ -23,7 +23,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Func2; diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index f5f8f96e3f6..a7fb426bf1f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -24,9 +24,8 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java b/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java index 423ddbc3582..16300b664e2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java @@ -15,9 +15,7 @@ */ package rx.operators; -import static rx.Observable.concat; -import static rx.Observable.from; -import static rx.Observable.zip; +import static rx.Observable.*; import rx.Notification; import rx.Observable; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java index e8f04fd383f..7e89b741e8b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -16,6 +16,7 @@ package rx.operators; import java.util.concurrent.atomic.AtomicBoolean; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java index e21617a82c7..8f5568d1b5d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java @@ -23,7 +23,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Func1; /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java b/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java index 2cd18607119..fe45b0c72d3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeInterval.java @@ -20,7 +20,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.TimeInterval; /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java index b52c7a5a43a..8f4c5b4b0e1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java @@ -25,7 +25,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimer.java b/rxjava-core/src/main/java/rx/operators/OperationTimer.java index 3bb462cac08..c8f9f3b33c4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimer.java @@ -21,7 +21,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/operators/OperationToMap.java b/rxjava-core/src/main/java/rx/operators/OperationToMap.java index 754ff82d645..12963ce6a4b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToMap.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationToMultimap.java b/rxjava-core/src/main/java/rx/operators/OperationToMultimap.java index 7210ee45e73..92e8f68432d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToMultimap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToMultimap.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; + import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index f18a700bd23..ef867898ed8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -22,7 +22,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java new file mode 100644 index 00000000000..d1550a24228 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java @@ -0,0 +1,99 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func2; + +/** + * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. + */ +public class CurrentThreadScheduler extends Scheduler { + private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + + public static CurrentThreadScheduler getInstance() { + return INSTANCE; + } + + private static final ThreadLocal> QUEUE = new ThreadLocal>(); + + /* package accessible for unit tests */CurrentThreadScheduler() { + } + + private final AtomicInteger counter = new AtomicInteger(0); + + @Override + public Subscription schedule(T state, Func2 action) { + DiscardableAction discardableAction = new DiscardableAction(state, action); + enqueue(discardableAction, now()); + return discardableAction; + } + + @Override + public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { + long execTime = now() + unit.toMillis(dueTime); + + DiscardableAction discardableAction = new DiscardableAction(state, new SleepingAction(action, this, execTime)); + enqueue(discardableAction, execTime); + return discardableAction; + } + + private void enqueue(DiscardableAction action, long execTime) { + PriorityQueue queue = QUEUE.get(); + boolean exec = queue == null; + + if (exec) { + queue = new PriorityQueue(); + QUEUE.set(queue); + } + + queue.add(new TimedAction(action, execTime, counter.incrementAndGet())); + + if (exec) { + while (!queue.isEmpty()) { + queue.poll().action.call(this); + } + + QUEUE.set(null); + } + } + + private static class TimedAction implements Comparable { + final DiscardableAction action; + final Long execTime; + final Integer count; // In case if time between enqueueing took less than 1ms + + private TimedAction(DiscardableAction action, Long execTime, Integer count) { + this.action = action; + this.execTime = execTime; + this.count = count; + } + + @Override + public int compareTo(TimedAction that) { + int result = execTime.compareTo(that.execTime); + if (result == 0) { + return count.compareTo(that.count); + } + return result; + } + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java similarity index 98% rename from rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java rename to rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java index 94d04075c5b..4ba336a864f 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/schedulers/DiscardableAction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.concurrency; +package rx.schedulers; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java new file mode 100644 index 00000000000..563e609612b --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -0,0 +1,147 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func2; + +/** + * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. + *

+ * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. + */ +public class ExecutorScheduler extends Scheduler { + private final Executor executor; + + public ExecutorScheduler(Executor executor) { + this.executor = executor; + } + + public ExecutorScheduler(ScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + final CompositeSubscription subscriptions = new CompositeSubscription(); + + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + Subscription s = action.call(ExecutorScheduler.this, state); + subscriptions.add(s); + } + }, initialDelay, period, unit); + + subscriptions.add(Subscriptions.from(f)); + return subscriptions; + + } else { + return super.schedulePeriodically(state, action, initialDelay, period, unit); + } + } + + @Override + public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(state, action); + final Scheduler _scheduler = this; + // all subscriptions that may need to be unsubscribed + final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + + if (executor instanceof ScheduledExecutorService) { + // we are a ScheduledExecutorService so can do proper scheduling + ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { + @Override + public void run() { + // when the delay has passed we now do the work on the actual scheduler + Subscription s = discardableAction.call(_scheduler); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + }, delayTime, unit); + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.from(f)); + } else { + // we are not a ScheduledExecutorService so can't directly schedule + if (delayTime == 0) { + // no delay so put on the thread-pool right now + Subscription s = schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } else { + // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService + // to handle the scheduling and once it's ready then execute on this Executor + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + // now execute on the real Executor (by using the other overload that schedules for immediate execution) + Subscription s = _scheduler.schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + }, delayTime, unit); + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.from(f)); + } + } + return subscription; + } + + @Override + public Subscription schedule(T state, Func2 action) { + final DiscardableAction discardableAction = new DiscardableAction(state, action); + final Scheduler _scheduler = this; + // all subscriptions that may need to be unsubscribed + final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + + // work to be done on a thread + Runnable r = new Runnable() { + @Override + public void run() { + Subscription s = discardableAction.call(_scheduler); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + }; + + // submit for immediate execution + if (executor instanceof ExecutorService) { + // we are an ExecutorService so get a Future back that supports unsubscribe + Future f = ((ExecutorService) executor).submit(r); + // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.from(f)); + } else { + // we are the lowest common denominator so can't unsubscribe once we execute + executor.execute(r); + } + + return subscription; + + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java similarity index 99% rename from rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java rename to rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java index 8bb520d9fee..14546301c51 100644 --- a/rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java +++ b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.concurrency; +package rx.schedulers; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; diff --git a/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java new file mode 100644 index 00000000000..3963bce4c51 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java @@ -0,0 +1,49 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.TimeUnit; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func2; + +/** + * Executes work immediately on the current thread. + */ +public final class ImmediateScheduler extends Scheduler { + private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + public static ImmediateScheduler getInstance() { + return INSTANCE; + } + + /* package accessible for unit tests */ImmediateScheduler() { + } + + @Override + public Subscription schedule(T state, Func2 action) { + return action.call(this, state); + } + + @Override + public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { + // since we are executing immediately on this thread we must cause this thread to sleep + long execTime = now() + unit.toMillis(dueTime); + + return schedule(state, new SleepingAction(action, this, execTime)); + } +} diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java new file mode 100644 index 00000000000..b2fdff50d2c --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -0,0 +1,136 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func2; + +/** + * Schedules work on a new thread. + */ +public class NewThreadScheduler extends Scheduler { + + private final static NewThreadScheduler INSTANCE = new NewThreadScheduler(); + private final static AtomicLong count = new AtomicLong(); + + public static NewThreadScheduler getInstance() { + return INSTANCE; + } + + private NewThreadScheduler() { + + } + + private static class EventLoopScheduler extends Scheduler { + private final ExecutorService executor; + + private EventLoopScheduler() { + executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + } + }); + } + + @Override + public Subscription schedule(T state, Func2 action) { + final DiscardableAction discardableAction = new DiscardableAction(state, action); + // all subscriptions that may need to be unsubscribed + final CompositeSubscription subscription = new CompositeSubscription(discardableAction); + + final Scheduler _scheduler = this; + subscription.add(Subscriptions.from(executor.submit(new Runnable() { + + @Override + public void run() { + Subscription s = discardableAction.call(_scheduler); + subscription.add(s); + } + }))); + + return subscription; + } + + @Override + public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep + // we will instead schedule the event then launch the thread after the delay has passed + final Scheduler _scheduler = this; + final CompositeSubscription subscription = new CompositeSubscription(); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + if (!subscription.isUnsubscribed()) { + // when the delay has passed we now do the work on the actual scheduler + Subscription s = _scheduler.schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + } + }, delayTime, unit); + + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.from(f)); + + return subscription; + } + + } + + @Override + public Subscription schedule(T state, Func2 action) { + EventLoopScheduler s = new EventLoopScheduler(); + return s.schedule(state, action); + } + + @Override + public Subscription schedule(final T state, final Func2 action, long delay, TimeUnit unit) { + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep + // we will instead schedule the event then launch the thread after the delay has passed + final Scheduler _scheduler = this; + final CompositeSubscription subscription = new CompositeSubscription(); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + if (!subscription.isUnsubscribed()) { + // when the delay has passed we now do the work on the actual scheduler + Subscription s = _scheduler.schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + } + }, delay, unit); + + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.from(f)); + + return subscription; + } +} diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java new file mode 100644 index 00000000000..b0c6a0cc9b3 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -0,0 +1,143 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import rx.Scheduler; + +/** + * Static factory methods for creating Schedulers. + */ +public class Schedulers { + private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); + private static final Executor IO_EXECUTOR = createIOExecutor(); + + private Schedulers() { + + } + + /** + * {@link Scheduler} that executes work immediately on the current thread. + * + * @return {@link ImmediateScheduler} instance + */ + public static Scheduler immediate() { + return ImmediateScheduler.getInstance(); + } + + /** + * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. + * + * @return {@link CurrentThreadScheduler} instance + */ + public static Scheduler currentThread() { + return CurrentThreadScheduler.getInstance(); + } + + /** + * {@link Scheduler} that creates a new {@link Thread} for each unit of work. + * + * @return {@link NewThreadScheduler} instance + */ + public static Scheduler newThread() { + return NewThreadScheduler.getInstance(); + } + + /** + * {@link Scheduler} that queues work on an {@link Executor}. + *

+ * Note that this does not support scheduled actions with a delay. + * + * @return {@link ExecutorScheduler} instance + */ + public static Scheduler executor(Executor executor) { + return new ExecutorScheduler(executor); + } + + /** + * {@link Scheduler} that queues work on an {@link ScheduledExecutorService}. + * + * @return {@link ExecutorScheduler} instance + */ + public static Scheduler executor(ScheduledExecutorService executor) { + return new ExecutorScheduler(executor); + } + + /** + * {@link Scheduler} intended for computational work. + *

+ * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU cores. + *

+ * This can be used for event-loops, processing callbacks and other computational work. + *

+ * Do not perform IO-bound work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for computation-bound work. + */ + public static Scheduler threadPoolForComputation() { + return executor(COMPUTATION_EXECUTOR); + } + + /** + * {@link Scheduler} intended for IO-bound work. + *

+ * The implementation is backed by an {@link Executor} thread-pool that will grow as needed. + *

+ * This can be used for asynchronously performing blocking IO. + *

+ * Do not perform computational work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for IO-bound work. + */ + public static Scheduler threadPoolForIO() { + return executor(IO_EXECUTOR); + } + + private static ScheduledExecutorService createComputationExecutor() { + int cores = Runtime.getRuntime().availableProcessors(); + return Executors.newScheduledThreadPool(cores, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + } + + private static Executor createIOExecutor() { + Executor result = Executors.newCachedThreadPool(new ThreadFactory() { + final AtomicLong counter = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + + return result; + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java similarity index 98% rename from rxjava-core/src/main/java/rx/concurrency/SleepingAction.java rename to rxjava-core/src/main/java/rx/schedulers/SleepingAction.java index 925403e846e..33b1e869df5 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/schedulers/SleepingAction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.concurrency; +package rx.schedulers; import rx.Scheduler; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java new file mode 100644 index 00000000000..27c8e44f9b0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/TestScheduler.java @@ -0,0 +1,120 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func2; + +public class TestScheduler extends Scheduler { + private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); + + private static class TimedAction { + + private final long time; + private final Func2 action; + private final T state; + private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); + + private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { + this.time = time; + this.action = action; + this.state = state; + this.scheduler = scheduler; + } + + public void cancel() { + isCancelled.set(true); + } + + @Override + public String toString() { + return String.format("TimedAction(time = %d, action = %s)", time, action.toString()); + } + } + + private static class CompareActionsByTime implements Comparator> { + @Override + public int compare(TimedAction action1, TimedAction action2) { + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + } + } + + // Storing time in nanoseconds internally. + private long time; + + @Override + public long now() { + return TimeUnit.NANOSECONDS.toMillis(time); + } + + public void advanceTimeBy(long delayTime, TimeUnit unit) { + advanceTimeTo(time + unit.toNanos(delayTime), TimeUnit.NANOSECONDS); + } + + public void advanceTimeTo(long delayTime, TimeUnit unit) { + long targetTime = unit.toNanos(delayTime); + triggerActions(targetTime); + } + + public void triggerActions() { + triggerActions(time); + } + + @SuppressWarnings("unchecked") + private void triggerActions(long targetTimeInNanos) { + while (!queue.isEmpty()) { + TimedAction current = queue.peek(); + if (current.time > targetTimeInNanos) { + break; + } + time = current.time; + queue.remove(); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } + } + time = targetTimeInNanos; + } + + @Override + public Subscription schedule(T state, Func2 action) { + return schedule(state, action, 0, TimeUnit.MILLISECONDS); + } + + @Override + public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/package-info.java b/rxjava-core/src/main/java/rx/schedulers/package-info.java similarity index 96% rename from rxjava-core/src/main/java/rx/concurrency/package-info.java rename to rxjava-core/src/main/java/rx/schedulers/package-info.java index e0657722fbe..0bfe32d4747 100644 --- a/rxjava-core/src/main/java/rx/concurrency/package-info.java +++ b/rxjava-core/src/main/java/rx/schedulers/package-info.java @@ -16,4 +16,4 @@ /** * Rx Schedulers */ -package rx.concurrency; \ No newline at end of file +package rx.schedulers; \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java index 4fb017cb36e..7a33d451ce0 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -16,8 +16,6 @@ package rx.util.functions; import rx.Observer; -import rx.util.functions.Action0; -import rx.util.functions.Action1; /** * Utility class for the Action interfaces. diff --git a/rxjava-core/src/main/java/rx/util/functions/Async.java b/rxjava-core/src/main/java/rx/util/functions/Async.java index f31ab3780c8..51a17877151 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Async.java +++ b/rxjava-core/src/main/java/rx/util/functions/Async.java @@ -18,8 +18,8 @@ import rx.Observable; import rx.Scheduler; -import rx.concurrency.ExecutorScheduler; -import rx.concurrency.Schedulers; +import rx.schedulers.ExecutorScheduler; +import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; /** diff --git a/rxjava-core/src/test/java/rx/EventStream.java b/rxjava-core/src/test/java/rx/EventStream.java index f32787bac1e..f41e670923e 100644 --- a/rxjava-core/src/test/java/rx/EventStream.java +++ b/rxjava-core/src/test/java/rx/EventStream.java @@ -20,7 +20,7 @@ import java.util.Map; import rx.Observable.OnSubscribeFunc; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subscriptions.BooleanSubscription; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index e4023f44ab2..43b81195af9 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -35,7 +35,7 @@ import org.mockito.MockitoAnnotations; import rx.Observable.OnSubscribeFunc; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.observables.ConnectableObservable; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; diff --git a/rxjava-core/src/test/java/rx/ObserveOnTests.java b/rxjava-core/src/test/java/rx/ObserveOnTests.java index 57d13c35851..3f9e474787c 100644 --- a/rxjava-core/src/test/java/rx/ObserveOnTests.java +++ b/rxjava-core/src/test/java/rx/ObserveOnTests.java @@ -21,7 +21,7 @@ import org.junit.Test; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Action1; import rx.util.functions.Func1; diff --git a/rxjava-core/src/test/java/rx/RefCountTests.java b/rxjava-core/src/test/java/rx/RefCountTests.java index b83e94fcae3..3f02933a800 100644 --- a/rxjava-core/src/test/java/rx/RefCountTests.java +++ b/rxjava-core/src/test/java/rx/RefCountTests.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.mockito.MockitoAnnotations; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java index c9d97054d08..62e74f5798b 100644 --- a/rxjava-core/src/test/java/rx/SchedulersTest.java +++ b/rxjava-core/src/test/java/rx/SchedulersTest.java @@ -31,8 +31,8 @@ import org.mockito.Mockito; import rx.Observable.OnSubscribeFunc; -import rx.concurrency.Schedulers; -import rx.concurrency.TestScheduler; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java index 655754d398e..bbba4214774 100644 --- a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java +++ b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class ThrottleFirstTests { diff --git a/rxjava-core/src/test/java/rx/ThrottleLastTests.java b/rxjava-core/src/test/java/rx/ThrottleLastTests.java index c3a037a78c8..94ec7051b28 100644 --- a/rxjava-core/src/test/java/rx/ThrottleLastTests.java +++ b/rxjava-core/src/test/java/rx/ThrottleLastTests.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class ThrottleLastTests { diff --git a/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java index ead4ddb24e8..2e27c81cce8 100644 --- a/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java +++ b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class ThrottleWithTimeoutTests { diff --git a/rxjava-core/src/test/java/rx/TimeoutTests.java b/rxjava-core/src/test/java/rx/TimeoutTests.java index 46a3620ce5e..144824d2228 100644 --- a/rxjava-core/src/test/java/rx/TimeoutTests.java +++ b/rxjava-core/src/test/java/rx/TimeoutTests.java @@ -30,7 +30,7 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class TimeoutTests { diff --git a/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java b/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java index 784d6861985..f6e5cd859cc 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java @@ -29,7 +29,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.CompositeSubscription; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java index a2de69ed63e..dc45a16a8f1 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationBufferTest.java @@ -34,7 +34,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; diff --git a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java index 0f3f8f88669..24146ec131d 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationConcatTest.java @@ -32,7 +32,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; public class OperationConcatTest { diff --git a/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java b/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java index e831c7b8296..9779239f956 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDebounceTest.java @@ -27,7 +27,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index 224a82be553..538131731d5 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import static org.mockito.Matchers.any; @@ -18,7 +33,7 @@ import rx.Observable; import rx.Observer; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.util.functions.Func1; public class OperationDelayTest { diff --git a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java index 6c1407ebeaf..e1e72a249a4 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java @@ -33,7 +33,7 @@ import rx.Observable; import rx.Observer; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Action1; diff --git a/rxjava-core/src/test/java/rx/operators/OperationIntervalTest.java b/rxjava-core/src/test/java/rx/operators/OperationIntervalTest.java index 293ff236c9c..23c786fc895 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationIntervalTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationIntervalTest.java @@ -27,7 +27,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.observables.ConnectableObservable; public class OperationIntervalTest { diff --git a/rxjava-core/src/test/java/rx/operators/OperationMapTest.java b/rxjava-core/src/test/java/rx/operators/OperationMapTest.java index 175cd66f752..463b2dca7a2 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMapTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMapTest.java @@ -33,7 +33,7 @@ import rx.Observable; import rx.Observer; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; diff --git a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java index 140cc0560c2..8a5de26d1d8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java @@ -30,7 +30,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java index 63fa0e4f279..69fa6b1718f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java @@ -30,8 +30,8 @@ import rx.Observable; import rx.Observer; -import rx.concurrency.Schedulers; -import rx.concurrency.TestScheduler; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.util.functions.Action1; public class OperationObserveOnTest { diff --git a/rxjava-core/src/main/java/rx/operators/OperationParallelMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java similarity index 99% rename from rxjava-core/src/main/java/rx/operators/OperationParallelMergeTest.java rename to rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java index a001877a71a..907993ef104 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationParallelMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java @@ -24,7 +24,7 @@ import org.junit.Test; import rx.Observable; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.util.functions.Action1; import rx.util.functions.Func1; diff --git a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java index 18316889c13..58c98895ec9 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java @@ -26,7 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java index 635d4d8755f..3f605abf97e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSubscribeOnTest.java @@ -25,7 +25,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.test.OperatorTester; import rx.util.functions.Action0; import rx.util.functions.Func2; diff --git a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java index fa38f02ff86..8afa5557f13 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSwitchTest.java @@ -27,7 +27,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java b/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java index ad58e136047..682f0305297 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationThrottleFirstTest.java @@ -27,7 +27,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/test/java/rx/operators/OperationTimeIntervalTest.java b/rxjava-core/src/test/java/rx/operators/OperationTimeIntervalTest.java index 056b97bf118..c71ade0166f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTimeIntervalTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTimeIntervalTest.java @@ -27,7 +27,7 @@ import rx.Observable; import rx.Observer; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.util.TimeInterval; diff --git a/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java b/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java index 9794b6c7f58..90baedd3dc1 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTimestampTest.java @@ -24,7 +24,7 @@ import org.mockito.MockitoAnnotations; import rx.Observable; import rx.Observer; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.util.Timestamped; diff --git a/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java b/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java index b254d93d2e6..478f5f1da8f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java @@ -1,9 +1,18 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package rx.operators; import java.util.ArrayList; import java.util.Arrays; diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index 6eff5828786..180179a9021 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -28,7 +28,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.concurrency.TestScheduler; +import rx.schedulers.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; diff --git a/rxjava-core/src/test/java/rx/operators/OperationZipTestCompletion.java b/rxjava-core/src/test/java/rx/operators/OperationZipTestCompletion.java index 3c0913f64d9..23baf7652c8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationZipTestCompletion.java +++ b/rxjava-core/src/test/java/rx/operators/OperationZipTestCompletion.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import static org.mockito.Mockito.inOrder; diff --git a/rxjava-core/src/test/java/rx/concurrency/CurrentThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java similarity index 99% rename from rxjava-core/src/test/java/rx/concurrency/CurrentThreadSchedulerTest.java rename to rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java index 3613b7c5924..b71d96af9e0 100644 --- a/rxjava-core/src/test/java/rx/concurrency/CurrentThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/CurrentThreadSchedulerTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.concurrency; +package rx.schedulers; import static org.mockito.Mockito.*; diff --git a/rxjava-core/src/test/java/rx/concurrency/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java similarity index 99% rename from rxjava-core/src/test/java/rx/concurrency/ImmediateSchedulerTest.java rename to rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index 593f6b6a522..bfece3af6c3 100644 --- a/rxjava-core/src/test/java/rx/concurrency/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.concurrency; +package rx.schedulers; import static org.mockito.Mockito.*; diff --git a/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java b/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java similarity index 82% rename from rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java rename to rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java index a89da84304f..ec51d7f977e 100644 --- a/rxjava-core/src/test/java/rx/concurrency/SchedulerUnsubscribeTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/SchedulerUnsubscribeTest.java @@ -1,4 +1,19 @@ -package rx.concurrency; +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; import static org.junit.Assert.*; @@ -11,6 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Scheduler; +import rx.schedulers.Schedulers; import rx.operators.SafeObservableSubscription; import rx.util.functions.Func1; diff --git a/rxjava-core/src/test/java/rx/util/AssertObservable.java b/rxjava-core/src/test/java/rx/util/AssertObservable.java index 1bd34fcd239..196a12e92f8 100644 --- a/rxjava-core/src/test/java/rx/util/AssertObservable.java +++ b/rxjava-core/src/test/java/rx/util/AssertObservable.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.util; import rx.Notification; diff --git a/rxjava-core/src/test/java/rx/util/AssertObservableTest.java b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java index f2182bd8cfb..a67381e89f3 100644 --- a/rxjava-core/src/test/java/rx/util/AssertObservableTest.java +++ b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.util; import org.junit.Test; diff --git a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java b/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java index cc761708942..05a75c4f855 100644 --- a/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java +++ b/rxjava-core/src/test/java/rx/util/functions/AsyncTest.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify; import org.mockito.MockitoAnnotations; import rx.Observer; -import rx.concurrency.Schedulers; +import rx.schedulers.Schedulers; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Action2;