From f8f6395433714c8cdd1d425692dc177ae62dc053 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Fri, 5 Jul 2024 14:25:27 +0300 Subject: [PATCH] Cancel suspend functions in Quarkus REST when the connection is closed Closes: #41705 --- .../kotlin/CoroutineInvocationHandler.kt | 59 ++++++++++++++----- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt b/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt index 61027d0a212353..fa7fc5a8ec64a3 100644 --- a/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt +++ b/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt @@ -1,6 +1,7 @@ package org.jboss.resteasy.reactive.server.runtime.kotlin import io.vertx.core.Vertx +import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch @@ -35,24 +36,50 @@ class CoroutineInvocationHandler( logger.trace("Handling request with dispatcher {}", dispatcher) requestContext.suspend() - coroutineScope.launch(context = dispatcher) { - // ensure the proper CL is not lost in dev-mode - Thread.currentThread().contextClassLoader = originalTCCL - try { - val result = - invoker.invokeCoroutine( - requestContext.endpointInstance, - requestContext.parameters - ) - if (result != Unit) { - requestContext.result = result + val done = AtomicBoolean() + var canceled = AtomicBoolean() + + val job = + coroutineScope.launch(context = dispatcher) { + // ensure the proper CL is not lost in dev-mode + Thread.currentThread().contextClassLoader = originalTCCL + try { + val result = + invoker.invokeCoroutine( + requestContext.endpointInstance, + requestContext.parameters + ) + done.set(true) + if (result != Unit) { + requestContext.result = result + } + requestContext.resume() + } catch (t: Throwable) { + done.set(true) + + if (canceled.get()) { + try { + // get rid of everything related to the request since the connection has + // already gone away + requestContext.close() + } catch (ignored: Exception) {} + } else { + // passing true since the target doesn't change and we want response filters + // to + // be able to know what the resource method was + requestContext.handleException(t, true) + requestContext.resume() + } } - } catch (t: Throwable) { - // passing true since the target doesn't change and we want response filters to be - // able to know what the resource method was - requestContext.handleException(t, true) } - requestContext.resume() + + requestContext.serverResponse().addCloseHandler { + if (!done.get()) { + try { + canceled.set(true) + job.cancel(null) + } catch (ignored: Exception) {} + } } } }