Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SFTP file transfer resume support on both PUT and GET. #775

Merged
merged 13 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/main/java/net/schmizz/sshj/sftp/RemoteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,16 @@ public int getLength() {
private boolean eof;

public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads) {
this(maxUnconfirmedReads, 0L, -1L);
this(maxUnconfirmedReads, 0L);
}

/**
*
* @param maxUnconfirmedReads Maximum number of unconfirmed requests to send
* @param fileOffset Initial offset in file to read from
*/
public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset) {
this(maxUnconfirmedReads, fileOffset, -1L);
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/net/schmizz/sshj/sftp/SFTPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,21 +232,41 @@ public void get(String source, String dest)
throws IOException {
xfer.download(source, dest);
}

public void get(String source, String dest, boolean resume)
throws IOException {
xfer.download(source, dest, resume);
}

public void put(String source, String dest)
throws IOException {
xfer.upload(source, dest);
}

public void put(String source, String dest, boolean resume)
throws IOException {
xfer.upload(source, dest, resume);
}

public void get(String source, LocalDestFile dest)
throws IOException {
xfer.download(source, dest);
}

public void get(String source, LocalDestFile dest, boolean resume)
throws IOException {
xfer.download(source, dest, resume);
}

public void put(LocalSourceFile source, String dest)
throws IOException {
xfer.upload(source, dest);
}

public void put(LocalSourceFile source, String dest, boolean resume)
throws IOException {
xfer.upload(source, dest, resume);
}

@Override
public void close()
Expand Down
106 changes: 78 additions & 28 deletions src/main/java/net/schmizz/sshj/sftp/SFTPFileTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,47 @@ public void setPreserveAttributes(boolean preserveAttributes) {
@Override
public void upload(String source, String dest)
throws IOException {
upload(new FileSystemFile(source), dest);
upload(source, dest, false);
}

@Override
public void upload(String source, String dest, boolean resume)
throws IOException {
upload(new FileSystemFile(source), dest, resume);
}

@Override
public void download(String source, String dest)
throws IOException {
download(source, new FileSystemFile(dest));
download(source, dest, false);
}

@Override
public void download(String source, String dest, boolean resume)
throws IOException {
download(source, new FileSystemFile(dest), resume);
}

@Override
public void upload(LocalSourceFile localFile, String remotePath) throws IOException {
new Uploader(localFile, remotePath).upload(getTransferListener());
upload(localFile, remotePath, false);
}

@Override
public void upload(LocalSourceFile localFile, String remotePath, boolean resume) throws IOException {
new Uploader(localFile, remotePath).upload(getTransferListener(), resume);
}

@Override
public void download(String source, LocalDestFile dest) throws IOException {
download(source, dest, false);
}

@Override
public void download(String source, LocalDestFile dest, boolean resume) throws IOException {
final PathComponents pathComponents = engine.getPathHelper().getComponents(source);
final FileAttributes attributes = engine.stat(source);
new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest);
new Downloader().download(getTransferListener(), new RemoteResourceInfo(pathComponents, attributes), dest, resume);
}

public void setUploadFilter(LocalFileFilter uploadFilter) {
Expand All @@ -92,17 +114,18 @@ private class Downloader {
@SuppressWarnings("PMD.MissingBreakInSwitch")
private void download(final TransferListener listener,
final RemoteResourceInfo remote,
final LocalDestFile local) throws IOException {
final LocalDestFile local,
final boolean resume) throws IOException {
final LocalDestFile adjustedFile;
switch (remote.getAttributes().getType()) {
case DIRECTORY:
adjustedFile = downloadDir(listener.directory(remote.getName()), remote, local);
adjustedFile = downloadDir(listener.directory(remote.getName()), remote, local, resume);
break;
case UNKNOWN:
log.warn("Server did not supply information about the type of file at `{}` " +
"-- assuming it is a regular file!", remote.getPath());
case REGULAR:
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()), remote, local);
adjustedFile = downloadFile(listener.file(remote.getName(), remote.getAttributes().getSize()), remote, local, resume);
break;
default:
throw new IOException(remote + " is not a regular file or directory");
Expand All @@ -113,13 +136,14 @@ private void download(final TransferListener listener,

private LocalDestFile downloadDir(final TransferListener listener,
final RemoteResourceInfo remote,
final LocalDestFile local)
final LocalDestFile local,
final boolean resume)
throws IOException {
final LocalDestFile adjusted = local.getTargetDirectory(remote.getName());
final RemoteDirectory rd = engine.openDir(remote.getPath());
try {
for (RemoteResourceInfo rri : rd.scan(getDownloadFilter()))
download(listener, rri, adjusted.getChild(rri.getName()));
download(listener, rri, adjusted.getChild(rri.getName()), resume);
} finally {
rd.close();
}
Expand All @@ -128,13 +152,22 @@ private LocalDestFile downloadDir(final TransferListener listener,

private LocalDestFile downloadFile(final StreamCopier.Listener listener,
final RemoteResourceInfo remote,
final LocalDestFile local)
final LocalDestFile local,
final boolean resume)
throws IOException {
final LocalDestFile adjusted = local.getTargetFile(remote.getName());
final RemoteFile rf = engine.open(remote.getPath());
long byteOffset = 0;
try {
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16);
final OutputStream os = adjusted.getOutputStream();
if (resume && adjusted.getLength() != 0 && adjusted.getLength() < remote.getAttributes().getSize()) {
// resume requested AND there's already at least one byte there AND the existing file isn't as long as the incoming remote
// we can honor the request to resume by setting the offset
byteOffset = adjusted.getLength();
}

log.debug("Attempting to download {} with offset={}", remote.getPath(), byteOffset);
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16, byteOffset);
final OutputStream os = adjusted.getOutputStream(byteOffset != 0);
try {
new StreamCopier(rfis, os, engine.getLoggerFactory())
.bufSize(engine.getSubsystem().getLocalMaxPacketSize())
Expand Down Expand Up @@ -173,17 +206,17 @@ private Uploader(final LocalSourceFile source, final String remote) {
this.remote = remote;
}

private void upload(final TransferListener listener) throws IOException {
private void upload(final TransferListener listener, boolean resume) throws IOException {
if (source.isDirectory()) {
makeDirIfNotExists(remote); // Ensure that the directory exists
uploadDir(listener.directory(source.getName()), source, remote);
setAttributes(source, remote);
} else if (source.isFile() && isDirectory(remote)) {
String adjustedRemote = engine.getPathHelper().adjustForParent(this.remote, source.getName());
uploadFile(listener.file(source.getName(), source.getLength()), source, adjustedRemote);
uploadFile(listener.file(source.getName(), source.getLength()), source, adjustedRemote, resume);
setAttributes(source, adjustedRemote);
} else if (source.isFile()) {
uploadFile(listener.file(source.getName(), source.getLength()), source, remote);
uploadFile(listener.file(source.getName(), source.getLength()), source, remote, resume);
setAttributes(source, remote);
} else {
throw new IOException(source + " is not a file or directory");
Expand All @@ -192,13 +225,14 @@ private void upload(final TransferListener listener) throws IOException {

private void upload(final TransferListener listener,
final LocalSourceFile local,
final String remote)
final String remote,
final boolean resume)
throws IOException {
final String adjustedPath;
if (local.isDirectory()) {
adjustedPath = uploadDir(listener.directory(local.getName()), local, remote);
} else if (local.isFile()) {
adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote);
adjustedPath = uploadFile(listener.file(local.getName(), local.getLength()), local, remote, resume);
} else {
throw new IOException(local + " is not a file or directory");
}
Expand All @@ -217,22 +251,37 @@ private String uploadDir(final TransferListener listener,
throws IOException {
makeDirIfNotExists(remote);
for (LocalSourceFile f : local.getChildren(getUploadFilter()))
upload(listener, f, engine.getPathHelper().adjustForParent(remote, f.getName()));
upload(listener, f, engine.getPathHelper().adjustForParent(remote, f.getName()), false);
return remote;
}

private String uploadFile(final StreamCopier.Listener listener,
final LocalSourceFile local,
final String remote)
final String remote,
final boolean resume)
throws IOException {
final String adjusted = prepareFile(local, remote);
final RemoteResourceInfo remoteFileInfo = prepareFile(local, remote, resume);
RemoteFile rf = null;
InputStream fis = null;
RemoteFile.RemoteFileOutputStream rfos = null;
long byteOffset = 0;
EnumSet<OpenMode> modes;
try {
rf = engine.open(adjusted, EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC));
if (!resume || remoteFileInfo.getAttributes() == null || local.getLength() < remoteFileInfo.getAttributes().getSize()) {
// not resuming OR remote file doesn't exist yet OR the local is smaller than the intended remote target already is
// we consider this to be a full replacement
modes = EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC);
} else {
// resuming something that's out there
byteOffset = remoteFileInfo.getAttributes().getSize();
modes = EnumSet.of(OpenMode.WRITE, OpenMode.APPEND);
}

log.debug("Attempting to upload {} with offset={}", local.getName(), byteOffset);
rf = engine.open(remoteFileInfo.getPath(), modes);
fis = local.getInputStream();
rfos = rf.new RemoteFileOutputStream(0, 16);
fis.skip(byteOffset);
rfos = rf.new RemoteFileOutputStream(byteOffset, 16);
new StreamCopier(fis, rfos, engine.getLoggerFactory())
.bufSize(engine.getSubsystem().getRemoteMaxPacketSize() - rf.getOutgoingPacketOverhead())
.keepFlushing(false)
Expand All @@ -258,7 +307,7 @@ private String uploadFile(final StreamCopier.Listener listener,
}
}
}
return adjusted;
return remoteFileInfo.getPath();
}

private boolean makeDirIfNotExists(final String remote) throws IOException {
Expand Down Expand Up @@ -293,24 +342,25 @@ private boolean isDirectory(final String remote) throws IOException {
}
}
}

private String prepareFile(final LocalSourceFile local, final String remote)
private RemoteResourceInfo prepareFile(final LocalSourceFile local, final String remote, final boolean resume)
throws IOException {
final PathComponents pathComponents = engine.getPathHelper().getComponents(remote);
final FileAttributes attrs;
try {
attrs = engine.stat(remote);
} catch (SFTPException e) {
if (e.getStatusCode() == StatusCode.NO_SUCH_FILE) {
log.debug("probeFile: {} does not exist", remote);
return remote;
return new RemoteResourceInfo(pathComponents, null);
} else
throw e;
}
if (attrs.getMode().getType() == FileMode.Type.DIRECTORY) {
throw new IOException("Trying to upload file " + local.getName() + " to path " + remote + " but that is a directory");
} else {
log.debug("probeFile: {} is a {} file that will be replaced", remote, attrs.getMode().getType());
return remote;
log.debug("probeFile: {} is a {} file that will be {}", remote, attrs.getMode().getType(), resume ? "resumed" : "replaced");
return new RemoteResourceInfo(pathComponents, attrs);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/main/java/net/schmizz/sshj/xfer/FileSystemFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ public InputStream getInputStream()
@Override
public OutputStream getOutputStream()
throws IOException {
return new FileOutputStream(file);
return getOutputStream(false);
}

@Override
public OutputStream getOutputStream(boolean append)
throws IOException {
return new FileOutputStream(file, append);
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/net/schmizz/sshj/xfer/FileTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ public interface FileTransfer {
void upload(String localPath, String remotePath)
throws IOException;

/**
* This is meant to delegate to {@link #upload(LocalSourceFile, String)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}. Attempts to resume with the {@code resume} flag.
*
* @param localPath
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
* @param remotePath
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
* @param resume
*
* @throws IOException
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
*/
void upload(String localPath, String remotePath, boolean resume)
throws IOException;

/**
* This is meant to delegate to {@link #download(String, LocalDestFile)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}.
Expand All @@ -43,6 +56,19 @@ void upload(String localPath, String remotePath)
void download(String remotePath, String localPath)
throws IOException;

/**
* This is meant to delegate to {@link #download(String, LocalDestFile)} with the {@code localPath} wrapped as e.g.
* a {@link FileSystemFile}. Attempts to resume with the {@code resume} flag.
*
* @param localPath
* @param remotePath
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
* @param resume
*
* @throws IOException
*/
void download(String remotePath, String localPath, boolean resume)
throws IOException;

/**
* Upload {@code localFile} to {@code remotePath}.
*
Expand All @@ -54,6 +80,18 @@ void download(String remotePath, String localPath)
void upload(LocalSourceFile localFile, String remotePath)
throws IOException;

/**
* Upload {@code localFile} to {@code remotePath}. Attempts to resume with the {@code resume} flag.
*
* @param localFile
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
* @param remotePath
* @param resume
*
* @throws IOException
*/
void upload(LocalSourceFile localFile, String remotePath, boolean resume)
throws IOException;

/**
* Download {@code remotePath} to {@code localFile}.
*
Expand All @@ -65,6 +103,18 @@ void upload(LocalSourceFile localFile, String remotePath)
void download(String remotePath, LocalDestFile localFile)
throws IOException;

/**
* Download {@code remotePath} to {@code localFile}. Attempts to resume with the {@code resume} flag.
*
* @param localFile
* @param remotePath
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
* @param resume
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
*
* @throws IOException
brenttyler marked this conversation as resolved.
Show resolved Hide resolved
*/
void download(String remotePath, LocalDestFile localFile, boolean resume)
throws IOException;

TransferListener getTransferListener();

void setTransferListener(TransferListener listener);
Expand Down
Loading