Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust Async cancellation to CE 3.5 #2897

Merged
merged 2 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,22 @@ object ArmeriaCatsServerOptions {
})

private def blocking[F[_], T](body: => T)(implicit F: Async[F]): F[T] = {
F.async_ { cb =>
CommonPools
.blockingTaskExecutor()
.execute(() => {
try {
cb(Right(body))
} catch {
case NonFatal(ex) =>
cb(Left(ex))
}
})
F.async { cb =>
F.delay {
val javaFuture = CommonPools
.blockingTaskExecutor()
.submit(new Runnable {
override def run(): Unit = {
try {
cb(Right(body))
} catch {
case NonFatal(ex) =>
cb(Left(ex))
}
}
})
Some(F.void(F.delay { javaFuture.cancel(true) }))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we wait for the future, but then wait for its result, to properly implement cancellation integration?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean call the cancel but still block until the future completes? I think the finalizer should finish as soon as it finishes its logic. Take a look at this example from The Async documentation:

def fromCompletableFuture[A](fut: F[CompletableFuture[A]]): F[A] =
  flatMap(fut) { cf =>
    async[A] { cb =>
      delay {
        //Invoke the callback with the result of the completable future
        val stage = cf.handle[Unit] {
          case (a, null) => cb(Right(a))
          case (_, e) => cb(Left(e))
        }

        //Cancel the completable future if the fiber is canceled
        Some(void(delay(stage.cancel(false))))
      }
    }
  }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the idea in cats-effect is for the finalizers to block until the effect finishes (after cancellation or not). But the example indeed suggests otherwise.

... and reading https://github.com/typelevel/cats-effect/releases/tag/v3.5.0 your code is correct :)

}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package sttp.tapir.server.armeria.cats

import cats.effect.Async
import cats.effect.{Async, Sync}
import cats.effect.std.Dispatcher
import cats.syntax.all._
import com.linecorp.armeria.common.{HttpData, HttpRequest, HttpResponse}
import com.linecorp.armeria.server.ServiceRequestContext
import fs2.interop.reactivestreams._
Expand All @@ -14,7 +15,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import sttp.capabilities.fs2.Fs2Streams
import sttp.monad.MonadAsyncError
import sttp.monad.syntax._

import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.armeria._
import sttp.tapir.server.interceptor.reject.RejectInterceptor
Expand Down Expand Up @@ -91,13 +92,7 @@ private object Fs2StreamCompatible {

private class CatsFutureConversion[F[_]: Async](dispatcher: Dispatcher[F])(implicit ec: ExecutionContext) extends FutureConversion[F] {
override def from[A](f: => Future[A]): F[A] = {
Async[F].async_ { cb =>
f.onComplete {
case Failure(exception) => cb(Left(exception))
case Success(value) => cb(Right(value))
}
()
}
Async[F].fromFutureCancelable(Sync[F].delay((f, ().pure[F])))
}

override def to[A](f: => F[A]): Future[A] = dispatcher.unsafeToFuture(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ object FinatraCatsServerInterpreter {
private def convertResponder[F[_]: Async, B](original: Responder[Future, B]): Responder[F, B] =
new Responder[F, B] {
override def apply[O](request: ServerRequest, output: ValuedEndpointOutput[O]): F[ServerResponse[B]] =
original(request, output).asF
fromFuture(original(request, output))
}

private def convertInterceptor[F[_]: Async: Dispatcher: MonadError](original: Interceptor[F]): Interceptor[Future] = {
val fToFuture = new (F ~> Future) {
override def apply[A](f: F[A]): Future[A] = f.asTwitterFuture
}
val futureToF = new (Future ~> F) {
override def apply[A](future: Future[A]): F[A] = future.asF
override def apply[A](future: Future[A]): F[A] = fromFuture(future)
}

def convertRequestInterceptor(interceptor: RequestInterceptor[F]): RequestInterceptor[Future] = new RequestInterceptor[Future] {
Expand All @@ -166,12 +166,14 @@ object FinatraCatsServerInterpreter {
override def apply(request: ServerRequest, endpoints: List[ServerEndpoint[R, F]])(implicit
monad: MonadError[F]
): F[RequestResult[B]] =
original(convertEndpointInterceptor(interceptorF))(
request,
endpoints.map(convertEndpoint[F, Future, R](_, fToFuture)(monad))
)(
FutureMonadError
).asF
fromFuture(
original(convertEndpointInterceptor(interceptorF))(
request,
endpoints.map(convertEndpoint[F, Future, R](_, fToFuture)(monad))
)(
FutureMonadError
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ object FinatraCatsServerOptions extends Logging {
/** Allows customising the interceptors used by the server interpreter. */
def customiseInterceptors[F[_]: Async](dispatcher: Dispatcher[F]): CustomiseInterceptors[F, FinatraCatsServerOptions[F]] = {
def finatraCatsServerLog(finatraServerLog: DefaultServerLog[Future]): DefaultServerLog[F] = DefaultServerLog[F](
doLogWhenReceived = m => finatraServerLog.doLogWhenReceived(m).asF,
doLogWhenHandled = (m, e) => finatraServerLog.doLogWhenHandled(m, e).asF,
doLogAllDecodeFailures = (m, e) => finatraServerLog.doLogAllDecodeFailures(m, e).asF,
doLogExceptions = (m, e) => finatraServerLog.doLogExceptions(m, e).asF,
noLog = finatraServerLog.noLog.asF
doLogWhenReceived = m => fromFuture(finatraServerLog.doLogWhenReceived(m)),
doLogWhenHandled = (m, e) => fromFuture(finatraServerLog.doLogWhenHandled(m, e)),
doLogAllDecodeFailures = (m, e) => fromFuture(finatraServerLog.doLogAllDecodeFailures(m, e)),
doLogExceptions = (m, e) => fromFuture(finatraServerLog.doLogExceptions(m, e)),
noLog = fromFuture(finatraServerLog.noLog)
)

CustomiseInterceptors(
createOptions = (ci: CustomiseInterceptors[F, FinatraCatsServerOptions[F]]) =>
FinatraCatsServerOptions[F](
dispatcher,
FinatraServerOptions.defaultCreateFile(FinatraServerOptions.futurePool)(_).asF,
FinatraServerOptions.defaultDeleteFile(FinatraServerOptions.futurePool)(_).asF,
bs => fromFuture(FinatraServerOptions.defaultCreateFile(FinatraServerOptions.futurePool)(bs)),
file => fromFuture(FinatraServerOptions.defaultDeleteFile(FinatraServerOptions.futurePool)(file)),
ci.interceptors
)
).serverLog(finatraCatsServerLog(FinatraServerOptions.defaultServerLog)).rejectHandler(None)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sttp.tapir.server.finatra.cats

import cats.effect.Async
import cats.effect.{Async, Sync}
import cats.syntax.all._
import cats.effect.std.Dispatcher
import com.twitter.util.{Future, Promise}

Expand All @@ -25,9 +26,10 @@ object conversions {

/** Convert from a Twitter Future to some F with Async capabilities. Based on https://typelevel.org/cats-effect/docs/typeclasses/async
*/
private[cats] implicit class RichTwitterFuture[A](val f: Future[A]) {
def asF[F[_]: Async]: F[A] = Async[F].async_ { cb =>
f.onSuccess(f => cb(Right(f))).onFailure(e => cb(Left(e)))
private[cats] def fromFuture[F[_]: Async, A](f: => Future[A]): F[A] = Async[F].async { cb =>
Sync[F].delay {
val fut = f.onSuccess(f => cb(Right(f))).onFailure(e => cb(Left(e)))
Some(Sync[F].delay(fut.raise(new InterruptedException("Fiber canceled"))).void)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't twitter futures cancellable? (it's a distinct operation from raise)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it was called cancel() in older versions, but now it's this raise method https://github.com/twitter/util#future-interrupts

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok thanks :)

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,17 @@ object VertxCatsServerInterpreter {

implicit class VertxFutureToCatsF[A](f: => Future[A]) {
def asF[F[_]: Async]: F[A] = {
Async[F].async_ { cb =>
f.onComplete({ handler =>
if (handler.succeeded()) {
cb(Right(handler.result()))
} else {
cb(Left(handler.cause()))
}
})
()
Async[F].async { cb =>
Sync[F].delay {
f.onComplete({ handler =>
if (handler.succeeded()) {
cb(Right(handler.result()))
} else {
cb(Left(handler.cause()))
}
})
Some(().pure[F])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this fake cancellation logic - shouldn't we wait for f to complete?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to wait for f to complete then we should return None here, which basically means leaving async_ as before. The difference is that:

  • in older CE (<3.5) canceling such a fiber would not wait for the future and finish immediately (leaving a leaked future)
  • in CE 3.5 this IO is uncancellable, leading to potential deadlocks

If there's no way to cancel underlying future, we need to choose between keeping the previous behavior (canceling the outer fiber and leaving a leaked future) and agreeing to the new uncancellable behavior. I chose the first solution to keep consistent with previous behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, well I guess if we can't do anything, we can't :)

}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ class VertxTestServerInterpreter(vertx: Vertx)

object VertxTestServerInterpreter {
def vertxFutureToIo[A](future: => VFuture[A]): IO[A] =
IO.async_[A] { cb =>
future
.onFailure { cause => cb(Left(cause)) }
.onSuccess { result => cb(Right(result)) }
()
IO.async[A] { cb =>
IO {
future
.onFailure { cause => cb(Left(cause)) }
.onSuccess { result => cb(Right(result)) }
Some(IO.unit)
}
}
}