Skip to content

Commit

Permalink
Merge pull request #420 from samuelgruetter/scalaadaptor
Browse files Browse the repository at this point in the history
Scala Adaptor
  • Loading branch information
benjchristensen committed Oct 9, 2013
2 parents b8ba0f7 + 677f84c commit 2bd22d6
Show file tree
Hide file tree
Showing 23 changed files with 1,494 additions and 802 deletions.
5 changes: 0 additions & 5 deletions language-adaptors/rxjava-scala-java/README.md

This file was deleted.

32 changes: 0 additions & 32 deletions language-adaptors/rxjava-scala-java/build.gradle

This file was deleted.

9 changes: 8 additions & 1 deletion language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blo

Scala code using Rx should only import members from `rx.lang.scala` and below.

Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala).

## Documentation

The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable).

You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory.

Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it.


## Binaries
Expand Down
17 changes: 11 additions & 6 deletions language-adaptors/rxjava-scala/TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ TODOs for Scala Adapter

This is a (probably incomplete) list of what still needs to be done in the Scala adaptor:

* mirror complete Java package structure in Scala
* objects for classes with static methods or singletons (e.g. Schedulers, Subscriptions)
* Notification as a case class
* integrating Scala Futures, should there be a common base interface for Futures and Observables?
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, exists, tails, ...
* Integrating Scala Futures: Should there be a common base interface for Futures and Observables? And if all subscribers of an Observable wrapping a Future unsubscribe, the Future should be cancelled, but Futures do not support cancellation.
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, tails, ...
* combineLatest with arities > 2
* decide where the MovieLib/MovieLibUsage (use Scala code from Java code) example should live and make sure gradle builds it in the right order
* Implicit schedulers?
* Avoid text duplication in scaladoc using templates, add examples, distinction between use case signature and full signature
* other small TODOs


(Implicit) schedulers for interval: Options:

```scala
def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long]
def interval(duration: Duration)(scheduler: Scheduler): Observable[Long]
def interval(scheduler: Scheduler)(duration: Duration): Observable[Long]
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] && def interval(duration: Duration): Observable[Long]
````
29 changes: 29 additions & 0 deletions language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,30 @@ tasks.withType(ScalaCompile) {
}

sourceSets {
main {
scala {
srcDir 'src/main/scala'
}
}
test {
scala {
srcDir 'src/main/scala'
srcDir 'src/test/scala'
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
}
java.srcDirs = []
}
examples {
// It seems that in Gradle, the dependency "compileScala depends on compileJava" is hardcoded,
// or at least not meant to be removed.
// However, compileScala also runs javac at the very end, so we just add the Java sources to
// the scala source set:
scala {
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
}
java.srcDirs = []
}
}

Expand All @@ -34,6 +54,15 @@ tasks.compileScala {
classpath = classpath + (configurations.compile + configurations.provided)
}

tasks.compileExamplesScala {
classpath = classpath + files(compileScala.destinationDir) + (configurations.compile + configurations.provided)
}

// Add RxJava core to Scaladoc input:
// tasks.scaladoc.source(project(':rxjava-core').tasks.getByPath(':rxjava-core:compileJava').source)
// println("-------")
// println(tasks.scaladoc.source.asPath)

task test(overwrite: true, dependsOn: testClasses) << {
ant.taskdef(name: 'scalatest',
classname: 'org.scalatest.tools.ScalaTestAntTask',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import rx.lang.scala._
import scala.concurrent.duration._
import org.junit.{Before, Test, Ignore}
import org.junit.Assert._
import rx.lang.scala.concurrency.NewThreadScheduler
import rx.lang.scala.concurrency.Schedulers
import java.io.IOException

@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {
Expand Down Expand Up @@ -167,10 +168,10 @@ class RxScalaDemo extends JUnitSuite {

@Test def schedulersExample() {
val o = Observable.interval(100 millis).take(8)
o.observeOn(NewThreadScheduler).subscribe(
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
)
o.observeOn(NewThreadScheduler).subscribe(
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
)
waitFor(o)
Expand Down Expand Up @@ -287,7 +288,7 @@ class RxScalaDemo extends JUnitSuite {
// We can't put a general average method into Observable.scala, because Scala's Numeric
// does not have scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum)
def doubleAverage(o: Observable[Double]): Observable[Double] = {
for ((finalSum, finalCount) <- o.fold((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
for ((finalSum, finalCount) <- o.foldLeft((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
yield finalSum / finalCount
}

Expand Down Expand Up @@ -321,13 +322,13 @@ class RxScalaDemo extends JUnitSuite {
.toBlockingObservable.foreach(println(_))
}

// source Observables are in a List:
@Test def zipManySeqExample() {
val observables = List(Observable(1, 2), Observable(10, 20), Observable(100, 200))
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
// source Observables are all known:
@Test def zip3Example() {
val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200))
(for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3")
.toBlockingObservable.foreach(println(_))
}

// source Observables are in an Observable:
@Test def zipManyObservableExample() {
val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200))
Expand Down Expand Up @@ -375,6 +376,88 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single)
}

@Test def timestampExample() {
val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable
for ((millis, value) <- timestamped if value > 0) {
println(value + " at t = " + millis)
}
}

@Test def materializeExample1() {
def printObservable[T](o: Observable[T]): Unit = {
import Notification._
o.materialize.subscribe(n => n match {
case OnNext(v) => println("Got value " + v)
case OnCompleted() => println("Completed")
case OnError(err) => println("Error: " + err.getMessage)
})
}

val o1 = Observable.interval(100 millis).take(3)
val o2 = Observable(new IOException("Oops"))
printObservable(o1)
waitFor(o1)
printObservable(o2)
waitFor(o2)
}

@Test def materializeExample2() {
import Notification._
Observable(1, 2, 3).materialize.subscribe(n => n match {
case OnNext(v) => println("Got value " + v)
case OnCompleted() => println("Completed")
case OnError(err) => println("Error: " + err.getMessage)
})
}

@Test def elementAtReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single)
}

@Test def elementAtOrDefaultReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single)
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def observableLikeFuture1() {
implicit val scheduler = Schedulers.threadPoolForIO
val o1 = observable {
Thread.sleep(1000)
5
}
val o2 = observable {
Thread.sleep(500)
4
}
Thread.sleep(500)
val t1 = System.currentTimeMillis
println((o1 merge o2).first.toBlockingObservable.single)
println(System.currentTimeMillis - t1)
}

@Test def observableLikeFuture2() {
class Friend {}
val session = new Object {
def getFriends: List[Friend] = List(new Friend, new Friend)
}

implicit val scheduler = Schedulers.threadPoolForIO
val o: Observable[List[Friend]] = observable {
session.getFriends
}
o.subscribe(
friendList => println(friendList),
err => println(err.getMessage)
)

Thread.sleep(1500) // or convert to BlockingObservable
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
}

def output(s: String): Unit = println(s)

// blocks until obs has completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,52 @@ import java.{ lang => jlang }
import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx Funcs and Actions.
* Most users RxScala won't need them, but they might be useful if one wants to use
* the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants
* to use a Java library taking/returning Funcs and Actions.
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
* Most RxScala users won't need them, but they might be useful if one wants to use
* the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
* to use a Java library taking/returning `Func`s and `Action`s.
*/
object ImplicitFunctionConversions {
import language.implicitConversions

implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
new Func2[rx.Scheduler, T, Subscription] {
def call(s: rx.Scheduler, t: T): Subscription = {
action(s, t)
}
}

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava

implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: Observer[_ >: T]): Subscription = {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
f(obs)
}
}

/**
* Converts a by-name parameter to a Rx Func0
*/
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
new Func0[B] {
def call(): B = param
}

/**
* Converts 0-arg function to Rx Action0
*/
implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 =
new Action0 {
def call(): Unit = f()
}

/**
* Converts 1-arg function to Rx Action1
*/
implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] =
new Action1[A] {
def call(a: A): Unit = f(a)
}

/**
* Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
*/
implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] =
new Func1[A, jlang.Boolean] {
def call(a: A): jlang.Boolean = f(a).booleanValue
}

/**
* Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean]
*/
implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] =
new Func2[A, B, jlang.Boolean] {
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
Expand All @@ -79,34 +75,21 @@ object ImplicitFunctionConversions {
def call(args: java.lang.Object*): R = f(args)
}

/**
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
*/
implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] =
new Func2[A, jlang.Integer, jlang.Boolean] {
def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue
}

/**
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
*/
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
new Func2[A, A, jlang.Integer] {
def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue
}

/**
* This implicit allows Scala code to use any exception type and still work
* with invariant Func1 interface
*/
implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] =
new Func1[Exception, B] {
def call(ex: Exception): B = f(ex.asInstanceOf[A])
}

/**
* The following implicits convert functions of different arities into the Rx equivalents
*/
implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] =
new Func0[A] {
def call(): A = f()
Expand Down
Loading

0 comments on commit 2bd22d6

Please sign in to comment.