diff --git a/language-adaptors/rxjava-scala/README.md b/language-adaptors/rxjava-scala/README.md index 5afb2392cc6..30b8d75b632 100644 --- a/language-adaptors/rxjava-scala/README.md +++ b/language-adaptors/rxjava-scala/README.md @@ -1,144 +1,103 @@ -Alternative Rx bindings for Scala -================================= +# Scala Adaptor for RxJava + +This adaptor allows to use RxJava in Scala with anonymous functions, e.g. -The current RxScala binding attempt to optimize for seamless interop between Scala and Java. -The intended interop is illustrated by the following example where in Scala a class is defined that takes -an `Observable[Movie]` that is transformed using RxScala operators: ```scala -class MovieLib(val moviesStream: Observable[Movie]) { - val threshold = 1200 - def shortMovies: Observable[Movie] = ??? - def longMovies: Observable[Movie] = ??? -} +val o = Observable.interval(200 millis).take(5) +o.subscribe(n => println("n = " + n)) +Observable(1, 2, 3, 4).reduce(_ + _) ``` -which is then called in Java, passing a Java `Observable` to the constructor -```java -public void test() { - MovieLib lib = new MovieLib(Observable.from(...)); - lib.longMovies().subscribe(moviePrinter); -} -``` -The technique used to obtain this transparency is to use a value class with a private constructor that implements -the Rx operators in an idiomatic Scala way, and a companion object that is used to construct instances in Scala -```scala -object Observable { - def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = { new Observable[T](asJava) } -} +For-comprehensions are also supported: -class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) extends AnyVal { - // Idiomatic Scala friendly definitions of Rx operators -} -``` -Since `rx.lang.scala.Observable[T] extends AnyVal`, the underlying representation of `rx.lang.scala.Observable[T]` -is the same as `rx.Observable`. Because `rx.lang.scala.Observable[T]` is an opaque type in Scala, -the Scala programmer only sees the Scala-friendly operators. - -However, in the current the illusion of interop is quickly lost when going beyond this simple example. -For example but type `Notification[T]` and `Scheduler[T]` are defined using wrappers, -and hence they are not compatible with `Notification` respectively `Scheduler`. -For instance, when materializing an `Observable[T]` in Scala to an `Observable[Notification[T]]`, -we lost the seamless interop with `Observable>` on the Java side. - -However, the real problems with seamless interop show up when we try to creating bindings for other Rx types. -In particular types that have inheritance or more structure. - -For example, RxScala currently defines a type synonym `type Observer[-T] = rx.Observer[_ >: T]`, -but no further bindings for observers. -Similarly, for subjects RxScala defines `type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R]`. -The problem with these definitions is that on the Java side, subjects are defined as: ```scala -public abstract class Subject extends Observable implements Observer { …} +val first = Observable(10, 11, 12) +val second = Observable(10, 11, 12) +val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2) ``` -without binding any of the Rx subjects. - -The consequence is that `Subject[S,T]` in Scala is unrelated to `rx.lang.scala.Observable[T]` in Scala, -but shows up as a `rx.Observable[T]`. The problem however is that if we want to expose subjects in Scala -such that they derive from both `Observable[S]` and `Observer[T]` we cannot use the `extend AnyVal` trick -we used for `Observable[T]` and immediately lose transparent interop with Java. - -The problem is even worse because `AsyncSubject`, `BehaviorSubject`, … all derive from `Subject`, -so if we want them to derive from a common base `Subject[T,T]` type in Scala we lose transparency for those as well. -And again, if we expose the various subjects by extending `AnyVal`, they are useless in Scala because they do not inherit -from a common base type. To avoid implementing all methods of observable and observer on each specific subject -we might add implicit conversions to `Observable[T]` and `Observer[T]` but that still does not give Scala users -a native `Subject[S,T]` type. + +Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples: + ```scala -object AsyncSubject { - def apply[T](): AsyncSubject[T] = - new AsyncSubject[T](rx.subjects.AsyncSubject.create()) -} + // instead of concat: +def ++[U >: T](that: Observable[U]): Observable[U] + +// instance method instead of static: +def zip[U](that: Observable[U]): Observable[(T, U)] + +// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications: +def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U] + +// additional type parameter U with lower bound to get covariance right: +def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U] -class AsyncSubject[T] private [scala] (val inner: rx.subjects.AsyncSubject[T]) - extends AnyVal -{ … } +// curried in Scala collections, so curry fold also here: +def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] -implicit final def asObservable[T](subject: AsyncSubject[T]): Observable[T] = - Observable(subject.inner) +// using Duration instead of (long timepan, TimeUnit duration): +def sample(duration: Duration): Observable[T] -implicit final def asObserver[T](subject: AsyncSubject[T]): Observer[T] = - subject.inner +// called skip in Java, but drop in Scala +def drop(n: Int): Observable[T] + +// there's only mapWithIndex in Java, because Java doesn't have tuples: +def zipWithIndex: Observable[(T, Int)] + +// corresponds to Java's toList: +def toSeq: Observable[Seq[T]] + +// the implicit evidence argument ensures that switch can only be called on Observables of Observables: +def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] + +// Java's from becomes apply, and we use Scala Range +def apply(range: Range): Observable[Int] + +// use Bottom type: +def never: Observable[Nothing] ``` -The inheritance problem is not just limited to subjects, but also surfaces for subscriptions. -Rx scala currently defines `type Subscription = rx.Subscription` using a type synonym as well, -and we run into exactly the same problems as with subjects when we try to bind the -various Rx subscriptions `BooleanSubscription`, `SerialSubscription`, etc. -Since we cannot wrap Rx types in Scala such that they are both (a) transparently interoperable with Java, -and (b) feel native and idiomatic to Scala, we should decide in favor of optimizing RxScala for Scala -and consumption of Rx values from Java but not for Scala as a producer. +Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome). -If we take that approach, we can make bindings that feels like a completely native Scala library, -without needing any complications of the Scala side. -```scala -object Observer { …} -trait Observable[+T] { - def asJavaObservable: rx.Observable[_ <: T] -} - -object Observer {…} -trait Observer[-T] { - def asJavaObserver: rx.Observer[_ >: T] -} - -object Subject {…} -trait Subject[-T, +R] extends Observable[R] with Observer[T] { - val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R] -} - -object Scheduler {…} -trait Scheduler { - def asJavaScheduler: rx.Scheduler; -} - -object Notification {…} -trait Notification[+T] { - def asJavaNotification: rx.Notification[_ <: T] -} - -object Subscription {…} -trait Subscription { - def asJavaSubscription: rx.Subscription -} +For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala). + +Scala code using Rx should only import members from `rx.lang.scala` and below. + + +## Documentation + +The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable). + +Note that starting from version 0.15, `rx.lang.scala.Observable` is not a value class any more. [./Rationale.md](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/Rationale.md) explains why. + +You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory. + +Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it. + + +## Binaries + +Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22). + +Example for Maven: + +```xml + + com.netflix.rxjava + rxjava-scala + x.y.z + +``` + +and for Ivy: + +```xml + ``` -You pay the price when crossing the Scala/Java interop boundary, which is where it should be. -The proper way is to put the burden of interop on the Scala side, in case you want to create -a reusable Rx-based library in Scala, or wrap and unwrap on the Java side. -```java -public static void main(String[] args) { - - Observable movies = Observable.from(new Movie(3000), new Movie(1000), new Movie(2000)); - MovieLib lib = new MovieLib(toScalaObservable(movies)); - lib.longMovies().asJavaObservable().subscribe(m -> - System.out.println("A movie of length " + m.lengthInSeconds() + "s") - ); -} + +and for sbt: + +```scala +libraryDependencies ++= Seq( + "com.netflix.rxjava" % "rxjava-scala" % "x.y.z" +) ``` -Delegation versus Inheritance ------------------------------ -The obvious thought is that using delegation instead of inheritance (http://c2.com/cgi/wiki?DelegationIsInheritance) -will lead to excessive wrapping, since all Scala types wrap and delegate to an underlying RxJava implementation. -Note however, that the wrapping happens at query generation time and incurs no overhead when messages are flowing -through the pipeline. Say we have a query `xs.map(f).filter(p).subscribe(o)`. Even though the Scala types are wrappers, -the callback that is registered with xs is something like `x => { val y = f(x); if(p(y)){ o.asJavaObserver.onNext(y) }}` -and hence there is no additional runtime penalty. \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/Rationale.md b/language-adaptors/rxjava-scala/Rationale.md new file mode 100644 index 00000000000..5afb2392cc6 --- /dev/null +++ b/language-adaptors/rxjava-scala/Rationale.md @@ -0,0 +1,144 @@ +Alternative Rx bindings for Scala +================================= + +The current RxScala binding attempt to optimize for seamless interop between Scala and Java. +The intended interop is illustrated by the following example where in Scala a class is defined that takes +an `Observable[Movie]` that is transformed using RxScala operators: +```scala +class MovieLib(val moviesStream: Observable[Movie]) { + val threshold = 1200 + def shortMovies: Observable[Movie] = ??? + def longMovies: Observable[Movie] = ??? +} +``` +which is then called in Java, passing a Java `Observable` to the constructor +```java +public void test() { + MovieLib lib = new MovieLib(Observable.from(...)); + + lib.longMovies().subscribe(moviePrinter); +} +``` +The technique used to obtain this transparency is to use a value class with a private constructor that implements +the Rx operators in an idiomatic Scala way, and a companion object that is used to construct instances in Scala +```scala +object Observable { + def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = { new Observable[T](asJava) } +} + +class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) extends AnyVal { + // Idiomatic Scala friendly definitions of Rx operators +} +``` +Since `rx.lang.scala.Observable[T] extends AnyVal`, the underlying representation of `rx.lang.scala.Observable[T]` +is the same as `rx.Observable`. Because `rx.lang.scala.Observable[T]` is an opaque type in Scala, +the Scala programmer only sees the Scala-friendly operators. + +However, in the current the illusion of interop is quickly lost when going beyond this simple example. +For example but type `Notification[T]` and `Scheduler[T]` are defined using wrappers, +and hence they are not compatible with `Notification` respectively `Scheduler`. +For instance, when materializing an `Observable[T]` in Scala to an `Observable[Notification[T]]`, +we lost the seamless interop with `Observable>` on the Java side. + +However, the real problems with seamless interop show up when we try to creating bindings for other Rx types. +In particular types that have inheritance or more structure. + +For example, RxScala currently defines a type synonym `type Observer[-T] = rx.Observer[_ >: T]`, +but no further bindings for observers. +Similarly, for subjects RxScala defines `type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R]`. +The problem with these definitions is that on the Java side, subjects are defined as: +```scala +public abstract class Subject extends Observable implements Observer { …} +``` +without binding any of the Rx subjects. + +The consequence is that `Subject[S,T]` in Scala is unrelated to `rx.lang.scala.Observable[T]` in Scala, +but shows up as a `rx.Observable[T]`. The problem however is that if we want to expose subjects in Scala +such that they derive from both `Observable[S]` and `Observer[T]` we cannot use the `extend AnyVal` trick +we used for `Observable[T]` and immediately lose transparent interop with Java. + +The problem is even worse because `AsyncSubject`, `BehaviorSubject`, … all derive from `Subject`, +so if we want them to derive from a common base `Subject[T,T]` type in Scala we lose transparency for those as well. +And again, if we expose the various subjects by extending `AnyVal`, they are useless in Scala because they do not inherit +from a common base type. To avoid implementing all methods of observable and observer on each specific subject +we might add implicit conversions to `Observable[T]` and `Observer[T]` but that still does not give Scala users +a native `Subject[S,T]` type. +```scala +object AsyncSubject { + def apply[T](): AsyncSubject[T] = + new AsyncSubject[T](rx.subjects.AsyncSubject.create()) +} + +class AsyncSubject[T] private [scala] (val inner: rx.subjects.AsyncSubject[T]) + extends AnyVal +{ … } + +implicit final def asObservable[T](subject: AsyncSubject[T]): Observable[T] = + Observable(subject.inner) + +implicit final def asObserver[T](subject: AsyncSubject[T]): Observer[T] = + subject.inner +``` +The inheritance problem is not just limited to subjects, but also surfaces for subscriptions. +Rx scala currently defines `type Subscription = rx.Subscription` using a type synonym as well, +and we run into exactly the same problems as with subjects when we try to bind the +various Rx subscriptions `BooleanSubscription`, `SerialSubscription`, etc. + +Since we cannot wrap Rx types in Scala such that they are both (a) transparently interoperable with Java, +and (b) feel native and idiomatic to Scala, we should decide in favor of optimizing RxScala for Scala +and consumption of Rx values from Java but not for Scala as a producer. + +If we take that approach, we can make bindings that feels like a completely native Scala library, +without needing any complications of the Scala side. +```scala +object Observer { …} +trait Observable[+T] { + def asJavaObservable: rx.Observable[_ <: T] +} + +object Observer {…} +trait Observer[-T] { + def asJavaObserver: rx.Observer[_ >: T] +} + +object Subject {…} +trait Subject[-T, +R] extends Observable[R] with Observer[T] { + val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R] +} + +object Scheduler {…} +trait Scheduler { + def asJavaScheduler: rx.Scheduler; +} + +object Notification {…} +trait Notification[+T] { + def asJavaNotification: rx.Notification[_ <: T] +} + +object Subscription {…} +trait Subscription { + def asJavaSubscription: rx.Subscription +} +``` +You pay the price when crossing the Scala/Java interop boundary, which is where it should be. +The proper way is to put the burden of interop on the Scala side, in case you want to create +a reusable Rx-based library in Scala, or wrap and unwrap on the Java side. +```java +public static void main(String[] args) { + + Observable movies = Observable.from(new Movie(3000), new Movie(1000), new Movie(2000)); + MovieLib lib = new MovieLib(toScalaObservable(movies)); + lib.longMovies().asJavaObservable().subscribe(m -> + System.out.println("A movie of length " + m.lengthInSeconds() + "s") + ); +} +``` +Delegation versus Inheritance +----------------------------- +The obvious thought is that using delegation instead of inheritance (http://c2.com/cgi/wiki?DelegationIsInheritance) +will lead to excessive wrapping, since all Scala types wrap and delegate to an underlying RxJava implementation. +Note however, that the wrapping happens at query generation time and incurs no overhead when messages are flowing +through the pipeline. Say we have a query `xs.map(f).filter(p).subscribe(o)`. Even though the Scala types are wrappers, +the callback that is registered with xs is something like `x => { val y = f(x); if(p(y)){ o.asJavaObserver.onNext(y) }}` +and hence there is no additional runtime penalty. \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/TODO.md b/language-adaptors/rxjava-scala/TODO.md index 60169c9a816..f8965cc498c 100644 --- a/language-adaptors/rxjava-scala/TODO.md +++ b/language-adaptors/rxjava-scala/TODO.md @@ -8,9 +8,7 @@ TODOs which came up at the meeting with Erik Meijer on 2013-10-11: * Rename the factory methods in `object Observable`, considering that the most important is the one taking an `Observer => Subscription` function (the "king" according to Erik). Thunk to Subscription conversion (?), also consider Jason's [comments](https://github.com/Netflix/RxJava/commit/c1596253fc5567b7cc37d20128374d189471ff79). A useful trick might also be to have `apply(T, T, T*)` instead of just `apply(T*)`. * Factory methods for observables and instance methods should take implicit scheduler, default is different per method (Isn't this a contradiction? In other words: If I call a method without providing a scheduler, should the default scheduler be used or the implicit that I provided?) Find in .NET source the list of which schedulers goes with which operators by default. If no other default, use NewThreadScheduler. Note that there are methods in Scala Observable which should have an overload taking a Scheduler, but do not yet have it! Also remember Erik saying that he would like to "minimize magically injected concurrency". -* Bring `BooleanSubscription`, `CompositeSubscription`, `MultipleAssignmentSubscription` to Scala, `compositeSubscription.subscription = ...`instead of setter method, add on `CompositeSubscription` should be `+=` * Convert executor to scheduler -* Java Scheduler methods take `state` arguments (they were needed to schedule work on a different machine, but are now considered a bad idea). Remove these `state` arguments from all Scala schedulers. * Check if TestScheduler added in 0.14.3 is sufficient * Infinite Iterables: the java Observable.from version unfolds an iterable, even it is infinite. Should be fixed in java. * subscribe methods: There are many overloads, but still not all combinations one might need. Make this nicer and complete, maybe using default arguments. Also try to make sure that `subscribe(println)` works, not only `subscribe(println(_))`. `foreach(println)` works on collections, but not on `subscribe(println)`, because subscribe is overloaded. @@ -18,9 +16,6 @@ TODOs which came up at the meeting with Erik Meijer on 2013-10-11: * There are no examples yet using `async`, but `async` will be used in the course. Write examples and check if everything works as expected when combined with `async`. * Futures: For the moment, just add a Future->Observable converter method to `object Observable`. Later, think if `Future[T] extends Observable[T]`. * Operator `delay`: Once Erik has commented on [this](https://github.com/Netflix/RxJava/pull/384), make sure this operator is added accordingly to RxJava and then to RxScala -* add wrappers or aliases for `AsyncSubject`, `BehaviorSubject`, `PublishSubject`, and `ReplaySubject` -* go through Erik's code that he showed at the meeting and check if everything can now be done nicely -* get Erik's slides from the course and check if they are compatible with the library Some more TODOs: diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/RxJavaDemos.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala similarity index 97% rename from language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/RxJavaDemos.scala rename to language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 130c9f2aa8f..4a8ee563457 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/RxJavaDemos.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -15,19 +15,25 @@ */ package rx.lang.scala.examples -import org.scalatest.junit.JUnitSuite -import rx.lang.scala._ -import scala.concurrent.duration._ +import java.io.IOException + +import scala.concurrent.duration.Duration +import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.DurationLong +import scala.language.postfixOps + +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Ignore import org.junit.Test -import org.junit.Assert._ +import org.scalatest.junit.JUnitSuite + +import rx.lang.scala.Notification +import rx.lang.scala.Observable +import rx.lang.scala.observable import rx.lang.scala.concurrency.Schedulers -import java.io.IOException -import rx.lang.scala.examples.Olympics -import rx.lang.scala.Notification.OnCompleted -import rx.lang.scala.Notification.OnError -import rx.lang.scala.Notification.OnNext -//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily +@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @Test def intervalExample() { diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala new file mode 100644 index 00000000000..df588c96894 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala @@ -0,0 +1,45 @@ +package rx.lang.scala.examples + +import org.junit.Test +import org.scalatest.junit.JUnitSuite +import scala.concurrent.duration._ +import scala.language.postfixOps +import rx.lang.scala.{ Observable, Observer } +import rx.lang.scala.concurrency.TestScheduler + +class TestSchedulerExample extends JUnitSuite { + + @Test def testInterval() { + import org.mockito.Matchers._ + import org.mockito.Mockito._ + + val scheduler = TestScheduler() + // Use a Java Observer for Mockito + val observer = mock(classOf[rx.Observer[Long]]) + + val o = Observable.interval(1 second, scheduler) + + // Wrap Java Observer in Scala Observer, then subscribe + val sub = o.subscribe(Observer(observer)) + + verify(observer, never).onNext(0L) + verify(observer, never).onCompleted() + verify(observer, never).onError(any(classOf[Throwable])) + + scheduler.advanceTimeTo(2 seconds) + + val inOrdr = inOrder(observer); + inOrdr.verify(observer, times(1)).onNext(0L) + inOrdr.verify(observer, times(1)).onNext(1L) + inOrdr.verify(observer, never).onNext(2L) + verify(observer, never).onCompleted() + verify(observer, never).onError(any(classOf[Throwable])) + + sub.unsubscribe(); + scheduler.advanceTimeTo(4 seconds) + verify(observer, never).onNext(2L) + verify(observer, times(1)).onCompleted() + verify(observer, never).onError(any(classOf[Throwable])) + } + +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index 458f5fedbc6..6a64c858e85 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -20,11 +20,8 @@ import java.{ lang => jlang } import rx.lang.scala._ import rx.util.functions._ import scala.collection.Seq -import rx.lang.scala.subscriptions.Subscription import java.{lang => jlang} import scala.language.implicitConversions -import rx.lang.scala.Observer -import rx.lang.scala.Scheduler /** * These function conversions convert between Scala functions and Rx `Func`s and `Action`s. diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 0d8cbb71668..68f7530dec2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -16,14 +16,8 @@ package rx.lang.scala - import rx.util.functions.FuncN import rx.Observable.OnSubscribeFunc -import rx.lang.scala.Notification -import rx.lang.scala.ImplicitFunctionConversions -import rx.lang.scala.Observer -import rx.lang.scala.Scheduler - /** * The Observable interface that implements the Reactive Pattern. @@ -1820,9 +1814,9 @@ trait Observable[+T] Observable[java.lang.Boolean](asJavaObservable.isEmpty).map(_.booleanValue()) } - //def withFilter(p: T => Boolean): WithFilter[T] = { - // new WithFilter[T](p, asJava) - //} + def withFilter(p: T => Boolean): WithFilter[T] = { + new WithFilter[T](p, asJavaObservable) + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala index 8060dbaa36b..2142b3d4024 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala @@ -5,7 +5,6 @@ import scala.concurrent.duration.Duration import ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0 import ImplicitFunctionConversions.schedulerActionToFunc2 import rx.util.functions.{Action0, Action1, Func2} -import rx.lang.scala.subscriptions.Subscription /** * Represents an object thatimport rx.lang.scala.ImplicitFunctionConversions @@ -163,7 +162,7 @@ trait Scheduler { def scheduleRec(work: (=>Unit)=>Unit): Subscription = { Subscription(asJavaScheduler.schedule(new Action1[Action0] { def call(t1: Action0){ - work{ t1 } + work{ t1.call() } } })) //action1[action0] diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala new file mode 100644 index 00000000000..3de9b8262b2 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala @@ -0,0 +1,78 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.lang.scala + +/** + * Subscriptions are returned from all `Observable.subscribe` methods to allow unsubscribing. + * + * This interface is the equivalent of `IDisposable` in the .NET Rx implementation. + */ +trait Subscription { + + val asJavaSubscription: rx.Subscription + + /** + * Call this method to stop receiving notifications on the Observer that was registered when + * this Subscription was received. + */ + def unsubscribe(): Unit = asJavaSubscription.unsubscribe() + + /** + * Checks if the subscription is unsubscribed. + */ + def isUnsubscribed: Boolean +} + +object Subscription { + import java.util.concurrent.atomic.AtomicBoolean + import rx.lang.scala.subscriptions._ + + /** + * Creates an [[rx.lang.scala.Subscription]] from an [[rx.Subscription]]. + */ + def apply(subscription: rx.Subscription): Subscription = { + subscription match { + case x: rx.subscriptions.BooleanSubscription => new BooleanSubscription(x) + case x: rx.subscriptions.CompositeSubscription => new CompositeSubscription(x) + case x: rx.subscriptions.MultipleAssignmentSubscription => new MultipleAssignmentSubscription(x) + case x: rx.subscriptions.SerialSubscription => new SerialSubscription(x) + case x: rx.Subscription => Subscription { x.unsubscribe() } + } + } + + /** + * Creates an [[rx.lang.scala.Subscription]] that invokes the specified action when unsubscribed. + */ + def apply(u: => Unit): Subscription = { + new Subscription() { + + private val unsubscribed = new AtomicBoolean(false) + def isUnsubscribed = unsubscribed.get() + + val asJavaSubscription = new rx.Subscription { + def unsubscribe() { u; unsubscribed.set(true) } + } + } + } + +} + + + + + + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala index 61b25b87842..d8524825bac 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala @@ -1,7 +1,5 @@ package rx.lang.scala -import rx.lang.scala.ImplicitFunctionConversions - import ImplicitFunctionConversions.scalaBooleanFunction1ToRxBooleanFunc1 import ImplicitFunctionConversions.scalaFunction1ToRxFunc1 diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala index a76d95f0969..e2873f9a295 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala @@ -2,7 +2,6 @@ package rx.lang.scala.concurrency import scala.concurrent.duration.Duration import rx.lang.scala.Scheduler -//import org.scalatest.junit.JUnitSuite /** * Scheduler with artificial time, useful for testing. @@ -66,40 +65,3 @@ object TestScheduler { } } -//private class UnitTest extends JUnitSuite { -// import org.junit.Test -// import scala.concurrent.duration._ -// import scala.language.postfixOps -// import rx.lang.scala.{Observable, Observer} -// -// @Test def testInterval() { -// import org.mockito.Matchers._ -// import org.mockito.Mockito._ -// -// val scheduler = TestScheduler() -// val observer = mock(classOf[rx.Observer[Long]]) -// -// val o = Observable.interval(1 second, scheduler) -// val sub = o.subscribe(observer) -// -// verify(observer, never).onNext(0L) -// verify(observer, never).onCompleted() -// verify(observer, never).onError(any(classOf[Throwable])) -// -// scheduler.advanceTimeTo(2 seconds) -// -// val inOrdr = inOrder(observer); -// inOrdr.verify(observer, times(1)).onNext(0L) -// inOrdr.verify(observer, times(1)).onNext(1L) -// inOrdr.verify(observer, never).onNext(2L) -// verify(observer, never).onCompleted() -// verify(observer, never).onError(any(classOf[Throwable])) -// -// sub.unsubscribe(); -// scheduler.advanceTimeTo(4 seconds) -// verify(observer, never).onNext(2L) -// verify(observer, times(1)).onCompleted() -// verify(observer, never).onError(any(classOf[Throwable])) -// } -//} - diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala new file mode 100644 index 00000000000..a3e61c00219 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala @@ -0,0 +1,31 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala + +import rx.concurrency.CurrentThreadScheduler + +/** + * Provides schedulers. + */ +package object concurrency { + + // These classes are not exposed to Scala users, but are accessible through rx.lang.scala.concurrency.Schedulers: + + // rx.concurrency.CurrentThreadScheduler + // rx.concurrency.ExecutorScheduler + // rx.concurrency.ImmediateScheduler + // rx.concurrency.NewThreadScheduler +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala new file mode 100644 index 00000000000..8507f0a54cc --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/package.scala @@ -0,0 +1,12 @@ +package rx.lang.scala + +/** + * Contains special Observables. + * + * In Scala, this package only contains [[BlockingObservable]]. + * In the corresponding Java package `rx.observables`, there is also a + * `GroupedObservable` and a `ConnectableObservable`, but these are not needed + * in Scala, because we use a pair `(key, observable)` instead of `GroupedObservable` + * and a pair `(startFunction, observable)` instead of `ConnectableObservable`. + */ +package object observables {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala new file mode 100644 index 00000000000..0809e1fb2b3 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala @@ -0,0 +1,48 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang + +import java.util.concurrent.TimeUnit +import java.util.Date + +/** + * This package contains all classes that RxScala users need. + * + * It mirrors the structure of package `rx`, but implementation classes that RxScala users + * will not need are left out. + */ +package object scala { + + /** + * Allows to construct observables in a similar way as futures. + * + * Example: + * + * {{{ + * implicit val scheduler = Schedulers.threadPoolForIO + * val o: Observable[List[Friend]] = observable { + * session.getFriends + * } + * o.subscribe( + * friendList => println(friendList), + * err => println(err.getMessage) + * ) + * }}} + */ + def observable[T](body: => T)(implicit scheduler: Scheduler): Observable[T] = { + Observable(1).observeOn(scheduler).map(_ => body) + } +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala index 93989bfe32b..a5fd50af29b 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala @@ -3,7 +3,7 @@ package rx.lang.scala.subjects import rx.lang.scala.Subject object PublishSubject { - def apply[T](value: T): PublishSubject[T] = { + def apply[T](): PublishSubject[T] = { new PublishSubject[T](rx.subjects.PublishSubject.create()) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala index 5631b1bdea2..cb92df90d91 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/Subject.scala @@ -1,7 +1,5 @@ package rx.lang.scala -import rx.lang.scala.Observer - /** * A Subject is an Observable and an Observer at the same time. */ diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala new file mode 100644 index 00000000000..cf7db56f11f --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala @@ -0,0 +1,6 @@ +package rx.lang.scala + +/** + * Subjects are Observers and Observables at the same time. + */ +package object subjects {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala index 1cdd485d37f..79557668104 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala @@ -24,7 +24,7 @@ object MultipleAssignmentSubscription { /** - * Represents a [[rx.lang.scala.subscriptions.Subscription]] whose underlying subscription can be swapped for another subscription. + * Represents a [[rx.lang.scala.Subscription]] whose underlying subscription can be swapped for another subscription. */ class MultipleAssignmentSubscription private[scala] (val asJavaSubscription: rx.subscriptions.MultipleAssignmentSubscription) extends Subscription { diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala index 8ca4083c8d3..c54a39b58d6 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala @@ -29,17 +29,17 @@ object SerialSubscription { class SerialSubscription private[scala] (val asJavaSubscription: rx.subscriptions.SerialSubscription) extends Subscription { - private val _isUnsubscribed = new AtomicBoolean(false) + private val unsubscribed = new AtomicBoolean(false) /** * Checks whether the subscription has been unsubscribed. */ - def isUnsubscribed: Boolean = _isUnsubscribed.get() + def isUnsubscribed: Boolean = unsubscribed.get() /** * Unsubscribes this subscription, setting isUnsubscribed to true. */ - override def unsubscribe(): Unit = { super.unsubscribe(); _isUnsubscribed.set(true) } + override def unsubscribe(): Unit = { super.unsubscribe(); unsubscribed.set(true) } def subscription_=(value: Subscription): Unit = asJavaSubscription.setSubscription(value.asJavaSubscription) def subscription: Subscription = Subscription(asJavaSubscription.getSubscription) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/Subscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/Subscription.scala deleted file mode 100644 index ec4e7b57040..00000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/Subscription.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.lang.scala { - - /** - * Subscriptions are returned from all `Observable.subscribe` methods to allow unsubscribing. - * - * This interface is the equivalent of `IDisposable` in the .NET Rx implementation. - */ - trait Subscription { - - val asJavaSubscription: rx.Subscription - - /** - * Call this method to stop receiving notifications on the Observer that was registered when - * this Subscription was received. - */ - def unsubscribe(): Unit = asJavaSubscription.unsubscribe() - - /** - * Checks if the subscription is unsubscribed. - */ - def isUnsubscribed: Boolean - } -} - -package rx.lang.scala.subscriptions { - -import rx.lang.scala.Subscription -import java.util.concurrent.atomic.AtomicBoolean - - -object Subscription { - - /** - * Creates an [[rx.lang.scala.Subscription]] from an[[rx.Subscription]]. - */ - def apply(subscription: rx.Subscription): Subscription = { - subscription match { - case x: rx.subscriptions.BooleanSubscription => new BooleanSubscription(x) - case x: rx.subscriptions.CompositeSubscription => new CompositeSubscription(x) - case x: rx.subscriptions.MultipleAssignmentSubscription => new MultipleAssignmentSubscription(x) - case x: rx.subscriptions.SerialSubscription => new SerialSubscription(x) - case x: rx.Subscription => Subscription { x.unsubscribe() } - } - } - - /** - * Creates an [[rx.lang.scala.Subscription]] that invokes the specified action when unsubscribed. - */ - def apply(u: => Unit): Subscription = { - new Subscription () { - - private val _isUnsubscribed = new AtomicBoolean(false) - def isUnsubscribed = _isUnsubscribed.get() - - val asJavaSubscription = new rx.Subscription { - def unsubscribe() { u; _isUnsubscribed.set(true) } - } - } - } - } -} - - - - - - diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala new file mode 100644 index 00000000000..4662cb9ccb8 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/package.scala @@ -0,0 +1,6 @@ +package rx.lang.scala + +/** + * Provides `Subscription`, and specialized versions of it. + */ +package object subscriptions {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/scala.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/scala.scala deleted file mode 100644 index d0c7fa1761c..00000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/scala.scala +++ /dev/null @@ -1,25 +0,0 @@ -package rx.lang - -import rx.lang.scala.Scheduler -package object scala { - - /** - * Allows to construct observables in a similar way as futures. - * - * Example: - * - * {{{ - * implicit val scheduler = Schedulers.threadPoolForIO - * val o: Observable[List[Friend]] = observable { - * session.getFriends - * } - * o.subscribe( - * friendList => println(friendList), - * err => println(err.getMessage) - * ) - * }}} - */ - def observable[T](body: => T)(implicit scheduler: Scheduler): Observable[T] = { - Observable(1).observeOn(scheduler).map(_ => body) - } -} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/util.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala similarity index 67% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/util.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala index fe58b2eeb44..ed19d849ab4 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/util.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/util/package.scala @@ -16,31 +16,31 @@ package rx.lang.scala /** - * Provides [[rx.lang.scala.util.Opening]]s, [[rx.lang.scala.util.Closing]]s, and [[rx.util.Timestamped]]. + * Provides [[Opening]]s, [[Closing]]s, and [[Timestamped]]. */ package object util { /** * Tagging interface for objects which can open buffers. - * @see [[rx.lang.scala.Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] + * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] */ type Opening = rx.util.Opening /** * Creates an object which can open buffers. - * @see [[rx.lang.scala.Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] + * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] */ def Opening() = rx.util.Openings.create() /** * Tagging interface for objects which can close buffers. - * @see [[rx.lang.scala.Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] + * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] */ type Closing = rx.util.Closing /** * Creates an object which can close buffers. - * @see [[rx.lang.scala.Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] + * @see [[Observable `Observable.buffer(Observable[Opening], Opening => Observable[Closing])`]] */ def Closing() = rx.util.Closings.create() diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala new file mode 100644 index 00000000000..f38ac0d5219 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -0,0 +1,354 @@ +package rx.lang.scala + +import java.util.Calendar + +import scala.collection.SortedMap +import scala.reflect.runtime.universe +import scala.reflect.runtime.universe.Symbol +import scala.reflect.runtime.universe.Type +import scala.reflect.runtime.universe.typeOf + +import org.junit.Ignore +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +/** + * These tests can be used to check if all methods of the Java Observable have a corresponding + * method in the Scala Observable. + * + * These tests don't contain any assertions, so they will always succeed, but they print their + * results to stdout. + */ +class CompletenessTest extends JUnitSuite { + + // some frequently used comments: + val unnecessary = "[considered unnecessary in Scala land]" + val deprecated = "[deprecated in RxJava]" + val averageProblem = "[We can't have a general average method because Scala's `Numeric` does not have " + + "scalar multiplication (we would need to calculate `(1.0/numberOfElements)*sum`). " + + "You can use `fold` instead to accumulate `sum` and `numberOfElements` and divide at the end.]" + val commentForFirstWithPredicate = "[use `.filter(condition).first`]" + val fromFuture = "[TODO: Decide how Scala Futures should relate to Observables. Should there be a " + + "common base interface for Future and Observable? And should Futures also have an unsubscribe method?]" + + /** + * Maps each method from the Java Observable to its corresponding method in the Scala Observable + */ + val correspondence = defaultMethodCorrespondence ++ correspondenceChanges // ++ overrides LHS with RHS + + /** + * Creates default method correspondence mappings, assuming that Scala methods have the same + * name and the same argument types as in Java + */ + def defaultMethodCorrespondence: Map[String, String] = { + val allMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.Observable[_]]) + val tuples = for (javaM <- allMethods) yield (javaM, javaMethodSignatureToScala(javaM)) + tuples.toMap + } + + /** + * Manually added mappings from Java Observable methods to Scala Observable methods + */ + def correspondenceChanges = Map( + // manually added entries for Java instance methods + "aggregate(Func2[T, T, T])" -> "reduce((U, U) => U)", + "aggregate(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", + "all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)", + "buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)", + "buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)", + "count()" -> "length", + "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", + "elementAt(Int)" -> "[use `.drop(index).first`]", + "elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]", + "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, + "firstOrDefault(T)" -> "firstOrElse(=> U)", + "firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]", + "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", + "mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])", + "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", + "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", + "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", + "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", + "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", + "parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])", + "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", + "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", + "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", + "scan(Func2[T, T, T])" -> unnecessary, + "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", + "skip(Int)" -> "drop(Int)", + "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", + "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, + "startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]", + "takeFirst()" -> "first", + "takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, + "takeLast(Int)" -> "takeRight(Int)", + "takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]", + "toList()" -> "toSeq", + "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", + "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", + "where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)", + "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", + "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", + + // manually added entries for Java static methods + "average(Observable[Integer])" -> averageProblem, + "averageDoubles(Observable[Double])" -> averageProblem, + "averageFloats(Observable[Float])" -> averageProblem, + "averageLongs(Observable[Long])" -> averageProblem, + "create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)", + "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", + "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", + "defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])", + "empty()" -> "apply(T*)", + "error(Throwable)" -> "apply(Throwable)", + "from(Array[T])" -> "apply(T*)", + "from(Iterable[_ <: T])" -> "apply(T*)", + "from(Future[_ <: T])" -> fromFuture, + "from(Future[_ <: T], Long, TimeUnit)" -> fromFuture, + "from(Future[_ <: T], Scheduler)" -> fromFuture, + "just(T)" -> "apply(T*)", + "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])", + "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", + "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", + "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", + "range(Int, Int)" -> "apply(Range)", + "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]", + "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]", + "sum(Observable[Integer])" -> "sum(Numeric[U])", + "sumDoubles(Observable[Double])" -> "sum(Numeric[U])", + "sumFloats(Observable[Float])" -> "sum(Numeric[U])", + "sumLongs(Observable[Long])" -> "sum(Numeric[U])", + "synchronize(Observable[T])" -> "synchronize", + "switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated, + "switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])", + "zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]", + "zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]", + "zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]" + ) ++ List.iterate("T", 9)(s => s + ", T").map( + // all 9 overloads of startWith: + "startWith(" + _ + ")" -> "[unnecessary because we can just use `++` instead]" + ).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( + // concat 2-9 + "concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]" + ).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map( + // all 10 overloads of from: + "from(" + _ + ")" -> "apply(T*)" + ).toMap ++ (3 to 9).map(i => { + // zip3-9: + val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") + val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("") + ("zip(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", unnecessary) + }).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( + // merge 3-9: + "merge(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).flatten` instead]" + ).drop(2).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( + // mergeDelayError 3-9: + "mergeDelayError(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).flattenDelayError` instead]" + ).drop(2).toMap ++ (3 to 9).map(i => { + // combineLatest 3-9: + val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") + val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("") + ("combineLatest(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", "[If C# doesn't need it, Scala doesn't need it either ;-)]") + }).toMap + + def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2") + + def methodMembersToMethodStrings(members: Iterable[Symbol]): Iterable[String] = { + for (member <- members; alt <- member.asTerm.alternatives) yield { + val m = alt.asMethod + // multiple parameter lists in case of curried functions + val paramListStrs = for (paramList <- m.paramss) yield { + paramList.map( + symb => removePackage(symb.typeSignature.toString.replaceAll(",(\\S)", ", $1")) + ).mkString("(", ", ", ")") + } + val name = alt.asMethod.name.decoded + name + paramListStrs.mkString("") + } + } + + def getPublicInstanceMethods(tp: Type): Iterable[String] = { + // declarations: => only those declared in Observable + // members => also those of superclasses + methodMembersToMethodStrings(tp.declarations.filter(m => m.isMethod && m.isPublic)) + // TODO how can we filter out instance methods which were put into companion because + // of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'? + .filter(! _.contains("$extension")) + } + + // also applicable for Java types + def getPublicInstanceAndCompanionMethods(tp: Type): Iterable[String] = + getPublicInstanceMethods(tp) ++ + getPublicInstanceMethods(tp.typeSymbol.companionSymbol.typeSignature) + + def printMethodSet(title: String, tp: Type) { + println("\n" + title) + println(title.map(_ => '-') + "\n") + getPublicInstanceMethods(tp).toList.sorted.foreach(println(_)) + } + + @Ignore // because spams output + @Test def printJavaInstanceMethods: Unit = { + printMethodSet("Instance methods of rx.Observable", + typeOf[rx.Observable[_]]) + } + + @Ignore // because spams output + @Test def printScalaInstanceMethods: Unit = { + printMethodSet("Instance methods of rx.lang.scala.Observable", + typeOf[rx.lang.scala.Observable[_]]) + } + + @Ignore // because spams output + @Test def printJavaStaticMethods: Unit = { + printMethodSet("Static methods of rx.Observable", + typeOf[rx.Observable[_]].typeSymbol.companionSymbol.typeSignature) + } + + @Ignore // because spams output + @Test def printScalaCompanionMethods: Unit = { + printMethodSet("Companion methods of rx.lang.scala.Observable", + typeOf[rx.lang.scala.Observable.type]) + } + + def javaMethodSignatureToScala(s: String): String = { + s.replaceAllLiterally("Long, TimeUnit", "Duration") + .replaceAll("Action0", "() => Unit") + // nested [] can't be parsed with regex, so these will have to be added manually + .replaceAll("Action1\\[([^]]*)\\]", "$1 => Unit") + .replaceAll("Action2\\[([^]]*), ([^]]*)\\]", "($1, $2) => Unit") + .replaceAll("Func0\\[([^]]*)\\]", "() => $1") + .replaceAll("Func1\\[([^]]*), ([^]]*)\\]", "$1 => $2") + .replaceAll("Func2\\[([^]]*), ([^]]*), ([^]]*)\\]", "($1, $2) => $3") + .replaceAllLiterally("_ <: ", "") + .replaceAllLiterally("_ >: ", "") + .replaceAll("(\\w+)\\(\\)", "$1") + } + + @Ignore // because spams output + @Test def printDefaultMethodCorrespondence: Unit = { + println("\nDefault Method Correspondence") + println( "-----------------------------\n") + val c = SortedMap(defaultMethodCorrespondence.toSeq : _*) + val len = c.keys.map(_.length).max + 2 + for ((javaM, scalaM) <- c) { + println(s""" %-${len}s -> %s,""".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) + } + } + + @Ignore // because spams output + @Test def printCorrectedMethodCorrespondence: Unit = { + println("\nCorrected Method Correspondence") + println( "-------------------------------\n") + val c = SortedMap(correspondence.toSeq : _*) + for ((javaM, scalaM) <- c) { + println("%s -> %s,".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) + } + } + + def checkMethodPresence(expectedMethods: Iterable[String], tp: Type): Unit = { + val actualMethods = getPublicInstanceAndCompanionMethods(tp).toSet + val expMethodsSorted = expectedMethods.toList.sorted + var good = 0 + var bad = 0 + for (m <- expMethodsSorted) if (actualMethods.contains(m) || m.charAt(0) == '[') { + good += 1 + } else { + bad += 1 + println(s"Warning: $m is NOT present in $tp") + } + val status = if (bad == 0) "SUCCESS" else "BAD" + println(s"$status: $bad out of ${bad+good} methods were not found in $tp") + } + + @Test def checkScalaMethodPresenceVerbose: Unit = { + println("\nTesting that all mentioned Scala methods exist") + println( "----------------------------------------------\n") + + val actualMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.lang.scala.Observable[_]]).toSet + var good = 0 + var bad = 0 + for ((javaM, scalaM) <- SortedMap(correspondence.toSeq :_*)) { + if (actualMethods.contains(scalaM) || scalaM.charAt(0) == '[') { + good += 1 + } else { + bad += 1 + println(s"Warning:") + println(s"$scalaM is NOT present in Scala Observable") + println(s"$javaM is the method in Java Observable generating this warning") + } + } + val status = if (bad == 0) "SUCCESS" else "BAD" + println(s"\n$status: $bad out of ${bad+good} methods were not found in Scala Observable") + } + + def setTodoForMissingMethods(corresp: Map[String, String]): Map[String, String] = { + val actualMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.lang.scala.Observable[_]]).toSet + for ((javaM, scalaM) <- corresp) yield + (javaM, if (actualMethods.contains(scalaM) || scalaM.charAt(0) == '[') scalaM else "[**TODO: missing**]") + } + + @Test def checkJavaMethodPresence: Unit = { + println("\nTesting that all mentioned Java methods exist") + println( "---------------------------------------------\n") + checkMethodPresence(correspondence.keys, typeOf[rx.Observable[_]]) + } + + @Ignore // because we prefer the verbose version + @Test def checkScalaMethodPresence: Unit = { + checkMethodPresence(correspondence.values, typeOf[rx.lang.scala.Observable[_]]) + } + + def scalaToJavaSignature(s: String) = + s.replaceAllLiterally("_ <:", "? extends") + .replaceAllLiterally("_ >:", "? super") + .replaceAllLiterally("[", "<") + .replaceAllLiterally("]", ">") + .replaceAllLiterally("Array", "T[]") + + def escapeJava(s: String) = + s.replaceAllLiterally("<", "<") + .replaceAllLiterally(">", ">") + + @Ignore // because spams output + @Test def printMarkdownCorrespondenceTable() { + def isInteresting(p: (String, String)): Boolean = + p._1.replaceAllLiterally("()", "") != p._2 + def groupingKey(p: (String, String)): (String, String) = + (if (p._1.startsWith("average")) "average" else p._1.takeWhile(_ != '('), p._2) + def formatJavaCol(name: String, alternatives: Iterable[String]): String = { + alternatives.toList.sorted.map(scalaToJavaSignature(_)).map(s => { + if (s.length > 64) { + val toolTip = escapeJava(s) + "" + name + "(...)" + } else { + "`" + s + "`" + } + }).mkString("
") + } + def formatScalaCol(s: String): String = + if (s.startsWith("[") && s.endsWith("]")) s.drop(1).dropRight(1) else "`" + s + "`" + def escape(s: String) = s.replaceAllLiterally("[", "<").replaceAllLiterally("]", ">") + + println(""" +## Comparison of Scala Observable and Java Observable + +Note: +* This table contains both static methods and instance methods. +* If a signature is too long, move your mouse over it to get the full signature. + + +| Java Method | Scala Method | +|-------------|--------------|""") + + val ps = setTodoForMissingMethods(correspondence) + + (for (((javaName, scalaCol), pairs) <- ps.groupBy(groupingKey(_)).toList.sortBy(_._1._1)) yield { + "| " + formatJavaCol(javaName, pairs.map(_._1)) + " | " + formatScalaCol(scalaCol) + " |" + }).foreach(println(_)) + println(s"\nThis table was generated on ${Calendar.getInstance().getTime()}.") + println(s"**Do not edit**. Instead, edit `${getClass().getCanonicalName()}`.") + } + +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala new file mode 100644 index 00000000000..d96b23fe43f --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -0,0 +1,88 @@ +package rx.lang.scala + +import org.junit.Assert._ +import org.junit.{ Ignore, Test } +import org.scalatest.junit.JUnitSuite + +class ObservableTests extends JUnitSuite { + + // Tests which needn't be run: + + @Ignore + def testCovariance = { + //println("hey, you shouldn't run this test") + + val o1: Observable[Nothing] = Observable() + val o2: Observable[Int] = o1 + val o3: Observable[App] = o1 + val o4: Observable[Any] = o2 + val o5: Observable[Any] = o3 + } + + // Tests which have to be run: + + @Test + def testDematerialize() { + val o = Observable(1, 2, 3) + val mat = o.materialize + val demat = mat.dematerialize + + // correctly rejected: + //val wrongDemat = Observable("hello").dematerialize + + assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3)) + } + + // Test that Java's firstOrDefault propagates errors. + // If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse + // should be changed accordingly. + @Test def testJavaFirstOrDefault() { + assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable().single) + assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable().single) + val msg = "msg6251" + var receivedMsg = "none" + try { + rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable().single + } catch { + case e: Exception => receivedMsg = e.getCause().getMessage() + } + assertEquals(receivedMsg, msg) + } + + @Test def testFirstOrElse() { + def mustNotBeCalled: String = sys.error("this method should not be called") + def mustBeCalled: String = "this is the default value" + assertEquals("hello", Observable("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single) + assertEquals("this is the default value", Observable().firstOrElse(mustBeCalled).toBlockingObservable.single) + } + + @Test def testFirstOrElseWithError() { + val msg = "msg6251" + var receivedMsg = "none" + try { + Observable[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single + } catch { + case e: Exception => receivedMsg = e.getCause().getMessage() + } + assertEquals(receivedMsg, msg) + } + + /* + @Test def testHead() { + val observer = mock(classOf[Observer[Int]]) + val o = Observable().head + val sub = o.subscribe(observer) + + verify(observer, never).onNext(any(classOf[Int])) + verify(observer, never).onCompleted() + verify(observer, times(1)).onError(any(classOf[NoSuchElementException])) + } + */ + + @Test def testTest() = { + val a: Observable[Int] = Observable() + assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last) + //println("This UnitTestSuite.testTest() for rx.lang.scala.Observable") + } + +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/UnitTestSuite.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/UnitTestSuite.scala deleted file mode 100644 index 0e93aeb69e0..00000000000 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/UnitTestSuite.scala +++ /dev/null @@ -1,88 +0,0 @@ -package rx.lang.scala.examples - -import org.junit.{Ignore, Assert, Test} -import org.scalatest.junit.JUnitSuite -import rx.lang.scala.Observable - -class UnitTestSuite extends JUnitSuite { - - // Tests which needn't be run: - -@Ignore -def testCovariance = { - //println("hey, you shouldn't run this test") - - val o1: Observable[Nothing] = Observable() - val o2: Observable[Int] = o1 - val o3: Observable[App] = o1 - val o4: Observable[Any] = o2 - val o5: Observable[Any] = o3 -} - -// Tests which have to be run: - -@Test - def testDematerialize() { - val o = Observable(1, 2, 3) - val mat = o.materialize - val demat = mat.dematerialize - - // correctly rejected: - //val wrongDemat = Observable("hello").dematerialize - - Assert.assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3)) -} - -// Test that Java's firstOrDefault propagates errors. -// If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse -// should be changed accordingly. -@Test def testJavaFirstOrDefault() { - Assert.assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable().single) - Assert.assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable().single) - val msg = "msg6251" - var receivedMsg = "none" - try { - rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable().single - } catch { - case e: Exception => receivedMsg = e.getCause().getMessage() - } - Assert.assertEquals(receivedMsg, msg) -} - -@Test def testFirstOrElse() { - def mustNotBeCalled: String = sys.error("this method should not be called") - def mustBeCalled: String = "this is the default value" - Assert.assertEquals("hello", Observable("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single) - Assert.assertEquals("this is the default value", Observable().firstOrElse(mustBeCalled).toBlockingObservable.single) -} - -@Test def testFirstOrElseWithError() { - val msg = "msg6251" - var receivedMsg = "none" - try { - Observable[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single - } catch { - case e: Exception => receivedMsg = e.getCause().getMessage() - } - Assert.assertEquals(receivedMsg, msg) -} - - /* - @Test def testHead() { - val observer = mock(classOf[Observer[Int]]) - val o = Observable().head - val sub = o.subscribe(observer) - - verify(observer, never).onNext(any(classOf[Int])) - verify(observer, never).onCompleted() - verify(observer, times(1)).onError(any(classOf[NoSuchElementException])) - } - */ - - @Test def testTest() = { - val a: Observable[Int] = Observable() - Assert.assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last) - //println("This UnitTestSuite.testTest() for rx.lang.scala.Observable") - } - -} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/SubscriptionTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala similarity index 51% rename from language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/SubscriptionTests.scala rename to language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala index aad6f22f958..4309967c0a3 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/examples/SubscriptionTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala @@ -1,24 +1,26 @@ -package rx.lang.scala.examples +package rx.lang.scala.subscriptions -import org.junit.{Assert, Test} +import org.junit.Assert._ +import org.junit.Test import org.scalatest.junit.JUnitSuite -import rx.lang.scala.subscriptions.{MultipleAssignmentSubscription, CompositeSubscription, BooleanSubscription, Subscription} + +import rx.lang.scala.Subscription class SubscriptionTests extends JUnitSuite { @Test def anonymousSubscriptionCreate() { val subscription = Subscription{} - Assert.assertNotNull(subscription) + assertNotNull(subscription) } @Test def anonymousSubscriptionDispose() { var unsubscribed = false val subscription = Subscription{ unsubscribed = true } - Assert.assertFalse(unsubscribed) + assertFalse(unsubscribed) subscription.unsubscribe() - Assert.assertTrue(unsubscribed) + assertTrue(unsubscribed) } @Test @@ -30,11 +32,11 @@ class SubscriptionTests extends JUnitSuite { @Test def booleanSubscription() { val subscription = BooleanSubscription() - Assert.assertFalse(subscription.isUnsubscribed) + assertFalse(subscription.isUnsubscribed) subscription.unsubscribe() - Assert.assertTrue(subscription.isUnsubscribed) + assertTrue(subscription.isUnsubscribed) subscription.unsubscribe() - Assert.assertTrue(subscription.isUnsubscribed) + assertTrue(subscription.isUnsubscribed) } @Test @@ -48,22 +50,22 @@ class SubscriptionTests extends JUnitSuite { val composite = CompositeSubscription() - Assert.assertFalse(composite.isUnsubscribed) + assertFalse(composite.isUnsubscribed) composite += s0 composite += s1 composite.unsubscribe() - Assert.assertTrue(composite.isUnsubscribed) - Assert.assertTrue(s0.isUnsubscribed) - Assert.assertTrue(u0) - Assert.assertTrue(u1) + assertTrue(composite.isUnsubscribed) + assertTrue(s0.isUnsubscribed) + assertTrue(u0) + assertTrue(u1) val s2 = BooleanSubscription() - Assert.assertFalse(s2.isUnsubscribed) + assertFalse(s2.isUnsubscribed) composite += s2 - Assert.assertTrue(s2.isUnsubscribed) + assertTrue(s2.isUnsubscribed) } @@ -74,13 +76,13 @@ class SubscriptionTests extends JUnitSuite { val composite = CompositeSubscription() composite += s0 - Assert.assertFalse(s0.isUnsubscribed) + assertFalse(s0.isUnsubscribed) composite -= s0 - Assert.assertTrue(s0.isUnsubscribed) + assertTrue(s0.isUnsubscribed) composite.unsubscribe() - Assert.assertTrue(composite.isUnsubscribed) + assertTrue(composite.isUnsubscribed) } @Test @@ -90,27 +92,27 @@ class SubscriptionTests extends JUnitSuite { val s1 = BooleanSubscription() val multiple = MultipleAssignmentSubscription() - Assert.assertFalse(multiple.isUnsubscribed) + assertFalse(multiple.isUnsubscribed) multiple.subscription = s0 - Assert.assertEquals(s0.asJavaSubscription, multiple.subscription.asJavaSubscription) + assertEquals(s0.asJavaSubscription, multiple.subscription.asJavaSubscription) multiple.subscription = s1 - Assert.assertEquals(s1.asJavaSubscription, multiple.subscription.asJavaSubscription) + assertEquals(s1.asJavaSubscription, multiple.subscription.asJavaSubscription) - Assert.assertFalse(s0.isUnsubscribed) - Assert.assertFalse(s1.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + assertFalse(s1.isUnsubscribed) multiple.unsubscribe() - Assert.assertTrue(multiple.isUnsubscribed) - Assert.assertFalse(s0.isUnsubscribed) - Assert.assertTrue(s1.isUnsubscribed) + assertTrue(multiple.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + assertTrue(s1.isUnsubscribed) val s2 = BooleanSubscription() - Assert.assertFalse(s2.isUnsubscribed) + assertFalse(s2.isUnsubscribed) multiple.subscription = s2 - Assert.assertTrue(s2.isUnsubscribed) + assertTrue(s2.isUnsubscribed) } }