Skip to content

Commit

Permalink
Merge pull request ReactiveX#1 from samuelgruetter/RxJavaBugFixesSam
Browse files Browse the repository at this point in the history
A few trivial Scala adaptor details
  • Loading branch information
headinthebox committed Nov 30, 2013
2 parents 8ae65eb + 9ecbd5c commit 7273063
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 47 deletions.
2 changes: 0 additions & 2 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ DEFAULT_JVM_OPTS=""
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.7.0_40.jdk/Contents/Home"

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,40 +427,6 @@ class RxScalaDemo extends JUnitSuite {
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def observableLikeFuture1() {
implicit val scheduler = ThreadPoolForIOScheduler()
val o1 = Observable {
Thread.sleep(1000)
5
}
val o2 = Observable {
Thread.sleep(500)
4
}
Thread.sleep(500)
val t1 = System.currentTimeMillis
println((o1 merge o2).first.toBlockingObservable.single)
println(System.currentTimeMillis - t1)
}

@Test def ObservableLikeFuture2() {
class Friend {}
val session = new Object {
def getFriends: List[Friend] = List(new Friend, new Friend)
}

implicit val scheduler = ThreadPoolForIOScheduler()
val o: Observable[List[Friend]] = Observable {
session.getFriends
}
o.subscribe(
friendList => println(friendList),
err => println(err.getMessage)
)

Thread.sleep(1500) // or convert to BlockingObservable
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ package rx.lang.scala

import java.lang.Exception
import java.{ lang => jlang }
//import rx.lang.scala._
import rx.util.functions._

import scala.collection.Seq
import java.{lang => jlang}
import scala.language.implicitConversions
//import rx.lang.scala.Scheduler

import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,10 @@ trait Observable[+T]
* corresponding elements using the selector function.
* The number of `onNext` invocations of the resulting `Observable[(T, U)]`
* is the minumum of the number of `onNext` invocations of `this` and `that`.
*
* Note that this function is private because Scala collections don't have such a function.
*/
def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ object ThreadPoolForComputationScheduler {
*
* This can be used for event-loops, processing callbacks and other computational work.
*
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForIO]] instead.
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForIOScheduler]] instead.
*/
def apply(): ThreadPoolForComputationScheduler = {
def apply(): ThreadPoolForComputationScheduler = {
new ThreadPoolForComputationScheduler(rx.concurrency.Schedulers.threadPoolForComputation())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ object ThreadPoolForIOScheduler {
*
* This can be used for asynchronously performing blocking IO.
*
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation]] instead.
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForComputationScheduler]] instead.
*/
def apply(): ThreadPoolForIOScheduler = {
def apply(): ThreadPoolForIOScheduler = {
new ThreadPoolForIOScheduler(rx.concurrency.Schedulers.threadPoolForIO())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,9 @@
*/
package rx.lang

package object scala {
}
/**
* This package contains all classes that RxScala users need.
*
* It basically mirrors the structure of package `rx`, but some changes were made to make it more Scala-idiomatic.
*/
package object scala {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
* Subjects are Observers and Observables at the same time.
*/
package object subjects {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite

import rx.lang.scala.Subscription

class SubscriptionTests extends JUnitSuite {

@Test
Expand Down

0 comments on commit 7273063

Please sign in to comment.