Skip to content

Commit

Permalink
Repair file channel when it's closed by interruption
Browse files Browse the repository at this point in the history
When an interrupted that calls FileChannel.read, the channel is
closed and the read fails with a ClosedByInterruptException. The
closure of the channel makes it unusable by other threads. To
allow other threads to read from the data block, this commit
recreates the FileChannel when a read fails on an interrupted
thread with a ClosedByInterruptException. The exception is then
rethrown to continue the thread's interruption.

Closes gh-38154
  • Loading branch information
wilkinsona committed Nov 1, 2023
1 parent 173e654 commit 890a3e7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
Expand Down Expand Up @@ -179,7 +180,13 @@ int read(ByteBuffer dst, long position) throws IOException {
synchronized (this.lock) {
if (position < this.bufferPosition || position >= this.bufferPosition + this.bufferSize) {
this.buffer.clear();
this.bufferSize = this.fileChannel.read(this.buffer, position);
try {
this.bufferSize = this.fileChannel.read(this.buffer, position);
}
catch (ClosedByInterruptException ex) {
repairFileChannel();
throw ex;
}
this.bufferPosition = position;
}
if (this.bufferSize <= 0) {
Expand All @@ -193,6 +200,16 @@ int read(ByteBuffer dst, long position) throws IOException {
}
}

private void repairFileChannel() throws IOException {
if (tracker != null) {
tracker.closedFileChannel(this.path, this.fileChannel);
}
this.fileChannel = FileChannel.open(this.path, StandardOpenOption.READ);
if (tracker != null) {
tracker.openedFileChannel(this.path, this.fileChannel);
}
}

void open() throws IOException {
synchronized (this.lock) {
if (this.referenceCount == 0) {
Expand Down Expand Up @@ -231,7 +248,7 @@ void close() throws IOException {

<E extends Exception> void ensureOpen(Supplier<E> exceptionSupplier) throws E {
synchronized (this.lock) {
if (this.referenceCount == 0) {
if (this.referenceCount == 0 || !this.fileChannel.isOpen()) {
throw exceptionSupplier.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -74,6 +76,28 @@ void readReadsFile() throws IOException {
}
}

@Test
void readReadsFileWhenAnotherThreadHasBeenInterrupted() throws IOException, InterruptedException {
try (FileChannelDataBlock block = createAndOpenBlock()) {
ByteBuffer buffer = ByteBuffer.allocate(CONTENT.length);
AtomicReference<IOException> failure = new AtomicReference<>();
Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
try {
block.read(ByteBuffer.allocate(CONTENT.length), 0);
}
catch (IOException ex) {
failure.set(ex);
}
});
thread.start();
thread.join();
assertThat(failure.get()).isInstanceOf(ClosedByInterruptException.class);
assertThat(block.read(buffer, 0)).isEqualTo(6);
assertThat(buffer.array()).containsExactly(CONTENT);
}
}

@Test
void readDoesNotReadPastEndOfFile() throws IOException {
try (FileChannelDataBlock block = createAndOpenBlock()) {
Expand Down

0 comments on commit 890a3e7

Please sign in to comment.