Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immediately flush channel after writing to buffer #31301

Merged
merged 4 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
* @throws IOException during channel / context close
*/
public void closeFromSelector() throws IOException {
if (closeContext.isDone() == false) {
if (isOpen()) {
try {
rawChannel.close();
closeContext.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,11 @@ protected void listenerException(Exception exception) {
}

/**
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
* channel.
* This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
*
* @param context that was handled
*/
protected void postHandling(SocketChannelContext context) {
protected void postSocketChannelHandling(SocketChannelContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked postHandling better since SocketChannel is kind of part of the signature due to the argument? WDYT?

if (context.selectorShouldClose()) {
handleClose(context);
} else {
Expand Down
41 changes: 22 additions & 19 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public class NioSelector implements Closeable {

Expand Down Expand Up @@ -165,7 +162,7 @@ void singleLoop() {
}

void cleanupAndCloseChannels() {
cleanup();
cleanupPendingWrites();
channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
Expand Down Expand Up @@ -219,7 +216,7 @@ void processKey(SelectionKey selectionKey) {
handleRead(channelContext);
}
}
eventHandler.postHandling(channelContext);
eventHandler.postSocketChannelHandling(channelContext);
}

}
Expand All @@ -234,16 +231,6 @@ void preSelect() {
handleQueuedWrites();
}

/**
* Called once as the selector is being closed.
*/
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
Expand Down Expand Up @@ -284,20 +271,29 @@ public void scheduleForRegistration(NioChannel channel) {
}

/**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
* by the selector thread. As a result, this method should only be called by the selector thread.
* Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
* already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
* thread. As a result, this method should only be called by the selector thread. If this channel does
* not have pending writes already, the channel will be flushed.
*
* @param writeOperation to be queued in a channel's buffer
*/
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
public void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
boolean shouldFlush = context.readyForFlush() == false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic doesn't look intuitive to me. why should we flush if the context is not ready for it? maybe it's a naming issue?

try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
shouldFlush = false;
executeFailedListener(writeOperation.getListener(), e);
}

if (shouldFlush) {
handleWrite(context);
eventHandler.postSocketChannelHandling(context);
}
}

/**
Expand Down Expand Up @@ -332,6 +328,13 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
}
}

private void cleanupPendingWrites() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
Expand Down Expand Up @@ -394,7 +397,7 @@ private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation);
writeToChannel(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
return;
}

selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);
}

public void queueWriteOperation(WriteOperation writeOperation) {
Expand Down Expand Up @@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (channel.isOpen()) {
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(3);
try {
super.closeFromSelector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void testPostHandlingCallWillCloseTheChannelIfReady() throws IOException

when(channel.getContext()).thenReturn(context);
when(context.selectorShouldClose()).thenReturn(true);
handler.postHandling(context);
handler.postSocketChannelHandling(context);

verify(context).closeFromSelector();
}
Expand All @@ -207,7 +207,7 @@ public void testPostHandlingCallWillNotCloseTheChannelIfNotReady() throws IOExce
NioSocketChannel channel = mock(NioSocketChannel.class);
when(channel.getContext()).thenReturn(context);

handler.postHandling(context);
handler.postSocketChannelHandling(context);

verify(context, times(0)).closeFromSelector();
}
Expand All @@ -222,7 +222,7 @@ public void testPostHandlingWillAddWriteIfNecessary() throws IOException {
when(channel.getContext()).thenReturn(context);

assertEquals(SelectionKey.OP_READ, selectionKey.interestOps());
handler.postHandling(context);
handler.postSocketChannelHandling(context);
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());
}

Expand All @@ -237,7 +237,7 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException {


assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, key.interestOps());
handler.postHandling(context);
handler.postSocketChannelHandling(context);
assertEquals(SelectionKey.OP_READ, key.interestOps());
}

Expand Down
28 changes: 24 additions & 4 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

when(channelContext.readyForFlush()).thenReturn(true);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postSocketChannelHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

public void testShouldFlushIfNoPendingFlushes() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

selector.queueWriteInChannelBuffer(writeOperation);
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

when(channelContext.readyForFlush()).thenReturn(false);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).postSocketChannelHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

Expand All @@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
CancelledKeyException cancelledKeyException = new CancelledKeyException();

when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);

verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postSocketChannelHandling(channelContext);
verify(listener).accept(null, cancelledKeyException);
}

Expand Down Expand Up @@ -373,7 +393,7 @@ public void testWillCallPostHandleAfterChannelHandling() throws Exception {

verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).handleRead(channelContext);
verify(eventHandler).postHandling(channelContext);
verify(eventHandler).postSocketChannelHandling(channelContext);
}

public void testCleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener);

verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
verify(selector).writeToChannel(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue();

assertSame(writeOperation, writeOp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void closeChannel() {
selector.queueWrite(writeOperation);
return;
}
selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void testInitiateCloseFromSameThreadSchedulesCloseNotify() {
context.closeChannel();

ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).queueWriteInChannelBuffer(captor.capture());
verify(selector).writeToChannel(captor.capture());

context.queueWriteOperation(captor.getValue());
verify(sslDriver).initiateClose();
Expand Down