Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use compiler integrated async phase under -Xasync #237

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import: scala/scala-dev:travis/default.yml
language: scala

scala:
- 2.12.11
- 2.13.2
- 2.12.12
- 2.13.3

env:
- ADOPTOPENJDK=8
Expand Down
126 changes: 64 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# scala-async [![Build Status](https://travis-ci.org/scala/scala-async.svg?branch=master)](https://travis-ci.org/scala/scala-async) [<img src="https://img.shields.io/maven-central/v/org.scala-lang.modules/scala-async_2.12.svg?label=latest%20release%20for%202.12">](http://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.scala-lang.modules%20a%3Ascala-async_2.12) [<img src="https://img.shields.io/maven-central/v/org.scala-lang.modules/scala-async_2.13.svg?label=latest%20release%20for%202.13">](http://search.maven.org/#search%7Cga%7C1%7Cg%3Aorg.scala-lang.modules%20a%3Ascala-async_2.13)

## Supported Scala versions

This branch (version series 0.10.x) targets Scala 2.12 and 2.13. `scala-async` is no longer maintained for older versions.
A DSL to enable a direct style of programming with when composing values wrapped in Scala `Future`s.

## Quick start

To include scala-async in an existing project use the library published on Maven Central.
For sbt projects add the following to your build definition - build.sbt or project/Build.scala:

### Use a modern Scala compiler

As of scala-async 1.0, Scala 2.12.12+ or 2.13.3+ are required.

### Add dependency

#### SBT Example

```scala
libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.10.0"
retronym marked this conversation as resolved.
Show resolved Hide resolved
libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value % Provided
Expand All @@ -17,28 +23,58 @@ libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value %
For Maven projects add the following to your <dependencies> (make sure to use the correct Scala version suffix
to match your project’s Scala binary version):

#### Maven Example

```scala
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-async_2.12</artifactId>
<version>0.10.0</version>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-async_2.13</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.11</version>
<scope>provided</scope>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.13.3</version>
<scope>provided</scope>
</dependency>
```

After adding scala-async to your classpath, write your first `async` block:
### Enable compiler support for `async`

Add the `-Xasync` to the Scala compiler options.

#### SBT Example
```scala
scalaOptions += "-Xasync"
```

#### Maven Example

```xml
<project>
...
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<args>
<arg>-Xasync</arg>
</args>
</configuration>
</plugin>
...
</project>
```

### Start coding

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.async.Async.{async, await}

val future = async {
val f1 = async { ...; true }
val f1: Future[Boolean] = async { ...; true }
val f2 = async { ...; 42 }
if (await(f1)) await(f2) else 0
}
Expand Down Expand Up @@ -93,6 +129,22 @@ def combined: Future[Int] = async {
}
```

## Limitations

### `await` must be directly in the control flow of the async expression

The `await` cannot be nested under a local method, object, class or lambda:

```
async {
List(1).foreach { x => await(f(x) } // invali
retronym marked this conversation as resolved.
Show resolved Hide resolved
}
```

### `await` must be not be nested within `try` / `catch` / `finally`.

This implementation restriction may be lifted in future versions.

## Comparison with direct use of `Future` API

This computation could also be expressed by directly using the
Expand All @@ -119,53 +171,3 @@ The `async` approach has two advantages over the use of
required at each generator (`<-`) in the for-comprehension.
This reduces the size of generated code, and can avoid boxing
of intermediate results.

## Comparison with CPS plugin

The existing continuations (CPS) plugin for Scala can also be used
to provide a syntactic layer like `async`. This approach has been
used in Akka's [Dataflow Concurrency](http://doc.akka.io/docs/akka/2.3-M1/scala/dataflow.html)
(now deprecated in favour of this library).

CPS-based rewriting of asynchronous code also produces a closure
for each suspension. It can also lead to type errors that are
difficult to understand.

## How it works

- The `async` macro analyses the block of code, looking for control
structures and locations of `await` calls. It then breaks the code
into 'chunks'. Each chunk contains a linear sequence of statements
that concludes with a branching decision, or with the registration
of a subsequent state handler as the continuation.
- Before this analysis and transformation, the program is normalized
into a form amenable to this manipulation. This is called the
"A Normal Form" (ANF), and roughly means that:
- `if` and `match` constructs are only used as statements;
they cannot be used as an expression.
- calls to `await` are not allowed in compound expressions.
- Identify vals, vars and defs that are accessed from multiple
states. These will be lifted out to fields in the state machine
object.
- Synthesize a class that holds:
- an integer representing the current state ID.
- the lifted definitions.
- an `apply(value: Try[Any]): Unit` method that will be
called on completion of each future. The behavior of
this method is determined by the current state. It records
the downcast result of the future in a field, and calls the
`resume()` method.
- the `resume(): Unit` method that switches on the current state
and runs the users code for one 'chunk', and either:
a) registers the state machine as the handler for the next future
b) completes the result Promise of the `async` block, if at the terminal state.
- an `apply(): Unit` method that starts the computation.

SethTisue marked this conversation as resolved.
Show resolved Hide resolved
## Limitations

- See the [neg](https://github.com/scala/async/tree/master/src/test/scala/scala/async/neg) test cases
for constructs that are not allowed in an `async` block.
- See the [issue list](https://github.com/scala/async/issues?state=open) for which of these restrictions are planned
to be dropped in the future.
- See [#32](https://github.com/scala/async/issues/32) for why `await` is not possible in closures, and for suggestions on
ways to structure the code to work around this limitation.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ ScalaModulePlugin.scalaModuleOsgiSettings
name := "scala-async"

libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided"
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value % "test" // for ToolBox
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"

ScalaModulePlugin.enableOptimizer
testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v", "-s")
scalacOptions in Test ++= Seq("-Yrangepos")
scalacOptions ++= List("-deprecation" , "-Xasync")

parallelExecution in Global := false

Expand Down
Empty file.
35 changes: 32 additions & 3 deletions src/main/scala/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
package scala.async

import scala.language.experimental.macros
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}
import scala.annotation.compileTimeOnly
import scala.reflect.macros.whitebox

/**
* Async blocks provide a direct means to work with [[scala.concurrent.Future]].
Expand Down Expand Up @@ -50,14 +51,42 @@ object Async {
* Run the block of code `body` asynchronously. `body` may contain calls to `await` when the results of
* a `Future` are needed; this is translated into non-blocking code.
*/
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] = macro internal.ScalaConcurrentAsync.asyncImpl[T]
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] = macro asyncImpl[T]

/**
* Non-blocking await the on result of `awaitable`. This may only be used directly within an enclosing `async` block.
*
* Internally, this will register the remainder of the code in enclosing `async` block as a callback
* in the `onComplete` handler of `awaitable`, and will *not* block a thread.
*/
@compileTimeOnly("`await` must be enclosed in an `async` block")
@compileTimeOnly("[async] `await` must be enclosed in an `async` block")
def await[T](awaitable: Future[T]): T = ??? // No implementation here, as calls to this are translated to `onComplete` by the macro.

def asyncImpl[T: c.WeakTypeTag](c: whitebox.Context)
retronym marked this conversation as resolved.
Show resolved Hide resolved
(body: c.Tree)
(execContext: c.Tree): c.Tree = {
import c.universe._
if (!c.compilerSettings.contains("-Xasync")) {
c.abort(c.macroApplication.pos, "The async requires the compiler option -Xasync (supported only by Scala 2.12.12+ / 2.13.3+)")
} else try {
val awaitSym = typeOf[Async.type].decl(TermName("await"))
def mark(t: DefDef): Tree = {
import language.reflectiveCalls
c.internal.asInstanceOf[{
def markForAsyncTransform(owner: Symbol, method: DefDef, awaitSymbol: Symbol, config: Map[String, AnyRef]): DefDef
retronym marked this conversation as resolved.
Show resolved Hide resolved
}].markForAsyncTransform(c.internal.enclosingOwner, t, awaitSym, Map.empty)
}
val name = TypeName("stateMachine$async")
q"""
final class $name extends _root_.scala.async.FutureStateMachine(${execContext}) {
// FSM translated method
${mark(q"""override def apply(tr$$async: _root_.scala.util.Try[_root_.scala.AnyRef]) = ${body}""")}
}
new $name().start() : ${c.macroApplication.tpe}
"""
} catch {
case e: ReflectiveOperationException =>
c.abort(c.macroApplication.pos, "-Xasync is provided as a Scala compiler option, but the async macro is unable to call c.internal.markForAsyncTransform. " + e.getClass.getName + " " + e.getMessage)
}
}
}
80 changes: 80 additions & 0 deletions src/main/scala/scala/async/FutureStateMachine.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Scala (https://www.scala-lang.org)
*
* Copyright EPFL and Lightbend, Inc.
*
* Licensed under Apache License 2.0
* (http://www.apache.org/licenses/LICENSE-2.0).
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package scala.async

import java.util.Objects

import scala.util.{Failure, Success, Try}
import scala.concurrent.{ExecutionContext, Future, Promise}

/** The base class for state machines generated by the `scala.async.Async.async` macro.
* Not intended to be directly extended in user-written code.
*/
abstract class FutureStateMachine(execContext: ExecutionContext) extends Function1[Try[AnyRef], Unit] {
Objects.requireNonNull(execContext)

type F = scala.concurrent.Future[AnyRef]
type R = scala.util.Try[AnyRef]

private[this] val result$async: Promise[AnyRef] = Promise[AnyRef]();
private[this] var state$async: Int = 0

/** Retrieve the current value of the state variable */
protected def state: Int = state$async

/** Assign `i` to the state variable */
protected def state_=(s: Int): Unit = state$async = s

/** Complete the state machine with the given failure. */
// scala-async accidentally started catching NonFatal exceptions in:
// https://github.com/scala/scala-async/commit/e3ff0382ae4e015fc69da8335450718951714982#diff-136ab0b6ecaee5d240cd109e2b17ccb2R411
// This follows the new behaviour but should we fix the regression?
protected def completeFailure(t: Throwable): Unit = {
result$async.complete(Failure(t))
}

/** Complete the state machine with the given value. */
protected def completeSuccess(value: AnyRef): Unit = {
result$async.complete(Success(value))
}

/** Register the state machine as a completion callback of the given future. */
protected def onComplete(f: F): Unit = {
f.onComplete(this)(execContext)
}

/** Extract the result of the given future if it is complete, or `null` if it is incomplete. */
protected def getCompleted(f: F): Try[AnyRef] = {
if (f.isCompleted) {
f.value.get
} else null
}

/**
* Extract the success value of the given future. If the state machine detects a failure it may
* complete the async block and return `this` as a sentinel value to indicate that the caller
* (the state machine dispatch loop) should immediately exit.
*/
protected def tryGet(tr: R): AnyRef = tr match {
case Success(value) =>
value.asInstanceOf[AnyRef]
case Failure(throwable) =>
completeFailure(throwable)
this // sentinel value to indicate the dispatch loop should exit.
}

def start[T](): Future[T] = {
// This cast is safe because we know that `def apply` does not consult its argument when `state == 0`.
Future.unit.asInstanceOf[Future[AnyRef]].onComplete(this)(execContext)
result$async.future.asInstanceOf[Future[T]]
}
}
Loading