diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java
index e3702c2880a18..a7cb21d95f537 100644
--- a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java
+++ b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java
@@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
* @throws IOException during channel / context close
*/
public void closeFromSelector() throws IOException {
- if (closeContext.isDone() == false) {
+ if (isOpen()) {
try {
rawChannel.close();
closeContext.complete(null);
diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java
index 3c52423c7aff3..87a2489fdbc27 100644
--- a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java
+++ b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java
@@ -159,8 +159,7 @@ protected void listenerException(Exception exception) {
}
/**
- * This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
- * channel.
+ * This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
*
* @param context that was handled
*/
diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
index ab6709bcc5bd4..9f82cc2c50d44 100644
--- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
+++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
@@ -43,9 +43,6 @@
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
- *
- * Children of this class should implement the specific {@link #processKey(SelectionKey)},
- * {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public class NioSelector implements Closeable {
@@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
}
- public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
+ public NioSelector(EventHandler eventHandler, Selector selector) {
this.selector = selector;
this.eventHandler = eventHandler;
}
@@ -165,7 +162,7 @@ void singleLoop() {
}
void cleanupAndCloseChannels() {
- cleanup();
+ cleanupPendingWrites();
channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext>) sk.attachment()).collect(Collectors.toList()));
@@ -234,16 +231,6 @@ void preSelect() {
handleQueuedWrites();
}
- /**
- * Called once as the selector is being closed.
- */
- void cleanup() {
- WriteOperation op;
- while ((op = queuedWrites.poll()) != null) {
- executeFailedListener(op.getListener(), new ClosedSelectorException());
- }
- }
-
/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
@@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) {
}
/**
- * Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
- * by the selector thread. As a result, this method should only be called by the selector thread.
+ * Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
+ * already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
+ * thread. As a result, this method should only be called by the selector thread. If this channel does
+ * not have pending writes already, the channel will be flushed.
*
* @param writeOperation to be queued in a channel's buffer
*/
- public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
+ public void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
+ // If the channel does not currently have anything that is ready to flush, we should flush after
+ // the write operation is queued.
+ boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
+ shouldFlushAfterQueuing = false;
executeFailedListener(writeOperation.getListener(), e);
}
+
+ if (shouldFlushAfterQueuing) {
+ handleWrite(context);
+ eventHandler.postHandling(context);
+ }
}
/**
@@ -332,6 +330,13 @@ public void executeFailedListener(BiConsumer listener, Excepti
}
}
+ private void cleanupPendingWrites() {
+ WriteOperation op;
+ while ((op = queuedWrites.poll()) != null) {
+ executeFailedListener(op.getListener(), new ClosedSelectorException());
+ }
+ }
+
private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
@@ -394,7 +399,7 @@ private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
- queueWriteInChannelBuffer(writeOperation);
+ writeToChannel(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java
index 53be0e7f89fe0..53fb0da432f48 100644
--- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java
+++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java
@@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer listener) {
return;
}
- selector.queueWriteInChannelBuffer(writeOperation);
+ selector.writeToChannel(writeOperation);
}
public void queueWriteOperation(WriteOperation writeOperation) {
@@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
- if (channel.isOpen()) {
+ if (isOpen()) {
ArrayList closingExceptions = new ArrayList<>(3);
try {
super.closeFromSelector();
diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
index dd3fea8bf50e8..bd5f1c1eb346f 100644
--- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
+++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
@@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
- assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
+ assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
- selector.queueWriteInChannelBuffer(writeOperation);
+ when(channelContext.readyForFlush()).thenReturn(true);
+ selector.writeToChannel(writeOperation);
verify(channelContext).queueWriteOperation(writeOperation);
+ verify(eventHandler, times(0)).handleWrite(channelContext);
+ verify(eventHandler, times(0)).postHandling(channelContext);
+ assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
+ }
+
+ public void testShouldFlushIfNoPendingFlushes() throws Exception {
+ WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
+
+ assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
+
+ when(channelContext.readyForFlush()).thenReturn(false);
+ selector.writeToChannel(writeOperation);
+
+ verify(channelContext).queueWriteOperation(writeOperation);
+ verify(eventHandler).handleWrite(channelContext);
+ verify(eventHandler).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}
@@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
CancelledKeyException cancelledKeyException = new CancelledKeyException();
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
+ when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
- selector.queueWriteInChannelBuffer(writeOperation);
+ selector.writeToChannel(writeOperation);
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
+ verify(eventHandler, times(0)).handleWrite(channelContext);
+ verify(eventHandler, times(0)).postHandling(channelContext);
verify(listener).accept(null, cancelledKeyException);
}
diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java
index fdb4a77b922e2..dee50724f34c9 100644
--- a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java
+++ b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java
@@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener);
- verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
+ verify(selector).writeToChannel(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue();
assertSame(writeOperation, writeOp);
diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
index 95af766515777..2170c55ee0192 100644
--- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
+++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
@@ -145,7 +145,7 @@ public void closeChannel() {
selector.queueWrite(writeOperation);
return;
}
- selector.queueWriteInChannelBuffer(writeOperation);
+ selector.writeToChannel(writeOperation);
}
}
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java
index 14a22d300d12d..bfee50b65bff4 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java
@@ -345,7 +345,7 @@ public void testInitiateCloseFromSameThreadSchedulesCloseNotify() {
context.closeChannel();
ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class);
- verify(selector).queueWriteInChannelBuffer(captor.capture());
+ verify(selector).writeToChannel(captor.capture());
context.queueWriteOperation(captor.getValue());
verify(sslDriver).initiateClose();