Skip to content

Commit

Permalink
THRIFT-4847: CancelledKeyException causes TThreadedSelectorServer to …
Browse files Browse the repository at this point in the history
…fail
  • Loading branch information
mrcjkb authored and Jens-G committed Aug 1, 2024
1 parent 438fc82 commit d4503a1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public class FrameBuffer {
protected final TNonblockingTransport trans_;

// the SelectionKey that corresponds to our transport
protected final SelectionKey selectionKey_;
protected SelectionKey selectionKey_;

// the SelectThread that owns the registration of our transport
protected final AbstractSelectThread selectThread_;
Expand Down Expand Up @@ -302,6 +302,14 @@ public FrameBuffer(
}
}

/**
* Sets the selection key (this is not thread safe).
* @param selectionKey the new key to set.
*/
public void setSelectionKey(SelectionKey selectionKey) {
selectionKey_ = selectionKey;
}

/**
* Give this FrameBuffer a chance to read. The selector loop should have received a read event
* for this FrameBuffer.
Expand Down Expand Up @@ -375,7 +383,11 @@ public boolean read() {
// modify our selection key directly.
if (buffer_.remaining() == 0) {
// get rid of the read select interests
selectionKey_.interestOps(0);
if (selectionKey_.isValid()) {
selectionKey_.interestOps(0);
} else {
LOGGER.warn("SelectionKey was invalidated during read");
}
state_ = FrameBufferState.READ_FRAME_COMPLETE;
}

Expand Down Expand Up @@ -415,8 +427,12 @@ public void changeSelectInterests() {
switch (state_) {
case AWAITING_REGISTER_WRITE:
// set the OP_WRITE interest
selectionKey_.interestOps(SelectionKey.OP_WRITE);
state_ = FrameBufferState.WRITING;
if (selectionKey_.isValid()) {
selectionKey_.interestOps(SelectionKey.OP_WRITE);
state_ = FrameBufferState.WRITING;
} else {
LOGGER.warn("SelectionKey was invalidated before write");
}
break;
case AWAITING_REGISTER_READ:
prepareRead();
Expand Down Expand Up @@ -520,7 +536,11 @@ private boolean internalRead() {
private void prepareRead() {
// we can set our interest directly without using the queue because
// we're in the select thread.
selectionKey_.interestOps(SelectionKey.OP_READ);
if (selectionKey_.isValid()) {
selectionKey_.interestOps(SelectionKey.OP_READ);
} else {
LOGGER.warn("SelectionKey was invalidated before read");
}
// get ready for another go-around
buffer_ = ByteBuffer.allocate(4);
state_ = FrameBufferState.READING_FRAME_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,16 +626,23 @@ private synchronized void rebuildSelector() {
LOGGER.error("Create new Selector error.", e);
}

for (SelectionKey key : oldSelector.selectedKeys()) {
if (!key.isValid() && key.readyOps() == 0) continue;
for (SelectionKey key : oldSelector.keys()) {
if (!key.isValid() || key.interestOps() == 0 || key.channel().keyFor(newSelector) != null) {
continue;
}
SelectableChannel channel = key.channel();
Object attachment = key.attachment();

int interestOps = key.interestOps();
SelectionKey newKey;
try {
if (attachment == null) {
channel.register(newSelector, key.readyOps());
newKey = channel.register(newSelector, interestOpts);
} else {
channel.register(newSelector, key.readyOps(), attachment);
newKey = channel.register(newSelector, interestOpts, attachment);
if (attachment instanceof FrameBuffer) {
((FrameBuffer) attachment.setSelectionKey(newKey);
}
}
} catch (ClosedChannelException e) {
LOGGER.error("Register new selector key error.", e);
Expand Down

0 comments on commit d4503a1

Please sign in to comment.