From a9874d53f9eb5d9e5db5dbaab487a81f506248a6 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 28 Oct 2024 09:10:28 +0100 Subject: [PATCH 1/2] Fix VirtualThreadDeploymentTest#testHttpClientStopRequestInProgress --- .../vertx/tests/deployment/VirtualThreadDeploymentTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java index 525e02d4016..5ea96989b42 100644 --- a/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/VirtualThreadDeploymentTest.java @@ -192,9 +192,7 @@ public Future start() throws Exception { .compose(HttpClientRequest::send) .await(); } catch (Throwable e) { - if (e instanceof InterruptedException) { - interruptedThreads.add(Thread.currentThread()); - } + interruptedThreads.add(Thread.currentThread()); } }); } From 4741bd0f41d6849d9c502e394385ea4ec1eb6430 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 28 Oct 2024 10:09:35 +0100 Subject: [PATCH 2/2] Fix VertxConnection resume when there is a read in progress bug. Motivation: VertxConnection resume incorrectly assumes no event loop tasks can be executed when there is a channel read in progress. This is somehow true for nio/epoll/kqueue but this is incorrect for io_uring. Changes: Handle properly the case where a resume task executes and there is a read in progress. --- .../vertx/core/net/impl/VertxConnection.java | 8 +++--- .../vertx/tests/net/ConnectionBaseTest.java | 26 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java index 5d93897e3cb..6ed702c6ea7 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/net/impl/VertxConnection.java @@ -463,8 +463,8 @@ public final void doResume() { return; } paused = false; - if (pending != null) { - assert !read; + if (pending != null && !pending.isEmpty()) { + boolean end = !read; read = true; try { Object msg; @@ -472,7 +472,9 @@ public final void doResume() { handleMessage(msg); } } finally { - endReadAndFlush(); + if (end) { + endReadAndFlush(); + } if (pending.isEmpty() && !autoRead) { autoRead = true; chctx.channel().config().setAutoRead(true); diff --git a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java index 14dec4106f6..91e7daad049 100644 --- a/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/net/ConnectionBaseTest.java @@ -634,4 +634,30 @@ public void testPauseWhenResuming() { ch.runPendingTasks(); assertEquals(2, count.get()); } + + @Test + public void testResumeWhenReadInProgress() { + MessageFactory factory = new MessageFactory(); + EmbeddedChannel ch = new EmbeddedChannel(); + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(VertxHandler.create(chctx -> new TestConnection(chctx))); + TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection(); + AtomicInteger count = new AtomicInteger(); + connection.handler = event -> count.incrementAndGet(); + connection.pause(); + pipeline.fireChannelRead(factory.next()); + assertEquals(0, count.get()); + Object expected = new Object(); + connection.write(expected, false, ch.newPromise()); + connection.resume(); + assertEquals(0, count.get()); + assertTrue(ch.hasPendingTasks()); + ch.runPendingTasks(); + assertEquals(1, count.get()); + Object outbound = ch.readOutbound(); + assertNull(outbound); + pipeline.fireChannelReadComplete(); + outbound = ch.readOutbound(); + assertSame(expected, outbound); + } }