diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index 1eae706ba9377..35d88842ec246 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -99,9 +99,8 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) { @Override public void flush(ChannelHandlerContext ctx) { assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext()); - Channel channel = ctx.channel(); - if (channel.isWritable() || channel.isActive() == false) { - doFlush(ctx); + if (doFlush(ctx) == false) { + ctx.flush(); } } @@ -113,16 +112,14 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } - private void doFlush(ChannelHandlerContext ctx) { + private boolean doFlush(ChannelHandlerContext ctx) { assert ctx.executor().inEventLoop(); final Channel channel = ctx.channel(); if (channel.isActive() == false) { - if (currentWrite != null) { - currentWrite.promise.tryFailure(new ClosedChannelException()); - } failQueuedWrites(); - return; + return false; } + boolean needsFlush = true; while (channel.isWritable()) { if (currentWrite == null) { currentWrite = queuedWrites.poll(); @@ -131,11 +128,6 @@ private void doFlush(ChannelHandlerContext ctx) { break; } final WriteOperation write = currentWrite; - if (write.buf.readableBytes() == 0) { - write.promise.trySuccess(); - currentWrite = null; - continue; - } final int readableBytes = write.buf.readableBytes(); final int bufferSize = Math.min(readableBytes, 1 << 18); final int readerIndex = write.buf.readerIndex(); @@ -148,7 +140,8 @@ private void doFlush(ChannelHandlerContext ctx) { writeBuffer = write.buf; } final ChannelFuture writeFuture = ctx.write(writeBuffer); - if (sliced == false || write.buf.readableBytes() == 0) { + needsFlush = true; + if (sliced == false) { currentWrite = null; writeFuture.addListener(future -> { assert ctx.executor().inEventLoop(); @@ -166,18 +159,30 @@ private void doFlush(ChannelHandlerContext ctx) { } }); } - ctx.flush(); - if (channel.isActive() == false) { - failQueuedWrites(); - return; + if (channel.isWritable() == false) { + // try flushing to make channel writable again, loop will only continue if channel becomes writable again + ctx.flush(); + needsFlush = false; } } + if (needsFlush) { + ctx.flush(); + } + if (channel.isActive() == false) { + failQueuedWrites(); + } + return true; } private void failQueuedWrites() { + if (currentWrite != null) { + final WriteOperation current = currentWrite; + currentWrite = null; + current.failAsClosedChannel(); + } WriteOperation queuedWrite; while ((queuedWrite = queuedWrites.poll()) != null) { - queuedWrite.promise.tryFailure(new ClosedChannelException()); + queuedWrite.failAsClosedChannel(); } } @@ -191,5 +196,10 @@ private static final class WriteOperation { this.buf = buf; this.promise = promise; } + + void failAsClosedChannel() { + promise.tryFailure(new ClosedChannelException()); + buf.release(); + } } }