Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more operators to RxScala #1210

Merged
merged 12 commits into from
May 20, 2014
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class RxScalaDemo extends JUnitSuite {
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
}

@Test def bufferExample() {
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
val boundary = Observable.interval(500 millis)
o.buffer(boundary).toBlockingObservable.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
}

@Test def windowExample() {
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
yield s"Observable#$i emits $n"
Expand Down Expand Up @@ -549,11 +555,10 @@ class RxScalaDemo extends JUnitSuite {
* all elements are calculated.
*/
@Test def createExampleBad() {
val o = Observable.create[String](observer => {
val o = Observable[String](observer => {
observer.onNext(calculateElement(0))
observer.onNext(calculateElement(1))
observer.onCompleted()
Subscription {}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to change this? Users coming from C# will look for an Observable.create(Observer[T] => Subscription) example, and they should find something. Even if we completely remove Observable.create one day, I think we should leave this code (maybe commented out).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agreed; no harm in leaving this in. Also for folks that watch the coursera videos.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already reverted.

o.take(1).subscribe(println(_))
}
Expand Down Expand Up @@ -644,6 +649,21 @@ class RxScalaDemo extends JUnitSuite {
println(result)
}

@Test def delayExample3(): Unit = {
val o = List(100, 500, 200).toObservable.delay(
(i: Int) => Observable.items(i).delay(i millis)
)
o.toBlockingObservable.foreach(println(_))
}

@Test def delayExample4(): Unit = {
val o = List(100, 500, 200).toObservable.delay(
() => Observable.interval(500 millis).take(1),
(i: Int) => Observable.items(i).delay(i millis)
)
o.toBlockingObservable.foreach(println(_))
}

@Test def delaySubscriptionExample(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
val result = o.toBlockingObservable.toList
Expand Down Expand Up @@ -792,6 +812,53 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
}

@Test def sequenceEqualExampe(): Unit = {
val o1 = List(1, 2, 3).toObservable
val o2 = List(1, 2, 3).toObservable
val o3 = List(1, 2).toObservable
val o4 = List(1.0, 2.0, 3.0).toObservable
assertTrue(o1.sequenceEqual(o2).toBlockingObservable.single)
assertFalse(o1.sequenceEqual(o3).toBlockingObservable.single)
assertTrue(o1.sequenceEqual(o4).toBlockingObservable.single)
}

@Test def takeExample(): Unit = {
val o = (1 to 20).toObservable
.zip(Observable.interval(300 millis))
.map(_._1)
.take(2 seconds)
println(o.toBlockingObservable.toList)
}

@Test def takeRightExample(): Unit = {
val o = (1 to 6).toObservable.takeRight(3)
assertEquals(List(4, 5, 6), o.toBlockingObservable.toList)
}

@Test def takeRightExample2(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.takeRight(300 millis)
println(o.toBlockingObservable.toList)
}

@Test def takeRightExample3(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.takeRight(2, 300 millis)
println(o.toBlockingObservable.toList)
}

@Test def timeIntervalExample(): Unit = {
val o = (1 to 10).toObservable
.zip(Observable.interval(100 millis))
.map(_._1)
.timeInterval
println(o.toBlockingObservable.toList)
}

@Test def schedulerExample1(): Unit = {
val latch = new CountDownLatch(1)
val worker = IOScheduler().createWorker
Expand Down
Loading