From d4503a1f0e62d38f7a12ba9748744c5490c915c4 Mon Sep 17 00:00:00 2001 From: Marc Jakobi Date: Thu, 25 Jul 2024 10:59:03 +0200 Subject: [PATCH] THRIFT-4847: CancelledKeyException causes TThreadedSelectorServer to fail --- .../server/AbstractNonblockingServer.java | 30 +++++++++++++++---- .../server/TThreadedSelectorServer.java | 15 +++++++--- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java index 954aaebf1b5..a573f0cdd77 100644 --- a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java @@ -248,7 +248,7 @@ public class FrameBuffer { protected final TNonblockingTransport trans_; // the SelectionKey that corresponds to our transport - protected final SelectionKey selectionKey_; + protected SelectionKey selectionKey_; // the SelectThread that owns the registration of our transport protected final AbstractSelectThread selectThread_; @@ -302,6 +302,14 @@ public FrameBuffer( } } + /** + * Sets the selection key (this is not thread safe). + * @param selectionKey the new key to set. + */ + public void setSelectionKey(SelectionKey selectionKey) { + selectionKey_ = selectionKey; + } + /** * Give this FrameBuffer a chance to read. The selector loop should have received a read event * for this FrameBuffer. @@ -375,7 +383,11 @@ public boolean read() { // modify our selection key directly. if (buffer_.remaining() == 0) { // get rid of the read select interests - selectionKey_.interestOps(0); + if (selectionKey_.isValid()) { + selectionKey_.interestOps(0); + } else { + LOGGER.warn("SelectionKey was invalidated during read"); + } state_ = FrameBufferState.READ_FRAME_COMPLETE; } @@ -415,8 +427,12 @@ public void changeSelectInterests() { switch (state_) { case AWAITING_REGISTER_WRITE: // set the OP_WRITE interest - selectionKey_.interestOps(SelectionKey.OP_WRITE); - state_ = FrameBufferState.WRITING; + if (selectionKey_.isValid()) { + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = FrameBufferState.WRITING; + } else { + LOGGER.warn("SelectionKey was invalidated before write"); + } break; case AWAITING_REGISTER_READ: prepareRead(); @@ -520,7 +536,11 @@ private boolean internalRead() { private void prepareRead() { // we can set our interest directly without using the queue because // we're in the select thread. - selectionKey_.interestOps(SelectionKey.OP_READ); + if (selectionKey_.isValid()) { + selectionKey_.interestOps(SelectionKey.OP_READ); + } else { + LOGGER.warn("SelectionKey was invalidated before read"); + } // get ready for another go-around buffer_ = ByteBuffer.allocate(4); state_ = FrameBufferState.READING_FRAME_SIZE; diff --git a/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java index 86b8dfd2337..2a95fe176cd 100644 --- a/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java @@ -626,16 +626,23 @@ private synchronized void rebuildSelector() { LOGGER.error("Create new Selector error.", e); } - for (SelectionKey key : oldSelector.selectedKeys()) { - if (!key.isValid() && key.readyOps() == 0) continue; + for (SelectionKey key : oldSelector.keys()) { + if (!key.isValid() || key.interestOps() == 0 || key.channel().keyFor(newSelector) != null) { + continue; + } SelectableChannel channel = key.channel(); Object attachment = key.attachment(); + int interestOps = key.interestOps(); + SelectionKey newKey; try { if (attachment == null) { - channel.register(newSelector, key.readyOps()); + newKey = channel.register(newSelector, interestOpts); } else { - channel.register(newSelector, key.readyOps(), attachment); + newKey = channel.register(newSelector, interestOpts, attachment); + if (attachment instanceof FrameBuffer) { + ((FrameBuffer) attachment.setSelectionKey(newKey); + } } } catch (ClosedChannelException e) { LOGGER.error("Register new selector key error.", e);