Skip to content

Commit

Permalink
Merge pull request eclipse-vertx#5377 from eclipse-vertx/resume-when-…
Browse files Browse the repository at this point in the history
…read-in-progress

Various fixes
  • Loading branch information
vietj authored Oct 28, 2024
2 parents f5d1cf1 + 4741bd0 commit e37ebff
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,18 @@ public final void doResume() {
return;
}
paused = false;
if (pending != null) {
assert !read;
if (pending != null && !pending.isEmpty()) {
boolean end = !read;
read = true;
try {
Object msg;
while (!paused && (msg = pending.poll()) != null) {
handleMessage(msg);
}
} finally {
endReadAndFlush();
if (end) {
endReadAndFlush();
}
if (pending.isEmpty() && !autoRead) {
autoRead = true;
chctx.channel().config().setAutoRead(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ public Future<?> start() throws Exception {
.compose(HttpClientRequest::send)
.await();
} catch (Throwable e) {
if (e instanceof InterruptedException) {
interruptedThreads.add(Thread.currentThread());
}
interruptedThreads.add(Thread.currentThread());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,4 +634,30 @@ public void testPauseWhenResuming() {
ch.runPendingTasks();
assertEquals(2, count.get());
}

@Test
public void testResumeWhenReadInProgress() {
MessageFactory factory = new MessageFactory();
EmbeddedChannel ch = new EmbeddedChannel();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(VertxHandler.create(chctx -> new TestConnection(chctx)));
TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
AtomicInteger count = new AtomicInteger();
connection.handler = event -> count.incrementAndGet();
connection.pause();
pipeline.fireChannelRead(factory.next());
assertEquals(0, count.get());
Object expected = new Object();
connection.write(expected, false, ch.newPromise());
connection.resume();
assertEquals(0, count.get());
assertTrue(ch.hasPendingTasks());
ch.runPendingTasks();
assertEquals(1, count.get());
Object outbound = ch.readOutbound();
assertNull(outbound);
pipeline.fireChannelReadComplete();
outbound = ch.readOutbound();
assertSame(expected, outbound);
}
}

0 comments on commit e37ebff

Please sign in to comment.