Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration with new async/await support in 2.12.12+ and 2.13.3+ #279

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ before_cache:
scala:
- 2.11.12
- 2.12.8
- 2.13.1
- 2.13.3

jdk:
- openjdk8
Expand Down
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def jdk11GcJavaOptions: Seq[String] = {

val defaultProjectSettings = Seq(
scalaVersion := "2.12.8",
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.1")
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.3")
)

val baseSettings = Seq(
Expand Down Expand Up @@ -115,6 +115,10 @@ val baseSettings = Seq(
"-Xlint:-missing-interpolator",
"-Yrangepos"
),
scalacOptions ++= {
if (scalaBinaryVersion.value == "2.13") List("-Xasync")
else Nil
},
// Note: Use -Xlint rather than -Xlint:unchecked when TestThriftStructure
// warnings are resolved
javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.8", "-target", "1.8"),
Expand Down
106 changes: 106 additions & 0 deletions util-core/src/main/scala/com/twitter/util/Async.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.twitter.util

import scala.language.experimental.macros
import scala.concurrent.{ExecutionContext}
import com.twitter.util.Future
import scala.annotation.compileTimeOnly
import scala.reflect.macros.whitebox

/**
* Async blocks provide a direct means to work with [[scala.concurrent.Future]].
*
* For example, to use an API that fetches a web page to fetch
* two pages and add their lengths:
*
* {{{
* import com.twitter.util.Future
* import com.twitter.util.Async.{async, await}
*
* def fetchURL(url: URL): Future[String] = ...
*
* val sumLengths: Future[Int] = async {
* val body1 = fetchURL("http://scala-lang.org")
* val body2 = fetchURL("http://docs.scala-lang.org")
* await(body1).length + await(body2).length
* }
* }}}
*
* Note that in the following program, the second fetch does *not* start
* until after the first. If you need to start tasks in parallel, you must do
* so before `await`-ing a result.
*
* {{{
* val sumLengths: Future[Int] = async {
* await(fetchURL("http://scala-lang.org")).length + await(fetchURL("http://docs.scala-lang.org")).length
* }
* }}}
*/
object Async {

/**
* Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of
* a `Future` are needed; this is translated into non-blocking code.
*/
def async[T](body: => T): Future[T] =
macro asyncImpl[T]

/**
* Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block.
*
* Internally, this will register the remainder of the code in enclosing `async` block as a callback
* in the `onComplete` handler of `awaitable`, and will *not* block a thread.
*/
@compileTimeOnly("[async] `await` must be enclosed in an `async` block")
def await[T](awaitable: Future[T]): T =
??? // No implementation here, as calls to this are translated to `onComplete` by the macro.

def asyncImpl[T: c.WeakTypeTag](
c: whitebox.Context
)(body: c.Tree): c.Tree = {
import c.universe._
if (!c.compilerSettings.contains("-Xasync")) {
c.abort(
c.macroApplication.pos,
"The async requires the compiler option -Xasync (supported only by Scala 2.12.12+ / 2.13.3+)"
)
} else
try {
val awaitSym = typeOf[Async.type].decl(TermName("await"))
def mark(t: DefDef): Tree = {
import language.reflectiveCalls
c.internal
.asInstanceOf[{
def markForAsyncTransform(
owner: Symbol,
method: DefDef,
awaitSymbol: Symbol,
config: Map[String, AnyRef]
): DefDef
}
]
.markForAsyncTransform(
c.internal.enclosingOwner,
t,
awaitSym,
Map.empty
)
}
val name = TypeName("stateMachine$async")
q"""
final class $name extends _root_.com.twitter.util.FutureStateMachine() {
// FSM translated method
${mark(
q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}"""
)}
}
new $name().start() : ${c.macroApplication.tpe}
"""
} catch {
case e: ReflectiveOperationException =>
c.abort(
c.macroApplication.pos,
"-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage
)
}
}
}
78 changes: 78 additions & 0 deletions util-core/src/main/scala/com/twitter/util/FutureStateMachine.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.twitter.util

import com.twitter.util.Promise
import com.twitter.util.Future
import com.twitter.util.Return
import com.twitter.util.Throw

import java.util.Objects

import scala.concurrent.{ExecutionContext}
import scala.util.Success
import scala.util.Failure

/** The base class for state machines generated by the `scala.async.Async.async` macro.
* Not intended to be directly extended in user-written code.
*/
abstract class FutureStateMachine()
extends Function1[scala.util.Try[AnyRef], Unit] {

type F = com.twitter.util.Future[AnyRef]
type R = scala.util.Try[AnyRef]

private[this] val result$async: Promise[AnyRef] = Promise[AnyRef]();
private[this] var state$async: Int = 0

/** Retrieve the current value of the state variable */
protected def state: Int = state$async

/** Assign `i` to the state variable */
protected def state_=(s: Int): Unit = state$async = s

/** Complete the state machine with the given failure. */
// scala-async accidentally started catching NonFatal exceptions in:
// https://github.com/scala/scala-async/commit/e3ff0382ae4e015fc69da8335450718951714982#diff-136ab0b6ecaee5d240cd109e2b17ccb2R411
// This follows the new behaviour but should we fix the regression?
protected def completeFailure(t: Throwable): Unit = {
result$async.update(Throw(t))
}

/** Complete the state machine with the given value. */
protected def completeSuccess(value: AnyRef): Unit = {
result$async.update(Return(value))
}

/** Register the state machine as a completion callback of the given future. */
protected def onComplete(f: F): Unit = {
f.respond(t => this(t.asScala))
}

/** Extract the result of the given future if it is complete, or `null` if it is incomplete. */
protected def getCompleted(f: F): R = {
if (f.isDefined) {
f.poll.get.asScala
} else {
null
}
}

/**
* Extract the success value of the given future. If the state machine detects a failure it may
* complete the async block and return `this` as a sentinel value to indicate that the caller
* (the state machine dispatch loop) should immediately exit.
*/
protected def tryGet(tr: R): AnyRef =
tr match {
case Success(value) =>
value.asInstanceOf[AnyRef]
case Failure(throwable) =>
completeFailure(throwable)
this // sentinel value to indicate the dispatch loop should exit.
}

def start[T](): Future[T] = {
// This cast is safe because we know that `def apply` does not consult its argument when `state == 0`.
Future.Unit.asInstanceOf[Future[AnyRef]].respond(t => this(t.asScala))
result$async.asInstanceOf[Future[T]]
}
}
19 changes: 19 additions & 0 deletions util-core/src/test/scala/com/twitter/util/AsyncTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.twitter.util

import org.scalatest.WordSpec
import com.twitter.util.Async.{async, await}

class AsyncTest extends WordSpec {

"async/await" should {
"work" in {
val c = async {
val a = Future.value(10)
val b = Future.value(5)
await(a) + await(b)
}
assert(c.poll == Some(Return(15)))
}
}

}