Skip to content

Commit

Permalink
Merge pull request ReactiveX#529 from benjchristensen/scala-tweaks
Browse files Browse the repository at this point in the history
Scala Tweaks
  • Loading branch information
benjchristensen committed Nov 26, 2013
2 parents d230512 + 22ef317 commit e9c4c99
Show file tree
Hide file tree
Showing 22 changed files with 335 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ class TestSchedulerExample extends JUnitSuite {
verify(observer, never).onCompleted()
verify(observer, never).onError(any(classOf[Throwable]))

verify(observer, never).onNext(2L)

sub.unsubscribe();

scheduler.advanceTimeTo(4 seconds)
verify(observer, never).onNext(2L)
verify(observer, times(1)).onCompleted()
verify(observer, never).onError(any(classOf[Throwable]))

// after unsubscription we expect no further events
verifyNoMoreInteractions(observer)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ object ImplicitFunctionConversions {
}
}

implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava
implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s)

implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver
implicit def toScalaObserver[T](s: rx.Observer[T]): Observer[T] = Observer(s)
implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)

implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable
implicit def toScalaObservable[T](s: rx.Observable[T]): Observable[T] = Observable(s)
implicit def toScalaObservable[T](s: rx.Observable[_ <: T]): Observable[T] = Observable(s)

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
Expand Down Expand Up @@ -36,6 +51,11 @@ object Notification {
}

object OnNext {

def apply[T](value: T): Notification[T] = {
Notification(new rx.Notification[T](value))
}

def unapply[U](n: Notification[U]): Option[U] = n match {
case n2: OnNext[U] => Some(n.asJava.getValue)
case _ => None
Expand All @@ -47,6 +67,11 @@ object Notification {
}

object OnError {

def apply[T](error: Throwable): Notification[T] = {
Notification(new rx.Notification[T](error))
}

def unapply[U](n: Notification[U]): Option[Throwable] = n match {
case n2: OnError[U] => Some(n2.asJava.getThrowable)
case _ => None
Expand All @@ -56,6 +81,11 @@ object Notification {
class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {}

object OnCompleted {

def apply[T](): Notification[T] = {
Notification(new rx.Notification())
}

def unapply[U](n: Notification[U]): Option[Unit] = n match {
case n2: OnCompleted[U] => Some()
case _ => None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

import java.util.Date
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

import ImplicitFunctionConversions.scalaBooleanFunction1ToRxBooleanFunc1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.concurrency

import java.util.concurrent.Executor
Expand All @@ -13,29 +28,29 @@ object Schedulers {
/**
* Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread.
*/
def immediate: Scheduler = rx.concurrency.Schedulers.immediate()
def immediate: Scheduler = Scheduler(rx.concurrency.Schedulers.immediate())

/**
* Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
*/
def currentThread: Scheduler = rx.concurrency.Schedulers.currentThread()
def currentThread: Scheduler = Scheduler(rx.concurrency.Schedulers.currentThread())

/**
* Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
*/
def newThread: Scheduler = rx.concurrency.Schedulers.newThread
def newThread: Scheduler = Scheduler(rx.concurrency.Schedulers.newThread)

/**
* Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.Executor`.
*
* Note that this does not support scheduled actions with a delay.
*/
def executor(executor: Executor): Scheduler = rx.concurrency.Schedulers.executor(executor)
def executor(executor: Executor): Scheduler = Scheduler(rx.concurrency.Schedulers.executor(executor))

/**
* Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.ScheduledExecutorService`.
*/
def executor(executor: ScheduledExecutorService): Scheduler = rx.concurrency.Schedulers.executor(executor)
def executor(executor: ScheduledExecutorService): Scheduler = Scheduler(rx.concurrency.Schedulers.executor(executor))

/**
* Returns a [[rx.lang.scala.Scheduler]] intended for computational work.
Expand All @@ -46,7 +61,7 @@ object Schedulers {
*
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForIO]] instead.
*/
def threadPoolForComputation: Scheduler = rx.concurrency.Schedulers.threadPoolForComputation()
def threadPoolForComputation: Scheduler = Scheduler(rx.concurrency.Schedulers.threadPoolForComputation())

/**
* [[rx.lang.scala.Scheduler]] intended for IO-bound work.
Expand All @@ -57,6 +72,6 @@ object Schedulers {
*
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation]] instead.
*/
def threadPoolForIO: Scheduler = rx.concurrency.Schedulers.threadPoolForIO()
def threadPoolForIO: Scheduler = Scheduler(rx.concurrency.Schedulers.threadPoolForIO())

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.concurrency

import scala.concurrent.duration.Duration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.subjects

import rx.lang.scala.Subject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.subjects

import rx.lang.scala.Subject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.subjects

import rx.lang.scala.Subject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.subjects

import rx.lang.scala.Subject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.subscriptions

import rx.lang.scala._
Expand Down
Loading

0 comments on commit e9c4c99

Please sign in to comment.