diff --git a/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/blocking/MultiThreadedBlockingImplTest.java b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/blocking/MultiThreadedBlockingImplTest.java new file mode 100644 index 0000000000000..7f80c59068859 --- /dev/null +++ b/extensions/grpc/deployment/src/test/java/io/quarkus/grpc/server/blocking/MultiThreadedBlockingImplTest.java @@ -0,0 +1,88 @@ +package io.quarkus.grpc.server.blocking; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.acme.Request; +import org.acme.Response; +import org.acme.StandardBlockingGrpcServiceGrpc; +import org.jboss.logging.Logger; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.grpc.stub.StreamObserver; +import io.quarkus.grpc.GrpcClient; +import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.runtime.devmode.GrpcServices; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.Blocking; +import io.vertx.core.impl.ConcurrentHashSet; + +public class MultiThreadedBlockingImplTest { + + private static final Logger logger = Logger.getLogger(GrpcServices.class); + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setFlatClassPath(true) + .setArchiveProducer( + () -> ShrinkWrap.create(JavaArchive.class) + .addPackage(StandardBlocking.class.getPackage()) + .addPackage( + StandardBlockingGrpcServiceGrpc.StandardBlockingGrpcServiceImplBase.class.getPackage())); + + @GrpcClient + StandardBlockingGrpcServiceGrpc.StandardBlockingGrpcServiceBlockingStub client; + + static ExecutorService executor = Executors.newCachedThreadPool(); + + @AfterAll + static void cleanup() { + executor.shutdown(); + } + + @Test + void testTheBlockingCallsCanBeDispatchedOnMultipleThreads() throws InterruptedException { + int count = 100; + ConcurrentHashSet threads = new ConcurrentHashSet<>(); + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; i++) { + int id = i; + executor.submit(() -> { + threads.add(invokeService(id)); + latch.countDown(); + }); + } + + Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assertions.assertTrue(threads.size() > 1); + } + + String invokeService(int id) { + return client.invoke(Request.newBuilder().setId(id).build()).getThread(); + } + + @GrpcService + @Blocking + static class StandardBlocking extends StandardBlockingGrpcServiceGrpc.StandardBlockingGrpcServiceImplBase { + @Override + public void invoke(Request request, StreamObserver responseObserver) { + try { + Thread.sleep(Duration.ofSeconds(2).toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + responseObserver.onNext(Response.newBuilder() + .setId(request.getId()).setThread(Thread.currentThread().getName()).build()); + responseObserver.onCompleted(); + } + } +} diff --git a/extensions/grpc/deployment/src/test/proto/blocking/BlockingGrpcService.proto b/extensions/grpc/deployment/src/test/proto/blocking/BlockingGrpcService.proto new file mode 100644 index 0000000000000..b3521ce857420 --- /dev/null +++ b/extensions/grpc/deployment/src/test/proto/blocking/BlockingGrpcService.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "org.acme"; + +package hello; + +service StandardBlockingGrpcService { + rpc Invoke (Request) returns (Response) {} +} + +message Request { + int32 id = 1; +} + +message Response { + int32 id = 1; + string thread = 2; +} \ No newline at end of file diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 047d67adce374..3a467c4e320e1 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -208,7 +208,7 @@ private void executeBlockingWithRequestContext(Consumer { + vertx.executeBlocking(blockingHandler, false).onComplete(p -> { Consumer> next = incomingEvents.poll(); if (next != null) { executeBlockingWithRequestContext(next);