Skip to content

Commit

Permalink
Add parameter to limit read ahead to maximum length. Allows to use mu…
Browse files Browse the repository at this point in the history
…ltiple concurrent threads reading from the same file with an offset without reading too much ahead for a single segment.
  • Loading branch information
dkocher committed Dec 23, 2021
1 parent bad207a commit 55aca3e
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions src/main/java/net/schmizz/sshj/sftp/RemoteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ public class ReadAheadRemoteFileInputStream
private final byte[] b = new byte[1];

private final int maxUnconfirmedReads;
private final long maxOffset;
private final Queue<Promise<Response, SFTPException>> unconfirmedReads = new LinkedList<Promise<Response, SFTPException>>();
private final Queue<Long> unconfirmedReadOffsets = new LinkedList<Long>();

Expand All @@ -232,17 +233,22 @@ public class ReadAheadRemoteFileInputStream
private boolean eof;

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
assert 0 <= maxUnconfirmedReads;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this(maxUnconfirmedReads, 0L, -1L);
}

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
/**
*
* @param maxUnconfirmedReads Maximum number of unconfirmed requests to send
* @param fileOffset Initial offset in file to read from
* @param maxLength Maximum length to read
*/
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset, long maxLength) {
assert 0 <= maxUnconfirmedReads;
assert 0 <= fileOffset;

this.maxUnconfirmedReads = maxUnconfirmedReads;
this.requestOffset = this.responseOffset = fileOffset;
this.maxOffset = maxLength > 0 ? fileOffset + maxLength : Long.MAX_VALUE;
}

private ByteArrayInputStream pending = new ByteArrayInputStream(new byte[0]);
Expand Down Expand Up @@ -296,6 +302,9 @@ public int read(byte[] into, int off, int len) throws IOException {
unconfirmedReads.add(RemoteFile.this.asyncRead(requestOffset, reqLen));
unconfirmedReadOffsets.add(requestOffset);
requestOffset += reqLen;
if (requestOffset >= maxOffset) {
break;
}
}

long nextOffset = unconfirmedReadOffsets.peek();
Expand Down

0 comments on commit 55aca3e

Please sign in to comment.