From 8da31fdc2688dd4e54709e04afcc32a5648889aa Mon Sep 17 00:00:00 2001 From: arnavarr Date: Tue, 28 Sep 2021 18:08:45 +0200 Subject: [PATCH] Add virtual-threads support in Quarkus - when an endpoint has the annotation @RunOnVirtualThread its blockingHandler will use a special Excutor spawing a new virtual threads for every request - added isRunOnVirtualThread method to check that @RunOnVirtualThread is possible (must be on a @Blocking endpoint, the user is warned if the jdk they use to compile is not loom-compliant) - an endpoint with the @Transactional annotation will override the @RunOnVirtualThread annotation -> arbitrary choice of caution (@Transactional might use thread locals for instance ?) - the loom-related pieces are called by reflective calls to avoid dependencies on java 19 (if the runtime is not compliant, the endpoint falls back to traditional workers) Add a new extension : netty-loom-adaptor that performs bytecode manipulation on netty's PooledByteBufAllocator to use a concurrentHashMap instead of threadLocals - when creating a DirectBuffer, we check if the current Thread is virtual or not - if it is, we get its carrier to check if the threadCaches hashMap has the carrier's name as a key - if so, we return the PoolThreadCache associated to the carrier - else, we create a new PoolThreadCache (wip) and return it - after that, we will use this cache instead of the PoolThreadLocalCache - added static fields to cache isVirtual() method (instead of using reflection to get it for every request) - added static fields to cache currentCarrierThread method (same goal as for isVirtualMethod field) - tested quarkus-loom with the application --> from 20 000 reqs during benchmark to ~ 160 000 requests during benchmark (9.5% quarkus perf to ~71% quarkus perf) - netty-loom-adaptor extension will do its magic only if @RunOnVirtualThread annotations are present in the combinedIndexBuildItem, else it won't do anything When using quarkus dev services with loom code, the user must specify -Dopen-lang-package. If they don't the reflective operations will throw exceptions --- .../main/java/io/quarkus/maven/DevMojo.java | 7 + .../netty-loom-adaptor/deployment/pom.xml | 52 ++ .../adaptor/NettyLoomAdaptorProcessor.java | 763 ++++++++++++++++++ extensions/netty-loom-adaptor/pom.xml | 25 + extensions/netty-loom-adaptor/runtime/pom.xml | 61 ++ extensions/pom.xml | 3 + .../vertx/web/deployment/DotNames.java | 2 + .../runtime/ResteasyReactiveRecorder.java | 56 ++ .../common/processor/BlockingDefault.java | 3 +- .../common/processor/EndpointIndexer.java | 69 +- .../processor/ResteasyReactiveDotNames.java | 2 + .../reactive/common/model/ResourceMethod.java | 11 + .../ResteasyReactiveDeploymentManager.java | 9 +- .../startup/RuntimeDeploymentManager.java | 4 + .../startup/RuntimeMappingDeployment.java | 4 +- .../startup/RuntimeResourceDeployment.java | 22 +- .../VirtualThreadNonBlockingHandler.java | 37 + .../server/mapping/RuntimeResource.java | 9 +- .../framework/ResteasyReactiveUnitTest.java | 15 + 19 files changed, 1144 insertions(+), 10 deletions(-) create mode 100644 extensions/netty-loom-adaptor/deployment/pom.xml create mode 100644 extensions/netty-loom-adaptor/deployment/src/main/java/io/quarkus/netty/loom/adaptor/NettyLoomAdaptorProcessor.java create mode 100644 extensions/netty-loom-adaptor/pom.xml create mode 100644 extensions/netty-loom-adaptor/runtime/pom.xml create mode 100644 independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/VirtualThreadNonBlockingHandler.java diff --git a/devtools/maven/src/main/java/io/quarkus/maven/DevMojo.java b/devtools/maven/src/main/java/io/quarkus/maven/DevMojo.java index 42c219939298a..db0e5d6ffda36 100644 --- a/devtools/maven/src/main/java/io/quarkus/maven/DevMojo.java +++ b/devtools/maven/src/main/java/io/quarkus/maven/DevMojo.java @@ -212,6 +212,8 @@ public class DevMojo extends AbstractMojo { @Parameter(defaultValue = "${debug}") private String debug; + @Parameter(defaultValue = "${open-lang-package}") + private boolean openJavaLang; /** * Whether or not the JVM launch, in debug mode, should be suspended. This parameter is only * relevant when the JVM is launched in {@link #debug debug mode}. This parameter supports the @@ -956,6 +958,11 @@ private QuarkusDevModeLauncher newLauncher() throws Exception { builder.jvmArgs("-Dio.quarkus.force-color-support=true"); } + if (openJavaLang) { + builder.jvmArgs("--add-opens"); + builder.jvmArgs("java.base/java.lang=ALL-UNNAMED"); + } + builder.projectDir(project.getFile().getParentFile()); builder.buildSystemProperties((Map) project.getProperties()); diff --git a/extensions/netty-loom-adaptor/deployment/pom.xml b/extensions/netty-loom-adaptor/deployment/pom.xml new file mode 100644 index 0000000000000..de1efc9e7fd96 --- /dev/null +++ b/extensions/netty-loom-adaptor/deployment/pom.xml @@ -0,0 +1,52 @@ + + + 4.0.0 + + + io.quarkus + quarkus-netty-loom-adaptor-parent + 999-SNAPSHOT + ../pom.xml + + + quarkus-netty-loom-adaptor-deployment + Quarkus - Netty Loom Adaptor - Deployment + + + + io.quarkus + quarkus-arc-deployment + + + io.quarkus + quarkus-netty-loom-adaptor + 999-SNAPSHOT + + + io.quarkus + quarkus-netty-deployment + + + io.quarkus + quarkus-core-deployment + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/netty-loom-adaptor/deployment/src/main/java/io/quarkus/netty/loom/adaptor/NettyLoomAdaptorProcessor.java b/extensions/netty-loom-adaptor/deployment/src/main/java/io/quarkus/netty/loom/adaptor/NettyLoomAdaptorProcessor.java new file mode 100644 index 0000000000000..8a4aca522db0b --- /dev/null +++ b/extensions/netty-loom-adaptor/deployment/src/main/java/io/quarkus/netty/loom/adaptor/NettyLoomAdaptorProcessor.java @@ -0,0 +1,763 @@ +package io.quarkus.netty.loom.adaptor; + +import static org.objectweb.asm.Opcodes.AALOAD; +import static org.objectweb.asm.Opcodes.ACC_PRIVATE; +import static org.objectweb.asm.Opcodes.ACC_STATIC; +import static org.objectweb.asm.Opcodes.ACONST_NULL; +import static org.objectweb.asm.Opcodes.ALOAD; +import static org.objectweb.asm.Opcodes.ANEWARRAY; +import static org.objectweb.asm.Opcodes.ARETURN; +import static org.objectweb.asm.Opcodes.ARRAYLENGTH; +import static org.objectweb.asm.Opcodes.ASM9; +import static org.objectweb.asm.Opcodes.ASTORE; +import static org.objectweb.asm.Opcodes.CHECKCAST; +import static org.objectweb.asm.Opcodes.DUP; +import static org.objectweb.asm.Opcodes.GETFIELD; +import static org.objectweb.asm.Opcodes.GETSTATIC; +import static org.objectweb.asm.Opcodes.GOTO; +import static org.objectweb.asm.Opcodes.ICONST_0; +import static org.objectweb.asm.Opcodes.ICONST_1; +import static org.objectweb.asm.Opcodes.IFEQ; +import static org.objectweb.asm.Opcodes.IFLE; +import static org.objectweb.asm.Opcodes.IFNE; +import static org.objectweb.asm.Opcodes.IFNONNULL; +import static org.objectweb.asm.Opcodes.IFNULL; +import static org.objectweb.asm.Opcodes.IF_ICMPGE; +import static org.objectweb.asm.Opcodes.ILOAD; +import static org.objectweb.asm.Opcodes.INVOKEINTERFACE; +import static org.objectweb.asm.Opcodes.INVOKESPECIAL; +import static org.objectweb.asm.Opcodes.INVOKESTATIC; +import static org.objectweb.asm.Opcodes.INVOKEVIRTUAL; +import static org.objectweb.asm.Opcodes.ISTORE; +import static org.objectweb.asm.Opcodes.LCMP; +import static org.objectweb.asm.Opcodes.LCONST_0; +import static org.objectweb.asm.Opcodes.NEW; +import static org.objectweb.asm.Opcodes.POP; +import static org.objectweb.asm.Opcodes.PUTSTATIC; +import static org.objectweb.asm.Opcodes.RETURN; + +import java.io.IOException; +import java.util.function.BiFunction; + +import org.jboss.jandex.DotName; +import org.jboss.logging.Logger; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.Label; +import org.objectweb.asm.MethodVisitor; + +import io.quarkus.builder.item.EmptyBuildItem; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.Produce; +import io.quarkus.deployment.builditem.BytecodeTransformerBuildItem; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.gizmo.Gizmo; +import io.quarkus.netty.deployment.MinNettyAllocatorMaxOrderBuildItem; +import io.smallrye.common.annotation.RunOnVirtualThread; + +public class NettyLoomAdaptorProcessor { + static Logger LOG = Logger.getLogger(NettyLoomAdaptorProcessor.class); + + @BuildStep + public FeatureBuildItem feature() { + return new FeatureBuildItem("netty-Loom-adaptor"); + } + + /** + * This extension is designed to stop using Netty's {@link io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache + * PoolThreadLocalCache}, extending {@link io.netty.util.concurrent.FastThreadLocal FastThreadLocal} in the + * {@link io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int, int)} newDirectBuffer(int,int)} method and to replace + * them with a {@link java.util.concurrent.ConcurrentHashMap ConcurrentHashMap} using the carrier thread's name as a key. + * + * we want to instrument the source ({@link io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int, int)} + * newDirectBuffer(int,int)} to get: + * + * protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + * boolean isVirtual = false; + * PoolThreadCache cache=null; + * if(canUseVirtual){ + * try { + * isVirtual = (boolean) isVirtualMethod.invoke(Thread.currentThread()); + * } catch (IllegalAccessException | InvocationTargetException e) { + * System.err.println(e); + * } + * if(isVirtual){ + * cache = createCache(initialCapacity, maxCapacity); + * } + * } + * if(cache == null){ + * cache = threadCache.get(); + * } + * PoolArena directArena = cache.directArena; + * + * final ByteBuf buf; + * if (directArena != null) { + * buf = directArena.allocate(cache, initialCapacity, maxCapacity); + * } else { + * buf = PlatformDependent.hasUnsafe() ? + * UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : + * new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); + * } + * + * return toLeakAwareBuffer(buf); + * } + * + * private PoolThreadCache createCache(int initialCapacity, int maxCapacity){ + * PoolThreadCache cache; + * Thread currentCarrierThread; + * try { + * currentCarrierThread = (Thread) getCurrentCarrierMethod.invoke(null); + * } catch (InvocationTargetException | IllegalAccessException e) { + * System.out.println(e); + * return null; + * } + * if(threadCaches.containsKey(currentCarrierThread)){ + * return threadCaches.get(currentCarrierThread); + * }else{ + * PoolArena heapArena = leastUsedArena(heapArenas); + * PoolArena directArena = leastUsedArena(directArenas); + * + * cache = new PoolThreadCache( + * heapArena, directArena, smallCacheSize, normalCacheSize, + * DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + * threadCaches.put(currentCarrierThread, cache); + * if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) { + * EventExecutor executor = ThreadExecutorMap.currentExecutor(); + * if (executor != null) { + * executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, + * DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + * } + * } + * } + * return cache; + * } + * + */ + @Produce(EmptyBuildItem.class) + @Consume(MinNettyAllocatorMaxOrderBuildItem.class) + @BuildStep + void adaptNetty(CombinedIndexBuildItem combinedIndexBuildItem, BuildProducer producer) + throws IOException { + var runOnVirtualThreadAnnotations = combinedIndexBuildItem.getComputingIndex() + .getAnnotations(DotName.createSimple(RunOnVirtualThread.class.getName())).size(); + if (runOnVirtualThreadAnnotations == 0) { + return; + } + var klass = "io.netty.buffer.PooledByteBufAllocator"; + + producer.produce(new BytecodeTransformerBuildItem(klass, new BiFunction() { + @Override + public ClassVisitor apply(String cls, ClassVisitor classVisitor) { + return new NettyCurrentAdaptor(ASM9, classVisitor); + } + + })); + } + + private class NettyCurrentAdaptor extends ClassVisitor { + public NettyCurrentAdaptor(int version, ClassVisitor cv) { + super(version, cv); + LOG.info("Adapting Netty for Loom..."); + } + + @Override + public MethodVisitor visitMethod( + final int access, + final String name, + final String descriptor, + final String signature, + final String[] exceptions) { + if (cv != null) { + MethodVisitor mv = cv.visitMethod(access, name, descriptor, signature, exceptions); + if (name.equals("")) { + // we need to augment the method to assigned the different static fields we added to the + // {@link io.netty.buffer.PooledByteBufAllocator PooledByteBufAllocator} class + mv = new MethodVisitor(Gizmo.ASM_API_VERSION, mv) { + @Override + public void visitInsn(int opcode) { + if (opcode == RETURN) { + Label L0 = new Label(); + Label L1 = new Label(); + Label L2 = new Label(); + + Label LthreadCaches = new Label(); + Label lcanUseVirtual = new Label(); + + // set canUseVirtual to true + mv.visitLabel(lcanUseVirtual); + mv.visitInsn(ICONST_1); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Boolean", "valueOf", + "(Z)Ljava/lang/Boolean;", false); + mv.visitFieldInsn(PUTSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "canUseVirtual", "Ljava/lang/Boolean;"); + + // fetch the currentCarrierThread method and put it inside the getCurrentCarrierMethod field + // to avoid having to fetch it everytime we need to invoke it + mv.visitLabel(L0); + mv.visitLdcInsn("java.lang.Thread"); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Class", "forName", + "(Ljava/lang/String;)Ljava/lang/Class;", false); + mv.visitLdcInsn("currentCarrierThread"); + mv.visitInsn(ICONST_0); + mv.visitTypeInsn(ANEWARRAY, "java/lang/Class"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Class", "getDeclaredMethod", + "(Ljava/lang/String;[Ljava/lang/Class;)Ljava/lang/reflect/Method;", + false); + mv.visitFieldInsn(PUTSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "getCurrentCarrierMethod", "Ljava/lang/reflect/Method;"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "getCurrentCarrierMethod", "Ljava/lang/reflect/Method;"); + mv.visitInsn(ICONST_1); + // make it accessible + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/reflect/Method", "setAccessible", + "(Z)V", false); + + // fetch the isVirtual method and put it inside the isVirtualMethod field to avoid + // having to fetch it everytime we need to invoke it + mv.visitLdcInsn("java.lang.Thread"); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Class", "forName", + "(Ljava/lang/String;)Ljava/lang/Class;", false); + mv.visitLdcInsn("isVirtual"); + mv.visitInsn(ICONST_0); + mv.visitTypeInsn(ANEWARRAY, "java/lang/Class"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Class", "getDeclaredMethod", + "(Ljava/lang/String;[Ljava/lang/Class;)Ljava/lang/reflect/Method;", + false); + mv.visitFieldInsn(PUTSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "isVirtualMethod", "Ljava/lang/reflect/Method;"); + + mv.visitLabel(L1); + mv.visitJumpInsn(GOTO, LthreadCaches); + + // catch block of reflective calls to fetch isVirtual and currentCarrierThread. + // we set the canUseVirtual field to false if we enter the catch block: + // if these methods can't be found the jdk is not quarkus-loom compliant + mv.visitLabel(L2); + mv.visitVarInsn(ASTORE, 0); + mv.visitInsn(ICONST_0); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Boolean", "valueOf", + "(Z)Ljava/lang/Boolean;", false); + mv.visitFieldInsn(PUTSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "canUseVirtual", "Ljava/lang/Boolean;"); + + // create the static concurrentHashMap that will be populated + mv.visitLabel(LthreadCaches); + mv.visitTypeInsn(NEW, "java/util/concurrent/ConcurrentHashMap"); + mv.visitInsn(DUP); + mv.visitMethodInsn(INVOKESPECIAL, "java/util/concurrent/ConcurrentHashMap", + "", "()V", false); + mv.visitFieldInsn(PUTSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "threadCaches", "Ljava/util/concurrent/ConcurrentHashMap;"); + + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/NoSuchMethodException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/ClassNotFoundException"); + } + super.visitInsn(opcode); + } + + }; + mv.visitMaxs(3, 3); + return mv; + } + if (name.equals("newDirectBuffer")) { + // this is the actual method we want to modify + mv = new CurrentThreadMethodAdaptor(Gizmo.ASM_API_VERSION, mv); + mv.visitMaxs(4, 4); + return mv; + } + return mv; + } + return null; + } + + /** + * this method contains logic that was previously in + * {@link io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int, int)} newDirectBuffer(int, int) + * The FastThreadLocals are used to store thread cache, they are hence created with an initial value that needs a + * {@link io.netty.buffer.PoolArena}, this is + */ + public void createLeastUsedArenaMethod() { + var L0 = new Label(); + var L1 = new Label(); + var L2 = new Label(); + var L3 = new Label(); + var L4 = new Label(); + var L5 = new Label(); + var L6 = new Label(); + var L7 = new Label(); + var L8 = new Label(); + var L9 = new Label(); + var L10 = new Label(); + var mv = cv.visitMethod(ACC_PRIVATE, "leastUsedArena", + "([Lio/netty/buffer/PoolArena;)Lio/netty/buffer/PoolArena;", null, null); + mv.visitLabel(L0); + mv.visitVarInsn(ALOAD, 1); + mv.visitJumpInsn(IFNULL, L1); + mv.visitVarInsn(ALOAD, 1); + mv.visitInsn(ARRAYLENGTH); + mv.visitJumpInsn(IFNE, L2); + + mv.visitLabel(L1); + mv.visitInsn(ACONST_NULL); + mv.visitInsn(ARETURN); + + mv.visitLabel(L2); + mv.visitVarInsn(ALOAD, 1); + mv.visitInsn(ICONST_0); + mv.visitInsn(AALOAD); + mv.visitVarInsn(ASTORE, 2); + + mv.visitLabel(L3); + mv.visitInsn(ICONST_1); + mv.visitVarInsn(ISTORE, 3); + + mv.visitLabel(L4); + mv.visitVarInsn(ILOAD, 3); + mv.visitVarInsn(ALOAD, 1); + mv.visitInsn(ARRAYLENGTH); + mv.visitJumpInsn(IF_ICMPGE, L5); + + mv.visitLabel(L5); + mv.visitVarInsn(ALOAD, 1); + mv.visitVarInsn(ILOAD, 3); + mv.visitInsn(AALOAD); + mv.visitVarInsn(ASTORE, 4); + + mv.visitLabel(L7); + mv.visitVarInsn(ALOAD, 4); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PoolArena", "numThreadCaches", + "Ljava/util/concurrent/atomic/AtomicInteger;"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/concurrent/atomic/AtomicInteger", "get", + "()I", false); + mv.visitVarInsn(ALOAD, 2); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PoolArena", "numThreadCaches", + "Ljava/util/concurrent/atomic/AtomicInteger;"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/concurrent/atomic/AtomicInteger", "get", + "()I", false); + mv.visitJumpInsn(IF_ICMPGE, L8); + + mv.visitLabel(L9); + mv.visitVarInsn(ALOAD, 4); + mv.visitVarInsn(ASTORE, 2); + + mv.visitLabel(L8); + mv.visitIincInsn(3, 1); + mv.visitJumpInsn(GOTO, L4); + + mv.visitLabel(L5); + mv.visitVarInsn(ALOAD, 2); + mv.visitInsn(ARETURN); + + mv.visitLabel(L10); + mv.visitLocalVariable("arena", "Lio/netty/buffer/PoolArena;", + "Lio/netty/buffer/PoolArena;", L7, L8, 4); + mv.visitLocalVariable("i", "I", null, L4, L5, 3); + mv.visitLocalVariable("this", "Lio/netty/buffer/PooledByteBufAllocator;", + null, L0, L10, 0); + mv.visitLocalVariable("arenas", "[Lio/netty/buffer/PoolArena;", + "[Lio/netty/buffer/PoolArena;", L0, L10, 1); + mv.visitLocalVariable("minArena", "Lio/netty/buffer/PoolArena;", + "Lio/netty/buffer/PoolArena;", L3, L10, 2); + mv.visitMaxs(2, 5); + } + + /** + * this method contains logic that was previously in + * {@link io.netty.buffer.PooledByteBufAllocator#newDirectBuffer(int, int)} newDirectBuffer(int, int) + * it was a method of {@link io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache PoolThreadLocalCache}, + * we need to reimplement it outside of this subclass that we don't use anymore + */ + public void createCacheMethod() { + Label L0 = new Label(); + Label L1 = new Label(); + Label L2 = new Label(); + Label LError = new Label(); + Label LStart = new Label(); + Label LEnd = new Label(); + Label testHashMap = new Label(); + Label LKeyIn = new Label(); + Label LKeyOut = new Label(); + Label L14 = new Label(); + //needs to be private + var mv = cv.visitMethod(ACC_PRIVATE, "createCache", "(II)Lio/netty/buffer/PoolThreadCache;", null, null); + mv.visitLabel(LStart); + //set currentCarrier to the currentThread + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Thread", "currentThread", + "()Ljava/lang/Thread;", false); + mv.visitTypeInsn(CHECKCAST, "java/lang/Thread"); + mv.visitVarInsn(ASTORE, 5); + + //we try to access the currentCarrierThread method + mv.visitLabel(L0); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "getCurrentCarrierMethod", "Ljava/lang/reflect/Method;"); + //we store the result in method + mv.visitTypeInsn(CHECKCAST, "java/lang/reflect/Method"); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Thread", "currentThread", + "()Ljava/lang/Thread;", false); + mv.visitInsn(ICONST_0); + mv.visitTypeInsn(ANEWARRAY, "java/lang/Object"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/reflect/Method", "invoke", + "(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;", false); + mv.visitTypeInsn(CHECKCAST, "java/lang/Thread"); + mv.visitVarInsn(ASTORE, 5); + + //we finished to try to access currentCarrierThread and it went fine, we jump to the next thing to do + mv.visitLabel(L1); + mv.visitJumpInsn(GOTO, testHashMap); + + //to handle the exception we merely store it in 7 + mv.visitLabel(L2); + mv.visitLabel(LError); + mv.visitVarInsn(ASTORE, 6); + mv.visitInsn(ACONST_NULL); + mv.visitInsn(ARETURN); + + //we try to access the currentHashmap + mv.visitLabel(testHashMap); + mv.visitInsn(ACONST_NULL); + mv.visitTypeInsn(CHECKCAST, "io/netty/buffer/PoolThreadCache"); + mv.visitVarInsn(ASTORE, 3); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", "threadCaches", + "Ljava/util/concurrent/ConcurrentHashMap;"); + //we store the testHashMap + mv.visitTypeInsn(CHECKCAST, "java/util/concurrent/ConcurrentHashMap"); + mv.visitVarInsn(ASTORE, 7); + mv.visitVarInsn(ALOAD, 7); + mv.visitVarInsn(ALOAD, 5); + //... currentCarrierThread.getName() + mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/concurrent/ConcurrentHashMap", "containsKey", + "(Ljava/lang/Object;)Z", false); + mv.visitJumpInsn(IFEQ, LKeyOut); + + //the carrier name is already a key in the concurrentHashMap + mv.visitLabel(LKeyIn); + mv.visitVarInsn(ALOAD, 7); + mv.visitVarInsn(ALOAD, 5); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/concurrent/ConcurrentHashMap", "get", + "(Ljava/lang/Object;)Ljava/lang/Object;", false); + mv.visitTypeInsn(CHECKCAST, "io/netty/buffer/PoolThreadCache"); + mv.visitInsn(ARETURN); + + //the carrier name is not already a key in the concurrentHashMap + mv.visitLabel(LKeyOut); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "heapArenas", + "[Lio/netty/buffer/PoolArena;"); + mv.visitMethodInsn(INVOKESPECIAL, "io/netty/buffer/PooledByteBufAllocator", "leastUsedArena", + "([Lio/netty/buffer/PoolArena;)Lio/netty/buffer/PoolArena;", false); + mv.visitVarInsn(ASTORE, 6); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "directArenas", + "[Lio/netty/buffer/PoolArena;"); + mv.visitMethodInsn(INVOKESPECIAL, "io/netty/buffer/PooledByteBufAllocator", "leastUsedArena", + "([Lio/netty/buffer/PoolArena;)Lio/netty/buffer/PoolArena;", false); + mv.visitVarInsn(ASTORE, 9); + mv.visitTypeInsn(NEW, "io/netty/buffer/PoolThreadCache"); + + mv.visitInsn(DUP); + mv.visitVarInsn(ALOAD, 6); + mv.visitVarInsn(ALOAD, 9); + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "smallCacheSize", + "I"); + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "normalCacheSize", + "I"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "DEFAULT_MAX_CACHED_BUFFER_CAPACITY", "I"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "DEFAULT_CACHE_TRIM_INTERVAL", "I"); + mv.visitMethodInsn(INVOKESPECIAL, "io/netty/buffer/PoolThreadCache", "", + "(Lio/netty/buffer/PoolArena;Lio/netty/buffer/PoolArena;IIII)V", false); + mv.visitVarInsn(ASTORE, 3); + mv.visitVarInsn(ALOAD, 7); + mv.visitVarInsn(ALOAD, 5); + mv.visitVarInsn(ALOAD, 3); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/util/concurrent/ConcurrentHashMap", "put", + "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", false); + + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "DEFAULT_CACHE_TRIM_INTERVAL_MILLIS", "J"); + mv.visitInsn(LCONST_0); + mv.visitInsn(LCMP); + mv.visitJumpInsn(IFLE, L14); + + mv.visitLabel(new Label()); + mv.visitMethodInsn(INVOKESTATIC, "io/netty/util/internal/ThreadExecutorMap", "currentExecutor", + "()Lio/netty/util/concurrent/EventExecutor;", false); + mv.visitVarInsn(ASTORE, 10); + mv.visitVarInsn(ALOAD, 10); + mv.visitJumpInsn(IFNULL, L14); + mv.visitVarInsn(ALOAD, 10); + mv.visitVarInsn(ALOAD, 0); + + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "trimTask", + "Ljava/lang/Runnable;"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "DEFAULT_CACHE_TRIM_INTERVAL_MILLIS", "J"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", + "DEFAULT_CACHE_TRIM_INTERVAL_MILLIS", "J"); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", "MILLISECONDS", + "Ljava/util/concurrent/TimeUnit;"); + mv.visitMethodInsn(INVOKEINTERFACE, "io/netty/util/concurrent/EventExecutor", + "scheduleAtFixedRate", + "(Ljava/lang/Runnable;JJLjava/util/concurrent/TimeUnit;)Lio/netty/util/concurrent/ScheduledFuture;", + true); + mv.visitInsn(POP); + + mv.visitLabel(L14); + mv.visitVarInsn(ALOAD, 3); + mv.visitInsn(ARETURN); + mv.visitLabel(LEnd); + + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/NoSuchMethodException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/ClassNotFoundException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/reflect/InvocationTargetException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/IllegalAccessException"); + + mv.visitLocalVariable("cache", "Lio/netty/buffer/PoolThreadCache;", null, testHashMap, LEnd, 3); + mv.visitLocalVariable("this", "Lio/netty/buffer/PooledByteBufAllocator;", null, LStart, LEnd, 0); + mv.visitLocalVariable("initialCapacity", "I", null, LStart, LEnd, 1); + mv.visitLocalVariable("maxCapacity", "I", null, LStart, LEnd, 2); + mv.visitLocalVariable("method", "Ljava/lang/reflect/Method;", null, L0, LEnd, 4); + mv.visitLocalVariable("currentCarrierThread", "Ljava/lang/Thread;", null, LStart, LEnd, 5); + mv.visitLocalVariable("e", "Ljava/lang/ReflectiveOperationException;", null, + LError, testHashMap, 6); + mv.visitLocalVariable("lthreadCaches", "Ljava/util/concurrent/ConcurrentHashMap;", + "Ljava/util/concurrent/ConcurrentHashMap;", + testHashMap, LEnd, 7); + mv.visitLocalVariable("heapArena", "Lio/netty/buffer/PoolArena;", + "Lio/netty/buffer/PoolArena<[B>;", LKeyOut, LEnd, 6); + mv.visitLocalVariable("directArena", "Lio/netty/buffer/PoolArena;", + "Lio/netty/buffer/PoolArena<[B>;", LKeyOut, LEnd, 9); + + mv.visitMaxs(5, 10); + } + + @Override + public void visitEnd() { + cv.visitField(ACC_STATIC | ACC_PRIVATE, "isVirtualMethod", + "Ljava/lang/reflect/Method;", + null, + null); + cv.visitField(ACC_STATIC | ACC_PRIVATE, "getCurrentCarrierMethod", + "Ljava/lang/reflect/Method;", + null, + null); + cv.visitField(ACC_STATIC | ACC_PRIVATE, "canUseVirtual", + "Ljava/lang/Boolean;", + null, + null); + cv.visitField(ACC_STATIC | ACC_PRIVATE, "threadCaches", + "Ljava/util/concurrent/ConcurrentHashMap;", + "Ljava/util/concurrent/ConcurrentHashMapConcurrentHashMap;", + null); + + if (cv != null) { + createLeastUsedArenaMethod(); + createCacheMethod(); + cv.visitEnd(); + } + } + + } + + private class NettyCurrentAdaptorPrinter extends NettyCurrentAdaptor { + public NettyCurrentAdaptorPrinter(int version, ClassVisitor cv) { + super(version, cv); + } + + @Override + public MethodVisitor visitMethod( + final int access, + final String name, + final String descriptor, + final String signature, + final String[] exceptions) { + if (cv != null) { + if (name.equals("newDirectBuffer")) { + MethodVisitor mv = cv.visitMethod(access, name, descriptor, signature, exceptions); + mv = new CurrentThreadMethodAdaptor(Gizmo.ASM_API_VERSION, mv); + mv.visitMaxs(4, 4); + return mv; + } + return null; + } + return null; + } + } + + private static void addPrintInsn(String msg, MethodVisitor mv) { + mv.visitFieldInsn(GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;"); + mv.visitLdcInsn(msg); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/io/PrintStream", "println", "(Ljava/lang/String;)V", false); + } + + private static void addPrintVar(int var, String type, int instruction, MethodVisitor mv) { + mv.visitFieldInsn(GETSTATIC, "java/lang/System", "err", "Ljava/io/PrintStream;"); + mv.visitVarInsn(instruction, var); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/io/PrintStream", "println", "(" + type + ")V", false); + } + + private class CurrentThreadMethodAdaptor extends MethodVisitor { + boolean firstReturn = true; + MethodVisitor mv; + + public CurrentThreadMethodAdaptor(int api, MethodVisitor methodVisitor) { + super(api, null); + mv = methodVisitor; + } + + @Override + public void visitCode() { + mv.visitCode(); + firstReturn = false; + Label L0 = new Label(); + Label L1 = new Label(); + Label L2 = new Label(); + Label L16 = new Label(); + Label L18 = new Label(); + Label LNullDirectArena = new Label(); + + Label LStart = new Label(); + Label LTest = new Label(); + + Label lVirtual = new Label(); + Label lTestCache = new Label(); + Label lAfter = new Label(); + Label LgotCache = new Label(); + Label LReturn = new Label(); + Label LEnd = new Label(); + + //...Thread.class.getMethod("isVirtual")... + mv.visitLabel(LStart); + mv.visitInsn(ICONST_0); + mv.visitVarInsn(ISTORE, 3); + mv.visitInsn(ACONST_NULL); + mv.visitVarInsn(ASTORE, 4); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", "canUseVirtual", + "Ljava/lang/Boolean;"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Boolean", "booleanValue", + "()Z", false); + mv.visitJumpInsn(IFEQ, lTestCache); + + mv.visitLabel(L0); + mv.visitFieldInsn(GETSTATIC, "io/netty/buffer/PooledByteBufAllocator", "isVirtualMethod", + "Ljava/lang/reflect/Method;"); + mv.visitMethodInsn(INVOKESTATIC, "java/lang/Thread", "currentThread", + "()Ljava/lang/Thread;", false); + mv.visitInsn(ICONST_0); + mv.visitTypeInsn(ANEWARRAY, "java/lang/Object"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/reflect/Method", "invoke", + "(Ljava/lang/Object;[Ljava/lang/Object;)Ljava/lang/Object;", false); + + mv.visitTypeInsn(CHECKCAST, "java/lang/Boolean"); + mv.visitMethodInsn(INVOKEVIRTUAL, "java/lang/Boolean", "booleanValue", + "()Z", false); + mv.visitVarInsn(ISTORE, 3); + + mv.visitLabel(L1); + mv.visitJumpInsn(GOTO, LTest); + + mv.visitLabel(L2); + mv.visitVarInsn(ASTORE, 4); + addPrintInsn("error in newDirectBuffer : ", mv); + addPrintVar(4, "Ljava/lang/Object;", ALOAD, mv); + + mv.visitLabel(LTest); + mv.visitInsn(ACONST_NULL); + mv.visitVarInsn(ASTORE, 4); + mv.visitVarInsn(ILOAD, 3); + //else + mv.visitJumpInsn(IFEQ, lTestCache); + + //if(isVirtual)... + mv.visitLabel(lVirtual); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ILOAD, 1); + mv.visitVarInsn(ILOAD, 2); + mv.visitMethodInsn(INVOKESPECIAL, "io/netty/buffer/PooledByteBufAllocator", "createCache", + "(II)Lio/netty/buffer/PoolThreadCache;", false); + mv.visitVarInsn(ASTORE, 4); + + //if(cache == null).. + mv.visitLabel(lTestCache); + mv.visitVarInsn(ALOAD, 4); + mv.visitJumpInsn(IFNONNULL, LgotCache); + + //if the cache was indeed null + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PooledByteBufAllocator", "threadCache", + "Lio/netty/buffer/PooledByteBufAllocator$PoolThreadLocalCache;"); + mv.visitMethodInsn(INVOKEVIRTUAL, "io/netty/buffer/PooledByteBufAllocator$PoolThreadLocalCache", + "get", "()Ljava/lang/Object;", false); + mv.visitTypeInsn(CHECKCAST, "io/netty/buffer/PoolThreadCache"); + mv.visitVarInsn(ASTORE, 4); + + //we stored the cache in 4, let's use it now + mv.visitLabel(LgotCache); + mv.visitVarInsn(ALOAD, 4); + mv.visitFieldInsn(GETFIELD, "io/netty/buffer/PoolThreadCache", "directArena", + "Lio/netty/buffer/PoolArena;"); + mv.visitVarInsn(ASTORE, 5); + mv.visitVarInsn(ALOAD, 5); + mv.visitJumpInsn(IFNULL, LNullDirectArena); + mv.visitVarInsn(ALOAD, 5); + mv.visitVarInsn(ALOAD, 4); + mv.visitVarInsn(ILOAD, 1); + mv.visitVarInsn(ILOAD, 2); + mv.visitMethodInsn(INVOKEVIRTUAL, "io/netty/buffer/PoolArena", "allocate", + "(Lio/netty/buffer/PoolThreadCache;II)Lio/netty/buffer/PooledByteBuf;", false); + mv.visitVarInsn(ASTORE, 6); + mv.visitJumpInsn(GOTO, LReturn); + + mv.visitLabel(LNullDirectArena); + mv.visitMethodInsn(INVOKESTATIC, "io/netty/util/internal/PlatformDependent", "hasUnsafe", + "()Z", false); + mv.visitJumpInsn(IFEQ, L16); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ILOAD, 1); + mv.visitVarInsn(ILOAD, 2); + mv.visitMethodInsn(INVOKESTATIC, "io/netty/util/internal/PlatformDependent", + "newUnsafeDirectByteBuf", + "(Lio/netty/buffer/ByteBufAllocator;II)Lio/netty/buffer/UnpooledUnsafeDirectByteBuf;", + false); + mv.visitJumpInsn(GOTO, L18); + + mv.visitLabel(L16); + mv.visitTypeInsn(NEW, "io/netty/buffer/UnpooledDirectByteBuf"); + mv.visitInsn(DUP); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ILOAD, 1); + mv.visitVarInsn(ILOAD, 2); + mv.visitMethodInsn(INVOKESPECIAL, "io/netty/buffer/UnpooledDirectByteBuf", "", + "(Lio/netty/buffer/ByteBufAllocator;II)V", false); + + mv.visitLabel(L18); + mv.visitVarInsn(ASTORE, 6); + + mv.visitLabel(LReturn); + mv.visitVarInsn(ALOAD, 6); + mv.visitMethodInsn(INVOKESTATIC, "io/netty/buffer/PooledByteBufAllocator", "toLeakAwareBuffer", + "(Lio/netty/buffer/ByteBuf;)Lio/netty/buffer/ByteBuf;", false); + + mv.visitInsn(ARETURN); + + mv.visitLabel(LEnd); + + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/IllegalAccessException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/reflect/InvocationTargetException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/NoSuchMethodException"); + mv.visitTryCatchBlock(L0, L1, L2, "java/lang/ClassNotFoundException"); + + mv.visitLocalVariable("isVirtual", "Z", null, LStart, LEnd, 2); + mv.visitEnd(); + mv.visitMaxs(10, 10); + } + } +} diff --git a/extensions/netty-loom-adaptor/pom.xml b/extensions/netty-loom-adaptor/pom.xml new file mode 100644 index 0000000000000..8983261c25321 --- /dev/null +++ b/extensions/netty-loom-adaptor/pom.xml @@ -0,0 +1,25 @@ + + + 4.0.0 + + + quarkus-extensions-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + quarkus-netty-loom-adaptor-parent + Quarkus - Netty Loom Adaptor + pom + + + runtime + deployment + + + + + \ No newline at end of file diff --git a/extensions/netty-loom-adaptor/runtime/pom.xml b/extensions/netty-loom-adaptor/runtime/pom.xml new file mode 100644 index 0000000000000..55aa4a59558ae --- /dev/null +++ b/extensions/netty-loom-adaptor/runtime/pom.xml @@ -0,0 +1,61 @@ + + + 4.0.0 + + + io.quarkus + quarkus-netty-loom-adaptor-parent + 999-SNAPSHOT + ../pom.xml + + + quarkus-netty-loom-adaptor + Quarkus - Netty Loom Adaptor - Runtime + Modifies some Netty classes to make it work with loom + + + + io.quarkus + quarkus-core + + + org.jboss.logging + commons-logging-jboss-logging + + + io.quarkus + quarkus-netty + + + io.quarkus + quarkus-arc + + + io.netty + netty-buffer + + + + + + + io.quarkus + quarkus-bootstrap-maven-plugin + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/pom.xml b/extensions/pom.xml index c6c5f7d6bfc64..6618b4c5aaf11 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -14,6 +14,8 @@ Quarkus - Extensions - Parent pom pom + + netty-loom-adaptor arc scheduler @@ -193,6 +195,7 @@ grpc-common awt + diff --git a/extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/DotNames.java b/extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/DotNames.java index 6eefc0add80e6..0a8dcdfa97516 100644 --- a/extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/DotNames.java +++ b/extensions/reactive-routes/deployment/src/main/java/io/quarkus/vertx/web/deployment/DotNames.java @@ -15,6 +15,7 @@ import io.quarkus.vertx.web.RouteFilter; import io.quarkus.vertx.web.RoutingExchange; import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.vertx.core.buffer.Buffer; @@ -51,6 +52,7 @@ final class DotNames { static final DotName EXCEPTION = DotName.createSimple(Exception.class.getName()); static final DotName THROWABLE = DotName.createSimple(Throwable.class.getName()); static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); + static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName()); static final DotName COMPRESSED = DotName.createSimple(Compressed.class.getName()); static final DotName UNCOMPRESSED = DotName.createSimple(Uncompressed.class.getName()); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java index 61fd22755dc33..1ab27aef29823 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java @@ -3,14 +3,18 @@ import static io.quarkus.resteasy.reactive.server.runtime.NotFoundExceptionMapper.classMappers; import java.io.Closeable; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import javax.ws.rs.core.Application; +import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.common.core.SingletonBeanFactory; import org.jboss.resteasy.reactive.common.model.ResourceContextResolver; import org.jboss.resteasy.reactive.common.model.ResourceExceptionMapper; @@ -55,12 +59,62 @@ @Recorder public class ResteasyReactiveRecorder extends ResteasyReactiveCommonRecorder implements EndpointInvokerFactory { + static final Logger logger = Logger.getLogger("io.quarkus"); + public static final Supplier EXECUTOR_SUPPLIER = new Supplier() { @Override public Executor get() { return ExecutorRecorder.getCurrent(); } }; + public static final Supplier VIRTUAL_EXECUTOR_SUPPLIER = new Supplier() { + Executor current = null; + + /** + * This method is used to specify a custom executor to dispatch virtual threads on carrier threads + * We need reflection for both ease of use (see {@link #get() Get} method) but also because we call methods + * of private classes from the java.lang package. + * + * It is used for testing purposes only for now + */ + private Executor setVirtualThreadCustomScheduler(Executor executor) throws ClassNotFoundException, + InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException { + var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0]; + Constructor constructor = vtf.getDeclaredConstructors()[0]; + constructor.setAccessible(true); + ThreadFactory tf = (ThreadFactory) constructor.newInstance( + new Object[] { executor, "quarkus-virtual-factory-", 0, 0, + null }); + + return (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class) + .invoke(this, tf); + } + + /** + * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to + * change --release, --source, --target flags and to enable previews. + * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled + * using java 11 and executed with a loom-compliant JDK. + */ + @Override + public Executor get() { + if (current == null) { + try { + current = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") + .invoke(this); + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + System.err.println(e); + //quite ugly but works + logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + + " blocking executor will be used, please check that your JDK is compatible with " + + "virtual threads"); + //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR + current = EXECUTOR_SUPPLIER.get(); + } + } + return current; + } + }; static volatile Deployment currentDeployment; @@ -114,6 +168,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, } RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, EXECUTOR_SUPPLIER, + VIRTUAL_EXECUTOR_SUPPLIER, closeTaskHandler, contextFactory, new ArcThreadSetupAction(beanContainer.requestContext()), vertxConfig.rootPath); Deployment deployment = runtimeDeploymentManager.deploy(); @@ -164,6 +219,7 @@ public Application get() { SingletonBeanFactory.setInstance(i.getClass().getName(), i); } applicationSupplier = new Supplier() { + @Override public Application get() { return application; diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/BlockingDefault.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/BlockingDefault.java index 6ddd94d71114a..177e7b65bacf1 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/BlockingDefault.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/BlockingDefault.java @@ -6,5 +6,6 @@ public enum BlockingDefault { */ AUTOMATIC, BLOCKING, - NON_BLOCKING + NON_BLOCKING, + RUN_ON_VIRTUAL_THREAD } diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java index 18167fbeeb395..dfa580ff8449d 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java @@ -60,6 +60,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_SSE_ELEMENT_TYPE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_STREAM_ELEMENT_TYPE; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.RUN_ON_VIRTUAL_THREAD; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.SECURITY_CONTEXT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.SERVER_REQUEST_CONTEXT; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.SET; @@ -613,6 +614,7 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf } Set nameBindingNames = nameBindingNames(currentMethodInfo, classNameBindings); boolean blocking = isBlocking(currentMethodInfo, defaultBlocking); + boolean runOnVirtualThread = isRunOnVirtualThread(currentMethodInfo, defaultBlocking); // we want to allow "overriding" the blocking/non-blocking setting from an implementation class // when the class defining the annotations is an interface if (!actualEndpointInfo.equals(currentClassInfo) && Modifier.isInterface(currentClassInfo.flags())) { @@ -621,7 +623,10 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf if (actualMethodInfo != null) { //we don't pass AUTOMATIC here, as the method signature would be the same, so the same determination //would be reached for a default - blocking = isBlocking(actualMethodInfo, blocking ? BlockingDefault.BLOCKING : BlockingDefault.NON_BLOCKING); + blocking = isBlocking(actualMethodInfo, + blocking ? BlockingDefault.BLOCKING : BlockingDefault.NON_BLOCKING); + runOnVirtualThread = isRunOnVirtualThread(actualMethodInfo, + blocking ? BlockingDefault.BLOCKING : BlockingDefault.NON_BLOCKING); } } @@ -640,6 +645,7 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf .setNameBindingNames(nameBindingNames) .setName(currentMethodInfo.name()) .setBlocking(blocking) + .setRunOnVirtualThread(runOnVirtualThread) .setSuspended(suspended) .setSse(sse) .setStreamElementType(streamElementType) @@ -692,15 +698,70 @@ private String getAnnotationValueAsString(AnnotationTarget target, DotName annot return value; } + private boolean isRunOnVirtualThread(MethodInfo info, BlockingDefault defaultValue) { + boolean isRunOnVirtualThread = false; + boolean isJDKCompatible = true; + try { + Class.forName("java.lang.ThreadBuilders"); + } catch (ClassNotFoundException e) { + isJDKCompatible = false; + } + + if (!isJDKCompatible) { + log.warn("Your version of the JDK is '" + Runtime.version() + + "' and doesn't support Loom's virtual threads" + + ", your runtime will have to use jdk-19-loom or superior to leverage virtual threads " + + "(else java platform threads will be used instead)."); + } + + Map.Entry runOnVirtualThreadAnnotation = getInheritableAnnotation(info, + RUN_ON_VIRTUAL_THREAD); + + //should the Transactional annotation override the annotation @RunOnVirtualThread ? + //here it does : it is impossible for a transaction to run on a virtual thread + Map.Entry transactional = getInheritableAnnotation(info, TRANSACTIONAL); //we treat this the same as blocking, as JTA is blocking, but it is lower priority + if (transactional != null) { + return false; + } + + if (runOnVirtualThreadAnnotation != null) { + isRunOnVirtualThread = true; + } + + //BlockingDefault.BLOCKING should mean "block a platform thread" ? here it does + if (defaultValue == BlockingDefault.BLOCKING) { + return false; + } else if (defaultValue == BlockingDefault.RUN_ON_VIRTUAL_THREAD) { + isRunOnVirtualThread = true; + } else if (defaultValue == BlockingDefault.NON_BLOCKING) { + return false; + } + + if (isRunOnVirtualThread && !isBlocking(info, defaultValue)) { + throw new DeploymentException( + "Method '" + info.name() + "' of class '" + info.declaringClass().name() + + "' is considered a non blocking method. @RunOnVirtualThread can only be used on " + + " methods considered blocking"); + } else if (isRunOnVirtualThread) { + return true; + } + + return false; + } + private boolean isBlocking(MethodInfo info, BlockingDefault defaultValue) { Map.Entry blockingAnnotation = getInheritableAnnotation(info, BLOCKING); + Map.Entry runOnVirtualThreadAnnotation = getInheritableAnnotation(info, + RUN_ON_VIRTUAL_THREAD); Map.Entry nonBlockingAnnotation = getInheritableAnnotation(info, NON_BLOCKING); + if ((blockingAnnotation != null) && (nonBlockingAnnotation != null)) { if (blockingAnnotation.getKey().kind() == nonBlockingAnnotation.getKey().kind()) { if (blockingAnnotation.getKey().kind() == AnnotationTarget.Kind.METHOD) { - throw new DeploymentException("Method '" + info.name() + "' of class '" + info.declaringClass().name() - + "' contains both @Blocking and @NonBlocking annotations."); + throw new DeploymentException( + "Method '" + info.name() + "' of class '" + info.declaringClass().name() + + "' contains both @Blocking and @NonBlocking annotations."); } else { throw new DeploymentException("Class '" + info.declaringClass().name() + "' contains both @Blocking and @NonBlocking annotations."); @@ -724,6 +785,8 @@ private boolean isBlocking(MethodInfo info, BlockingDefault defaultValue) { } if (defaultValue == BlockingDefault.BLOCKING) { return true; + } else if (defaultValue == BlockingDefault.RUN_ON_VIRTUAL_THREAD) { + return false; } else if (defaultValue == BlockingDefault.NON_BLOCKING) { return false; } diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index 9896a9f67c222..a126eff7fa721 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -2,6 +2,7 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import java.io.InputStream; @@ -170,6 +171,7 @@ public final class ResteasyReactiveDotNames { public static final DotName INVOCATION_CALLBACK = DotName.createSimple(InvocationCallback.class.getName()); public static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName()); + public static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); public static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName()); public static final DotName SUSPENDED = DotName.createSimple(Suspended.class.getName()); public static final DotName PRE_MATCHING = DotName.createSimple(PreMatching.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java index a2ef1d08d7d44..2936dd1aef2b7 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java @@ -58,6 +58,8 @@ public class ResourceMethod { private boolean blocking; + private boolean runOnVirtualThread; + private boolean suspended; private boolean isSse; @@ -181,11 +183,20 @@ public boolean isBlocking() { return blocking; } + public boolean isRunOnVirtualThread() { + return runOnVirtualThread; + } + public ResourceMethod setBlocking(boolean blocking) { this.blocking = blocking; return this; } + public ResourceMethod setRunOnVirtualThread(boolean runOnVirtualThread) { + this.runOnVirtualThread = runOnVirtualThread; + return this; + } + public boolean isSuspended() { return suspended; } diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java index 3ea13d24975eb..e493ec14bd182 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java @@ -418,7 +418,14 @@ public Application get() { } info.setApplicationPath(path); List closeTasks = new ArrayList<>(); - RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, () -> executor, + Supplier executorSupplier = new Supplier() { + @Override + public Executor get() { + return executor; + } + }; + RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, executorSupplier, + executorSupplier, closeTasks::add, requestContextFactory, ThreadSetupAction.NOOP, "/"); Deployment deployment = runtimeDeploymentManager.deploy(); deployment.setRuntimeConfiguration(runtimeConfiguration); diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java index d6b3a0d9bef77..e2d80a613e00d 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java @@ -53,6 +53,7 @@ public class RuntimeDeploymentManager { public static final ServerRestHandler[] EMPTY_REST_HANDLER_ARRAY = new ServerRestHandler[0]; private final DeploymentInfo info; private final Supplier executorSupplier; + private final Supplier virtualExecutorSupplier; private final Consumer closeTaskHandler; private final RequestContextFactory requestContextFactory; private final ThreadSetupAction threadSetupAction; @@ -62,10 +63,12 @@ public class RuntimeDeploymentManager { public RuntimeDeploymentManager(DeploymentInfo info, Supplier executorSupplier, + Supplier virtualExecutorSupplier, Consumer closeTaskHandler, RequestContextFactory requestContextFactory, ThreadSetupAction threadSetupAction, String rootPath) { this.info = info; this.executorSupplier = executorSupplier; + this.virtualExecutorSupplier = virtualExecutorSupplier; this.closeTaskHandler = closeTaskHandler; this.requestContextFactory = requestContextFactory; this.threadSetupAction = threadSetupAction; @@ -100,6 +103,7 @@ public BeanFactory.BeanInstance apply(Class aClass) { }); List runtimeConfigurableServerRestHandlers = new ArrayList<>(); RuntimeResourceDeployment runtimeResourceDeployment = new RuntimeResourceDeployment(info, executorSupplier, + virtualExecutorSupplier, interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler, requestContextFactory.isDefaultBlocking()); List possibleSubResource = new ArrayList<>(locatableResourceClasses); possibleSubResource.addAll(resourceClasses); //the TCK uses normal resources also as sub resources diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeMappingDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeMappingDeployment.java index 1d4821dc0e4fd..1232813411f3d 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeMappingDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeMappingDeployment.java @@ -85,9 +85,11 @@ private void forEachMethodTemplateMap(URITemplate path, List(false, fake.getPath(), fake)); } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java index c8e634f48f7b1..1685899412d99 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java @@ -100,6 +100,7 @@ public class RuntimeResourceDeployment { private final ServerSerialisers serialisers; private final ResteasyReactiveConfig resteasyReactiveConfig; private final Supplier executorSupplier; + private final Supplier virtualExecutorSupplier; private final RuntimeInterceptorDeployment runtimeInterceptorDeployment; private final DynamicEntityWriter dynamicEntityWriter; private final ResourceLocatorHandler resourceLocatorHandler; @@ -108,20 +109,24 @@ public class RuntimeResourceDeployment { */ private final boolean defaultBlocking; private final BlockingHandler blockingHandler; + private final BlockingHandler blockingHandlerVirtualThread; private final ResponseWriterHandler responseWriterHandler; public RuntimeResourceDeployment(DeploymentInfo info, Supplier executorSupplier, + Supplier virtualExecutorSupplier, RuntimeInterceptorDeployment runtimeInterceptorDeployment, DynamicEntityWriter dynamicEntityWriter, ResourceLocatorHandler resourceLocatorHandler, boolean defaultBlocking) { this.info = info; this.serialisers = info.getSerialisers(); this.resteasyReactiveConfig = info.getResteasyReactiveConfig(); this.executorSupplier = executorSupplier; + this.virtualExecutorSupplier = virtualExecutorSupplier; this.runtimeInterceptorDeployment = runtimeInterceptorDeployment; this.dynamicEntityWriter = dynamicEntityWriter; this.resourceLocatorHandler = resourceLocatorHandler; this.defaultBlocking = defaultBlocking; this.blockingHandler = new BlockingHandler(executorSupplier); + this.blockingHandlerVirtualThread = new BlockingHandler(virtualExecutorSupplier); this.responseWriterHandler = new ResponseWriterHandler(dynamicEntityWriter); } @@ -197,11 +202,22 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, Optional blockingHandlerIndex = Optional.empty(); if (!defaultBlocking) { if (method.isBlocking()) { - handlers.add(blockingHandler); + if (method.isRunOnVirtualThread()) { + handlers.add(blockingHandlerVirtualThread); + } else { + handlers.add(blockingHandler); + } blockingHandlerIndex = Optional.of(handlers.size() - 1); score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionBlocking); } else { - handlers.add(NonBlockingHandler.INSTANCE); + if (method.isRunOnVirtualThread()) { + //should not happen + log.error("a method was both non blocking and @RunOnVirtualThread, it is now considered " + + "@RunOnVirtual and blocking"); + handlers.add(blockingHandlerVirtualThread); + } else { + handlers.add(NonBlockingHandler.INSTANCE); + } score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionNonBlocking); } } @@ -461,7 +477,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, method.getProduces() == null ? null : serverMediaType, consumesMediaTypes, invoker, clazz.getFactory(), handlers.toArray(EMPTY_REST_HANDLER_ARRAY), method.getName(), parameterDeclaredTypes, - effectiveReturnType, method.isBlocking(), resourceClass, + effectiveReturnType, method.isBlocking(), method.isRunOnVirtualThread(), resourceClass, lazyMethod, pathParameterIndexes, info.isDevelopmentMode() ? score : null, streamElementType, clazz.resourceExceptionMapper()); diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/VirtualThreadNonBlockingHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/VirtualThreadNonBlockingHandler.java new file mode 100644 index 0000000000000..1d0295671411a --- /dev/null +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/VirtualThreadNonBlockingHandler.java @@ -0,0 +1,37 @@ +package org.jboss.resteasy.reactive.server.handlers; + +import java.lang.reflect.Constructor; +import java.util.concurrent.*; +import org.jboss.resteasy.reactive.server.core.BlockingOperationSupport; +import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; +import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; + +//should not be used anymore, but might be in the future if using an event-loop as a carrier doesn't cause deadlocks anymore +public class VirtualThreadNonBlockingHandler implements ServerRestHandler { + private Executor executor; + private static volatile ConcurrentHashMap eventLoops = new ConcurrentHashMap<>(); + + public VirtualThreadNonBlockingHandler() { + } + + @Override + public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { + if (BlockingOperationSupport.isBlockingAllowed()) { + return; //already dispatched + } + + if (!eventLoops.containsKey(Thread.currentThread().toString())) { + var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0]; + Constructor constructor = vtf.getDeclaredConstructors()[0]; + constructor.setAccessible(true); + ThreadFactory tf = (ThreadFactory) constructor.newInstance( + new Object[] { requestContext.getContextExecutor(), "quarkus-virtual-factory", 0, 0, + null }); + var exec = (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class) + .invoke(this, tf); + eventLoops.put(Thread.currentThread().toString(), exec); + } + requestContext.suspend(); + requestContext.resume(eventLoops.get(Thread.currentThread().toString())); + } +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java index adead56bfd355..7cf24c36dd226 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java @@ -28,6 +28,7 @@ public class RuntimeResource { private final Class[] parameterTypes; private final Type returnType; private final boolean blocking; + private final boolean runOnVirtualThread; private final Class resourceClass; private final ResteasyReactiveResourceInfo lazyMethod; private final Map pathParameterIndexes; @@ -40,7 +41,8 @@ public RuntimeResource(String httpMethod, URITemplate path, URITemplate classPat EndpointInvoker invoker, BeanFactory endpointFactory, ServerRestHandler[] handlerChain, String javaMethodName, Class[] parameterTypes, - Type returnType, boolean blocking, Class resourceClass, ResteasyReactiveResourceInfo lazyMethod, + Type returnType, boolean blocking, boolean runOnVirtualThread, Class resourceClass, + ResteasyReactiveResourceInfo lazyMethod, Map pathParameterIndexes, Map> score, MediaType streamElementType, Map, ResourceExceptionMapper> classExceptionMappers) { @@ -56,6 +58,7 @@ public RuntimeResource(String httpMethod, URITemplate path, URITemplate classPat this.parameterTypes = parameterTypes; this.returnType = returnType; this.blocking = blocking; + this.runOnVirtualThread = runOnVirtualThread; this.resourceClass = resourceClass; this.lazyMethod = lazyMethod; this.pathParameterIndexes = pathParameterIndexes; @@ -104,6 +107,10 @@ public boolean isBlocking() { return blocking; } + public boolean isRunOnVirtualThread() { + return runOnVirtualThread; + } + public Class getResourceClass() { return resourceClass; } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/framework/ResteasyReactiveUnitTest.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/framework/ResteasyReactiveUnitTest.java index b2ed0f0cd9c7f..c444d75028be5 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/framework/ResteasyReactiveUnitTest.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/framework/ResteasyReactiveUnitTest.java @@ -17,6 +17,7 @@ import io.vertx.mutiny.core.file.AsyncFile; import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; @@ -37,6 +38,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -159,6 +161,19 @@ public ResteasyReactiveUnitTest setMaxFormAttributeSize(int maxFormAttributeSize return this; } + private static Executor setVirtualThreadExecutor() { + Executor exec = Executors.newSingleThreadExecutor(); + try { + exec = (Executor) Class.forName("java.util.concurrent.Executors") + .getMethod("newVirtualThreadPerTaskExecutor") + .invoke(null); + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException + | IllegalAccessException e) { + e.printStackTrace(); + } + return exec; + } + public ResteasyReactiveUnitTest setExpectedException(Class expectedException) { return assertException(t -> { Throwable i = t;