From 3a53c7c480a0d036af4b40624dcfce99ad7fee87 Mon Sep 17 00:00:00 2001 From: anavarr Date: Thu, 6 Jul 2023 11:05:04 +0200 Subject: [PATCH] fix comments --- .../runtime/QuarkusWorkerPoolRegistry.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java index e087422999e71..c997126c5f458 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java @@ -84,8 +84,6 @@ public void run() { } }; } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { - System.err.println(e); - //quite ugly but works logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + " blocking executor will be used, please check that your JDK is compatible with " + "virtual threads"); @@ -114,8 +112,9 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered); } return executionHolder.vertx().executeBlocking(uni, ordered); - } else if (virtualThreadConcurrency.containsKey(workerName)) { - Semaphore semaphore = virtualThreadConcurrency.get(workerName); + } + Semaphore semaphore = virtualThreadConcurrency.get(workerName); + if (semaphore != null) { boolean acquired = semaphore.tryAcquire(); if (acquired) { return runOnVirtualThread(currentContext, uni, semaphore); @@ -151,6 +150,7 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN private Uni runOnVirtualThread(Context currentContext, Uni uni, Semaphore semaphore) { return uni.runSubscriptionOn(command -> VIRTUAL_EXECUTOR_SUPPLIER.get().execute(command)) + .onCancellation().invoke(semaphore::release) .onItemOrFailure().transformToUni((item, failure) -> { semaphore.release(); return Uni.createFrom().emitter(emitter -> {