From 81302815e146e8115f0a715c570c73d1c402cf02 Mon Sep 17 00:00:00 2001 From: Olafur Pall Geirsson Date: Fri, 26 Jun 2020 13:17:32 +0200 Subject: [PATCH] Add integration with new async/await support in 2.12.12+ and 2.13.3+ This commit adds support to compose Twitter futures using async/await style syntax instead of map/flatMap syntax. This functionality is only available in the newly released 2.13.3 (notes https://github.com/scala/scala/releases/tag/v2.13.3) and it will also become available in the upcoming 2.12.12 release. --- .travis.yml | 2 +- build.sbt | 6 +- .../main/scala/com/twitter/util/Async.scala | 106 ++++++++++++++++++ .../com/twitter/util/FutureStateMachine.scala | 78 +++++++++++++ .../scala/com/twitter/util/AsyncTest.scala | 19 ++++ 5 files changed, 209 insertions(+), 2 deletions(-) create mode 100644 util-core/src/main/scala/com/twitter/util/Async.scala create mode 100644 util-core/src/main/scala/com/twitter/util/FutureStateMachine.scala create mode 100644 util-core/src/test/scala/com/twitter/util/AsyncTest.scala diff --git a/.travis.yml b/.travis.yml index f932960712..437eca5020 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ before_cache: scala: - 2.11.12 - 2.12.8 - - 2.13.1 + - 2.13.3 jdk: - openjdk8 diff --git a/build.sbt b/build.sbt index fe941c8fdf..563246849f 100644 --- a/build.sbt +++ b/build.sbt @@ -76,7 +76,7 @@ def jdk11GcJavaOptions: Seq[String] = { val defaultProjectSettings = Seq( scalaVersion := "2.12.8", - crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.1") + crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.3") ) val baseSettings = Seq( @@ -115,6 +115,10 @@ val baseSettings = Seq( "-Xlint:-missing-interpolator", "-Yrangepos" ), + scalacOptions ++= { + if (scalaBinaryVersion.value == "2.13") List("-Xasync") + else Nil + }, // Note: Use -Xlint rather than -Xlint:unchecked when TestThriftStructure // warnings are resolved javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.8", "-target", "1.8"), diff --git a/util-core/src/main/scala/com/twitter/util/Async.scala b/util-core/src/main/scala/com/twitter/util/Async.scala new file mode 100644 index 0000000000..fa151ae0b4 --- /dev/null +++ b/util-core/src/main/scala/com/twitter/util/Async.scala @@ -0,0 +1,106 @@ +package com.twitter.util + +import scala.language.experimental.macros +import scala.concurrent.{ExecutionContext} +import com.twitter.util.Future +import scala.annotation.compileTimeOnly +import scala.reflect.macros.whitebox + +/** + * Async blocks provide a direct means to work with [[scala.concurrent.Future]]. + * + * For example, to use an API that fetches a web page to fetch + * two pages and add their lengths: + * + * {{{ + * import com.twitter.util.Future + * import com.twitter.util.Async.{async, await} + * + * def fetchURL(url: URL): Future[String] = ... + * + * val sumLengths: Future[Int] = async { + * val body1 = fetchURL("http://scala-lang.org") + * val body2 = fetchURL("http://docs.scala-lang.org") + * await(body1).length + await(body2).length + * } + * }}} + * + * Note that in the following program, the second fetch does *not* start + * until after the first. If you need to start tasks in parallel, you must do + * so before `await`-ing a result. + * + * {{{ + * val sumLengths: Future[Int] = async { + * await(fetchURL("http://scala-lang.org")).length + await(fetchURL("http://docs.scala-lang.org")).length + * } + * }}} + */ +object Async { + + /** + * Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of + * a `Future` are needed; this is translated into non-blocking code. + */ + def async[T](body: => T): Future[T] = + macro asyncImpl[T] + + /** + * Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block. + * + * Internally, this will register the remainder of the code in enclosing `async` block as a callback + * in the `onComplete` handler of `awaitable`, and will *not* block a thread. + */ + @compileTimeOnly("[async] `await` must be enclosed in an `async` block") + def await[T](awaitable: Future[T]): T = + ??? // No implementation here, as calls to this are translated to `onComplete` by the macro. + + def asyncImpl[T: c.WeakTypeTag]( + c: whitebox.Context + )(body: c.Tree): c.Tree = { + import c.universe._ + if (!c.compilerSettings.contains("-Xasync")) { + c.abort( + c.macroApplication.pos, + "The async requires the compiler option -Xasync (supported only by Scala 2.12.12+ / 2.13.3+)" + ) + } else + try { + val awaitSym = typeOf[Async.type].decl(TermName("await")) + def mark(t: DefDef): Tree = { + import language.reflectiveCalls + c.internal + .asInstanceOf[{ + def markForAsyncTransform( + owner: Symbol, + method: DefDef, + awaitSymbol: Symbol, + config: Map[String, AnyRef] + ): DefDef + } + ] + .markForAsyncTransform( + c.internal.enclosingOwner, + t, + awaitSym, + Map.empty + ) + } + val name = TypeName("stateMachine$async") + q""" + final class $name extends _root_.com.twitter.util.FutureStateMachine() { + // FSM translated method + ${mark( + q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""" + )} + } + new $name().start() : ${c.macroApplication.tpe} + """ + } catch { + case e: ReflectiveOperationException => + c.abort( + c.macroApplication.pos, + "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage + ) + } + } +} diff --git a/util-core/src/main/scala/com/twitter/util/FutureStateMachine.scala b/util-core/src/main/scala/com/twitter/util/FutureStateMachine.scala new file mode 100644 index 0000000000..64e75a84e7 --- /dev/null +++ b/util-core/src/main/scala/com/twitter/util/FutureStateMachine.scala @@ -0,0 +1,78 @@ +package com.twitter.util + +import com.twitter.util.Promise +import com.twitter.util.Future +import com.twitter.util.Return +import com.twitter.util.Throw + +import java.util.Objects + +import scala.concurrent.{ExecutionContext} +import scala.util.Success +import scala.util.Failure + +/** The base class for state machines generated by the `scala.async.Async.async` macro. + * Not intended to be directly extended in user-written code. + */ +abstract class FutureStateMachine() + extends Function1[scala.util.Try[AnyRef], Unit] { + + type F = com.twitter.util.Future[AnyRef] + type R = scala.util.Try[AnyRef] + + private[this] val result$async: Promise[AnyRef] = Promise[AnyRef](); + private[this] var state$async: Int = 0 + + /** Retrieve the current value of the state variable */ + protected def state: Int = state$async + + /** Assign `i` to the state variable */ + protected def state_=(s: Int): Unit = state$async = s + + /** Complete the state machine with the given failure. */ + // scala-async accidentally started catching NonFatal exceptions in: + // https://github.com/scala/scala-async/commit/e3ff0382ae4e015fc69da8335450718951714982#diff-136ab0b6ecaee5d240cd109e2b17ccb2R411 + // This follows the new behaviour but should we fix the regression? + protected def completeFailure(t: Throwable): Unit = { + result$async.update(Throw(t)) + } + + /** Complete the state machine with the given value. */ + protected def completeSuccess(value: AnyRef): Unit = { + result$async.update(Return(value)) + } + + /** Register the state machine as a completion callback of the given future. */ + protected def onComplete(f: F): Unit = { + f.respond(t => this(t.asScala)) + } + + /** Extract the result of the given future if it is complete, or `null` if it is incomplete. */ + protected def getCompleted(f: F): R = { + if (f.isDefined) { + f.poll.get.asScala + } else { + null + } + } + + /** + * Extract the success value of the given future. If the state machine detects a failure it may + * complete the async block and return `this` as a sentinel value to indicate that the caller + * (the state machine dispatch loop) should immediately exit. + */ + protected def tryGet(tr: R): AnyRef = + tr match { + case Success(value) => + value.asInstanceOf[AnyRef] + case Failure(throwable) => + completeFailure(throwable) + this // sentinel value to indicate the dispatch loop should exit. + } + + def start[T](): Future[T] = { + // This cast is safe because we know that `def apply` does not consult its argument when `state == 0`. + Future.Unit.asInstanceOf[Future[AnyRef]].respond(t => this(t.asScala)) + result$async.asInstanceOf[Future[T]] + } +} diff --git a/util-core/src/test/scala/com/twitter/util/AsyncTest.scala b/util-core/src/test/scala/com/twitter/util/AsyncTest.scala new file mode 100644 index 0000000000..d3388ac784 --- /dev/null +++ b/util-core/src/test/scala/com/twitter/util/AsyncTest.scala @@ -0,0 +1,19 @@ +package com.twitter.util + +import org.scalatest.WordSpec +import com.twitter.util.Async.{async, await} + +class AsyncTest extends WordSpec { + + "async/await" should { + "work" in { + val c = async { + val a = Future.value(10) + val b = Future.value(5) + await(a) + await(b) + } + assert(c.poll == Some(Return(15))) + } + } + +}