Skip to content

Commit

Permalink
Retry read on ClosedByInterruptException
Browse files Browse the repository at this point in the history
In gh-38154, we started handling ClosedByInterruptException. The
FileChannel was repaired by recreating it and then the exception was
rethrown. This allowed other threads to use the channel that had been
read by an interrupted thread while allowing that interruption to
continue.

This approach has proven to be insufficient as there are scenarios
where the read needs to succeed on the interrupted thread. This
commit updates the handling of ClosedByInterruptException so that
this is the case. The FileChannel is recreated as before but the
thread's interrupted flag is now cleared before retrying the read.
The flag is then reinstated so that any subsequent actions that
should fail due to the interruption will do so.

We could clear and reinstate the interrupted flag before the first
read, rather than catching ClosedByInterruptException. This approach
was rejected as it will have an impact on the performance of the
happy path where the thread hasn't been interrupted.

Fixes gh-38611
  • Loading branch information
wilkinsona authored and philwebb committed Dec 7, 2023
1 parent 9eae176 commit e697024
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,7 @@ static class ManagedFileChannel {
int read(ByteBuffer dst, long position) throws IOException {
synchronized (this.lock) {
if (position < this.bufferPosition || position >= this.bufferPosition + this.bufferSize) {
this.buffer.clear();
try {
this.bufferSize = this.fileChannel.read(this.buffer, position);
}
catch (ClosedByInterruptException ex) {
repairFileChannel();
throw ex;
}
this.bufferPosition = position;
fillBuffer(position);
}
if (this.bufferSize <= 0) {
return this.bufferSize;
Expand All @@ -200,6 +192,27 @@ int read(ByteBuffer dst, long position) throws IOException {
}
}

private void fillBuffer(long position) throws IOException {
for (int i = 0; i < 10; i++) {
boolean interrupted = (i != 0) ? Thread.interrupted() : false;
try {
this.buffer.clear();
this.bufferSize = this.fileChannel.read(this.buffer, position);
this.bufferPosition = position;
return;
}
catch (ClosedByInterruptException ex) {
repairFileChannel();
}
finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
throw new ClosedByInterruptException();
}

private void repairFileChannel() throws IOException {
if (tracker != null) {
tracker.closedFileChannel(this.path, this.fileChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -77,25 +75,16 @@ void readReadsFile() throws IOException {
}

@Test
void readReadsFileWhenAnotherThreadHasBeenInterrupted() throws IOException, InterruptedException {
void readReadsFileWhenThreadHasBeenInterrupted() throws IOException {
try (FileChannelDataBlock block = createAndOpenBlock()) {
ByteBuffer buffer = ByteBuffer.allocate(CONTENT.length);
AtomicReference<IOException> failure = new AtomicReference<>();
Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
try {
block.read(ByteBuffer.allocate(CONTENT.length), 0);
}
catch (IOException ex) {
failure.set(ex);
}
});
thread.start();
thread.join();
assertThat(failure.get()).isInstanceOf(ClosedByInterruptException.class);
Thread.currentThread().interrupt();
assertThat(block.read(buffer, 0)).isEqualTo(6);
assertThat(buffer.array()).containsExactly(CONTENT);
}
finally {
Thread.interrupted();
}
}

@Test
Expand Down

0 comments on commit e697024

Please sign in to comment.