Skip to content

Commit

Permalink
Merge pull request #27 from embulk/fix-connection-remaining
Browse files Browse the repository at this point in the history
Fix SFTP connection remaining problem
  • Loading branch information
sakama authored Mar 1, 2018
2 parents 1f4b2f2 + dc5f3f2 commit 4373dce
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 29 deletions.
65 changes: 36 additions & 29 deletions src/main/java/org/embulk/input/sftp/SftpFileInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public TaskReport commit()
@Override
public void close()
{
super.close();
}

private static StandardFileSystemManager initializeStandardFileSystemManager()
Expand Down Expand Up @@ -189,43 +190,49 @@ public FileList call() throws IOException
{
String lastKey = null;
log.info("Getting to download file list");
StandardFileSystemManager manager = initializeStandardFileSystemManager();
FileSystemOptions fsOptions = initializeFsOptions(task);
StandardFileSystemManager manager = null;
try {
manager = initializeStandardFileSystemManager();
FileSystemOptions fsOptions = initializeFsOptions(task);

if (task.getLastPath().isPresent() && !task.getLastPath().get().isEmpty()) {
lastKey = manager.resolveFile(getSftpFileUri(task, task.getLastPath().get()), fsOptions).toString();
}
if (task.getLastPath().isPresent() && !task.getLastPath().get().isEmpty()) {
lastKey = manager.resolveFile(getSftpFileUri(task, task.getLastPath().get()), fsOptions).toString();
}

FileObject files = manager.resolveFile(getSftpFileUri(task, task.getPathPrefix()), fsOptions);
FileObject files = manager.resolveFile(getSftpFileUri(task, task.getPathPrefix()), fsOptions);

if (files.isFolder()) {
//path_prefix is a folder, we add everything in that folder
FileObject[] children = files.getChildren();
Arrays.sort(children);
for (FileObject f : children) {
if (f.isFile()) {
addFileToList(builder, f.toString(), f.getContent().getSize(), "", lastKey);
if (files.isFolder()) {
//path_prefix is a folder, we add everything in that folder
FileObject[] children = files.getChildren();
Arrays.sort(children);
for (FileObject f : children) {
if (f.isFile()) {
addFileToList(builder, f.toString(), f.getContent().getSize(), "", lastKey);
}
}
} else if (files.isFile()) {
//path_prefix is a file then we just need to add that file
addFileToList(builder, files.toString(), files.getContent().getSize(), "", lastKey);
} else {
// path_prefix is neither file or folder, then we scan the parent folder to file path
// that match the path_prefix basename
FileObject parent = files.getParent();
FileObject[] children = parent.getChildren();
Arrays.sort(children);
String fileName = FilenameUtils.getName(task.getPathPrefix());
for (FileObject f : children) {
if (f.isFile()) {
addFileToList(builder, f.toString(), f.getContent().getSize(), fileName, lastKey);
}
}
}
return builder.build();
}
else if (files.isFile()) {
//path_prefix is a file then we just need to add that file
addFileToList(builder, files.toString(), files.getContent().getSize(), "", lastKey);
}
else {
// path_prefix is neither file or folder, then we scan the parent folder to file path
// that match the path_prefix basename
FileObject parent = files.getParent();
FileObject[] children = parent.getChildren();
Arrays.sort(children);
String fileName = FilenameUtils.getName(task.getPathPrefix());
for (FileObject f : children) {
if (f.isFile()) {
addFileToList(builder, f.toString(), f.getContent().getSize(), fileName, lastKey);
}
finally {
if (manager != null) {
manager.close();
}
}
return builder.build();
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/embulk/input/sftp/SingleFileProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,8 @@ public void onGiveup(Exception firstException, Exception lastException)
@Override
public void close()
{
if (manager != null) {
manager.close();
}
}
}

0 comments on commit 4373dce

Please sign in to comment.