Skip to content

Commit

Permalink
Refine fix
Browse files Browse the repository at this point in the history
Rather than clearing the interrupted flag, repairing the channel, and
retrying the read, we now repair the channel and then rethrow the
ClosedByInterruptException. This ensures that the channel can be read
by other threads while also allowing the interruption of the current
thread to continue.
  • Loading branch information
wilkinsona committed Nov 1, 2023
1 parent b4cabc4 commit 2aa1e3b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,8 @@ int read(ByteBuffer dst, long position) throws IOException {
this.bufferSize = this.fileChannel.read(this.buffer, position);
}
catch (ClosedByInterruptException ex) {
Thread.interrupted();
if (tracker != null) {
tracker.closedFileChannel(this.path, this.fileChannel);
}
this.fileChannel = FileChannel.open(this.path, StandardOpenOption.READ);
if (tracker != null) {
tracker.openedFileChannel(this.path, this.fileChannel);
}
this.bufferSize = this.fileChannel.read(this.buffer, position);
repairFileChannel();
throw ex;
}
this.bufferPosition = position;
}
Expand All @@ -207,6 +200,16 @@ int read(ByteBuffer dst, long position) throws IOException {
}
}

private void repairFileChannel() throws IOException {
if (tracker != null) {
tracker.closedFileChannel(this.path, this.fileChannel);
}
this.fileChannel = FileChannel.open(this.path, StandardOpenOption.READ);
if (tracker != null) {
tracker.openedFileChannel(this.path, this.fileChannel);
}
}

void open() throws IOException {
synchronized (this.lock) {
if (this.referenceCount == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -75,16 +77,25 @@ void readReadsFile() throws IOException {
}

@Test
void readReadsFileWhenThreadHasBeenInterrupted() throws IOException {
void readReadsFileWhenAnotherThreadHasBeenInterrupted() throws IOException, InterruptedException {
try (FileChannelDataBlock block = createAndOpenBlock()) {
ByteBuffer buffer = ByteBuffer.allocate(CONTENT.length);
Thread.currentThread().interrupt();
AtomicReference<IOException> failure = new AtomicReference<>();
Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
try {
block.read(ByteBuffer.allocate(CONTENT.length), 0);
}
catch (IOException ex) {
failure.set(ex);
}
});
thread.start();
thread.join();
assertThat(failure.get()).isInstanceOf(ClosedByInterruptException.class);
assertThat(block.read(buffer, 0)).isEqualTo(6);
assertThat(buffer.array()).containsExactly(CONTENT);
}
finally {
Thread.interrupted();
}
}

@Test
Expand Down

0 comments on commit 2aa1e3b

Please sign in to comment.