Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immediately flush channel after writing to buffer #31301

Merged
merged 4 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
* @throws IOException during channel / context close
*/
public void closeFromSelector() throws IOException {
if (closeContext.isDone() == false) {
if (isOpen()) {
try {
rawChannel.close();
closeContext.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ protected void listenerException(Exception exception) {
}

/**
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
* channel.
* This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
*
* @param context that was handled
*/
Expand Down
43 changes: 24 additions & 19 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public class NioSelector implements Closeable {

Expand All @@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
}

public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
public NioSelector(EventHandler eventHandler, Selector selector) {
this.selector = selector;
this.eventHandler = eventHandler;
}
Expand Down Expand Up @@ -165,7 +162,7 @@ void singleLoop() {
}

void cleanupAndCloseChannels() {
cleanup();
cleanupPendingWrites();
channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
Expand Down Expand Up @@ -234,16 +231,6 @@ void preSelect() {
handleQueuedWrites();
}

/**
* Called once as the selector is being closed.
*/
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
Expand Down Expand Up @@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) {
}

/**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
* by the selector thread. As a result, this method should only be called by the selector thread.
* Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
* already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
* thread. As a result, this method should only be called by the selector thread. If this channel does
* not have pending writes already, the channel will be flushed.
*
* @param writeOperation to be queued in a channel's buffer
*/
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
public void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
// If the channel does not currently have anything that is ready to flush, we should flush after
// the write operation is queued.
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
shouldFlushAfterQueuing = false;
executeFailedListener(writeOperation.getListener(), e);
}

if (shouldFlushAfterQueuing) {
handleWrite(context);
eventHandler.postHandling(context);
}
}

/**
Expand Down Expand Up @@ -332,6 +330,13 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
}
}

private void cleanupPendingWrites() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
Expand Down Expand Up @@ -394,7 +399,7 @@ private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation);
writeToChannel(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
return;
}

selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);
}

public void queueWriteOperation(WriteOperation writeOperation) {
Expand Down Expand Up @@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (channel.isOpen()) {
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(3);
try {
super.closeFromSelector();
Expand Down
26 changes: 23 additions & 3 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

selector.queueWriteInChannelBuffer(writeOperation);
when(channelContext.readyForFlush()).thenReturn(true);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

public void testShouldFlushIfNoPendingFlushes() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

when(channelContext.readyForFlush()).thenReturn(false);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

Expand All @@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
CancelledKeyException cancelledKeyException = new CancelledKeyException();

when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);

verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
verify(listener).accept(null, cancelledKeyException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener);

verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
verify(selector).writeToChannel(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue();

assertSame(writeOperation, writeOp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void closeChannel() {
selector.queueWrite(writeOperation);
return;
}
selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void testInitiateCloseFromSameThreadSchedulesCloseNotify() {
context.closeChannel();

ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
verify(selector).queueWriteInChannelBuffer(captor.capture());
verify(selector).writeToChannel(captor.capture());

context.queueWriteOperation(captor.getValue());
verify(sslDriver).initiateClose();
Expand Down