Skip to content

Commit

Permalink
Merge branch 'series/3.x' into topic/thread-local-iolocal
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge authored Nov 21, 2024
2 parents af84973 + ccae9c7 commit 1adf368
Show file tree
Hide file tree
Showing 21 changed files with 416 additions and 96 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

## Getting Started

- Wired: **3.5.5**
- Wired: **3.5.6**
- Tired: **2.5.5** (end of life)

```scala
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.5"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.6"
```

The above represents the core, stable dependency which brings in the entirety of Cats Effect. This is *most likely* what you want. All current Cats Effect releases are published for Scala 2.12, 2.13, 3.2, and Scala.js 1.13.
Expand All @@ -30,22 +30,22 @@ Depending on your use-case, you may want to consider one of the several other mo

```scala
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect-kernel" % "3.5.5",
"org.typelevel" %% "cats-effect-laws" % "3.5.5" % Test)
"org.typelevel" %% "cats-effect-kernel" % "3.5.6",
"org.typelevel" %% "cats-effect-laws" % "3.5.6" % Test)
```

If you're a middleware framework (like [Fs2](https://fs2.io/)), you probably want to depend on **std**, which gives you access to `Queue`, `Semaphore`, and much more without introducing a hard-dependency on `IO` outside of your tests:

```scala
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect-std" % "3.5.5",
"org.typelevel" %% "cats-effect" % "3.5.5" % Test)
"org.typelevel" %% "cats-effect-std" % "3.5.6",
"org.typelevel" %% "cats-effect" % "3.5.6" % Test)
```

You may also find some utility in the **testkit** and **kernel-testkit** projects, which contain `TestContext`, generators for `IO`, and a few other things:

```scala
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.5" % Test
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.6" % Test
```

Cats Effect provides backward binary compatibility within the 2.x and 3.x version lines, and both forward and backward compatibility within any major/minor line. This is analogous to the versioning scheme used by Cats itself, as well as other major projects such as Scala.js. Thus, any project depending upon Cats Effect 2.2.1 can be used with libraries compiled against Cats Effect 2.0.0 or 2.2.3, but *not* with libraries compiled against 2.3.0 or higher.
Expand Down
84 changes: 50 additions & 34 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import cats.data.Ior
import cats.effect.instances.spawn
import cats.effect.kernel.CancelScope
import cats.effect.kernel.GenTemporal.handleDuration
import cats.effect.std.{Backpressure, Console, Env, Supervisor, UUIDGen}
import cats.effect.std.{Backpressure, Console, Env, Supervisor, SystemProperties, UUIDGen}
import cats.effect.tracing.{Tracing, TracingEvent}
import cats.effect.unsafe.IORuntime
import cats.syntax._
Expand Down Expand Up @@ -786,17 +786,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
andWait(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise raises a `TimeoutException`.
* Returns an IO that either completes with the result of the source or otherwise raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
*
* @see
* [[timeoutAndForget]] for a variant which does not wait for cancelation of the source
* effect to complete.
*/
def timeout[A2 >: A](duration: Duration): IO[A2] =
handleDuration(duration, this) { finiteDuration =>
Expand All @@ -809,26 +819,35 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
timeout(duration: Duration)

/**
* Returns an IO that either completes with the result of the source within the specified time
* `duration` or otherwise evaluates the `fallback`.
* Returns an IO that either completes with the result of the source or otherwise evaluates
* the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* is the time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* is the time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* is the task evaluated after the duration has passed and the source canceled
*/
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
handleDuration[IO[A2]](duration, this) { finiteDuration =>
race(IO.sleep(finiteDuration)).flatMap {
case Right(_) => fallback
case Left(value) => IO.pure(value)
IO.uncancelable { poll =>
poll(racePair(IO.sleep(finiteDuration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) }
}
}
}
}
Expand Down Expand Up @@ -1098,21 +1117,16 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
val fiber = new IOFiber[A](
if (IOFiberConstants.ioLocalPropagation) IOLocal.getThreadLocalState()
else IOLocalState.empty,
oc =>
{ oc =>
if (registerCallback) {
runtime.fiberErrorCbs.remove(failure)
}
oc.fold(
{
runtime.fiberErrorCbs.remove(failure)
canceled
},
{ t =>
runtime.fiberErrorCbs.remove(failure)
failure(t)
},
{ ioa =>
runtime.fiberErrorCbs.remove(failure)
success(ioa.asInstanceOf[IO.Pure[A]].value)
}
),
canceled,
failure,
{ ioa => success(ioa.asInstanceOf[IO.Pure[A]].value) }
)
},
this,
runtime.compute,
runtime
Expand Down Expand Up @@ -2164,6 +2178,8 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TuplePara

implicit val envForIO: Env[IO] = Env.make

implicit val systemPropertiesForIO: SystemProperties[IO] = SystemProperties.make

// This is cached as a val to save allocations, but it uses ops from the Async
// instance which is also cached as a val, and therefore needs to appear
// later in the file
Expand Down
2 changes: 1 addition & 1 deletion docs/core/native-image.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ThisBuild / scalaVersion := "2.13.8"

lazy val root = (project in file(".")).enablePlugins(NativeImagePlugin).settings(
name := "cats-effect-3-hello-world",
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.5",
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.6",
Compile / mainClass := Some("com.example.Main"),
nativeImageOptions += "--no-fallback",
nativeImageVersion := "22.1.0" // It should be at least version 21.0.0
Expand Down
2 changes: 1 addition & 1 deletion docs/core/scala-native.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ lazy val root = project.in(file("."))
.enablePlugins(ScalaNativePlugin)
.settings(
name := "cats-effect-3-hello-world",
libraryDependencies += "org.typelevel" %%% "cats-effect" % "3.5.5",
libraryDependencies += "org.typelevel" %%% "cats-effect" % "3.5.6",
Compile / mainClass := Some("com.example.Main")
)

Expand Down
2 changes: 1 addition & 1 deletion docs/core/test-runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ For those migrating code from Cats Effect 2, `TestControl` is a considerably mor
In order to use `TestControl`, you will need to bring in the **cats-effect-testkit** dependency:

```scala
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.5" % Test
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.5.6" % Test
```

## Example
Expand Down
2 changes: 1 addition & 1 deletion docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ title: FAQ

```scala-cli
//> using scala "2.13.8"
//> using dep "org.typelevel::cats-effect::3.5.5"
//> using dep "org.typelevel::cats-effect::3.5.6"
import cats.effect._
Expand Down
4 changes: 2 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ title: Getting Started
Add the following to your **build.sbt**:

```scala
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.5"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.6"
```

Naturally, if you're using ScalaJS, you should replace the double `%%` with a triple `%%%`. If you're on Scala 2, it is *highly* recommended that you enable the [better-monadic-for](https://github.com/oleg-py/better-monadic-for) plugin, which fixes a number of surprising elements of the `for`-comprehension syntax in the Scala language:
Expand Down Expand Up @@ -68,7 +68,7 @@ We will learn more about constructs like `start` and `*>` in later pages, but fo
Of course, the easiest way to play with Cats Effect is to try it out in a Scala REPL. We recommend using [Ammonite](https://ammonite.io/#Ammonite-REPL) for this kind of thing. To get started, run the following lines (if not using Ammonite, skip the first line and make sure that Cats Effect and its dependencies are correctly configured on the classpath):

```scala
import $ivy.`org.typelevel::cats-effect:3.5.5`
import $ivy.`org.typelevel::cats-effect:3.5.6`

import cats.effect.unsafe.implicits._
import cats.effect.IO
Expand Down
8 changes: 4 additions & 4 deletions docs/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ Cats Effect 3 splits the code dependency into multiple modules. If you were prev
The current non-test modules are:

```scala
"org.typelevel" %% "cats-effect-kernel" % "3.5.5",
"org.typelevel" %% "cats-effect-std" % "3.5.5",
"org.typelevel" %% "cats-effect" % "3.5.5",
"org.typelevel" %% "cats-effect-kernel" % "3.5.6",
"org.typelevel" %% "cats-effect-std" % "3.5.6",
"org.typelevel" %% "cats-effect" % "3.5.6",
```

- `kernel` - type class definitions, simple concurrency primitives
Expand All @@ -96,7 +96,7 @@ The current non-test modules are:
libraryDependencies ++= Seq(
//...
- "org.typelevel" %% "cats-effect" % "2.4.0",
+ "org.typelevel" %% "cats-effect" % "3.5.5",
+ "org.typelevel" %% "cats-effect" % "3.5.6",
//...
)
```
Expand Down
2 changes: 1 addition & 1 deletion docs/std/mapref.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ as long as their keys belong to different shards.
This is probably one of the most common uses of this datatype.

```scala mdoc:reset:silent
//> using lib "org.typelevel::cats-effect::3.5.5"
//> using lib "org.typelevel::cats-effect::3.5.6"

import cats.effect.IO
import cats.effect.std.MapRef
Expand Down
2 changes: 1 addition & 1 deletion docs/std/ref.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ This is probably one of the most common uses of this concurrency primitive.
In this example, the workers will concurrently run and update the value of the `Ref`.

```scala mdoc:reset:silent
//> using lib "org.typelevel::cats-effect::3.5.5"
//> using lib "org.typelevel::cats-effect::3.5.6"

import cats.effect.{IO, IOApp, Sync}
import cats.effect.kernel.Ref
Expand Down
4 changes: 2 additions & 2 deletions docs/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ running the code snippets in this tutorial, it is recommended to use the same
```scala
name := "cats-effect-tutorial"

version := "3.5.5"
version := "3.5.6"

scalaVersion := "2.13.13"

libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.5" withSources() withJavadoc()
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.6" withSources() withJavadoc()

scalacOptions ++= Seq(
"-feature",
Expand Down
2 changes: 1 addition & 1 deletion docs/typeclasses/spawn.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ In English, the semantics of this are as follows:

- If the child fiber completed successfully, produce its result
- If it errored, re-raise the error within the current fiber
- If it canceled, attempt to self-cancel, and if the self-cancelation fails, **deadlock**
- If it canceled, the caller is indefinitely suspended without termination (a.k.a. **deadlock**)

Sometimes this is an appropriate semantic, and the cautiously-verbose `joinWithNever` function implements it for you. It is worth noting that this semantic was the *default* in Cats Effect 2 (and in fact, no other semantic was possible).

Expand Down
70 changes: 48 additions & 22 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
productL(fa)(sleep(time))

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise evaluates the `fallback`.
* Returns an effect that either completes with the result of the source or otherwise
* evaluates the `fallback`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
* effect will wait for it to complete before evaluating the fallback.
* The source is raised against the timeout `duration`, and its cancelation is triggered if
* the source doesn't complete within the specified time. The resulting effect will always
* wait for the source effect to complete (and to complete its finalizers), and will return
* the source's outcome over sequencing the `fallback`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over sequencing the `fallback`.
*
* If the source in uncancelable, `fallback` will never be evaluated.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, the `fallback` gets evaluated
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, the `fallback` gets evaluated
*
* @param fallback
* The task evaluated after the duration has passed and the source canceled
Expand All @@ -91,33 +97,53 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
handleDuration(duration, fa)(timeoutTo(fa, _, fallback))

protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => fallback
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) }
}
}

/**
* Returns an effect that either completes with the result of the source within the specified
* time `duration` or otherwise raises a `TimeoutException`.
* Returns an effect that either completes with the result of the source or raises a
* `TimeoutException`.
*
* The source is canceled in the event that it takes longer than the specified time duration
* to complete. Once the source has been successfully canceled (and has completed its
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
* resulting effect will wait for it to complete before raising the exception.
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
* source doesn't complete within the specified time. The resulting effect will always wait
* for the source effect to complete (and to complete its finalizers), and will return the
* source's outcome over raising a `TimeoutException`.
*
* In case source and timeout complete simultaneously, the result of the source will be
* returned over raising a `TimeoutException`.
*
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
*
* @param duration
* The time span for which we wait for the source to complete; in the event that the
* specified time has passed without the source completing, a `TimeoutException` is raised
* The time span for which we wait for the source to complete before triggering its
* cancelation; in the event that the specified time has passed without the source
* completing, a `TimeoutException` is raised
* @see
* [[timeoutAndForget[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeoutAndForget]]
* for a variant which does not wait for cancelation of the source effect to complete.
*/
def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = {
handleDuration(duration, fa)(timeout(fa, _))
}

protected def timeout[A](fa: F[A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< E): F[A] = {
flatMap(race(fa, sleep(duration))) {
case Left(a) => pure(a)
case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString())))
uncancelable { poll =>
implicit val F: GenTemporal[F, E] = this

poll(racePair(fa, sleep(duration))) flatMap {
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
case Right((f, _)) =>
f.cancel *> f.join.flatMap { oc =>
oc.embed(raiseError[A](ev(new TimeoutException(duration.toString()))))
}
}
}
}

Expand Down
Loading

0 comments on commit 1adf368

Please sign in to comment.