Skip to content

Commit

Permalink
Merge pull request #15 from embulk/fix-last-path-generation-logic
Browse files Browse the repository at this point in the history
Fix last_path generation logic
  • Loading branch information
sakama authored Sep 12, 2016
2 parents 7ac95c2 + eb03570 commit 2e9faf8
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 8 deletions.
73 changes: 68 additions & 5 deletions src/main/java/org/embulk/input/sftp/SftpFileInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,82 @@ public static String getSftpFileUri(PluginTask task, String path)
}
}

public static String getRelativePath(Optional<String> path)
public static String getRelativePath(PluginTask task, Optional<String> uri)
{
try {
if (path.isPresent()) {
return new URI(path.get()).getPath();
if (!uri.isPresent()) {
return null;
}
else if (task.getPassword().isPresent()) {
return getRelativePathFromURIwithPassword(task, uri);
}
else {
return null;
return new URI(uri.get()).getPath();
}
}
catch (URISyntaxException ex) {
return null;
throw new ConfigException("Failed to generate last_path due to URI parse failure that contains invalid file path.", ex);
}
}

private static String getRelativePathFromURIwithPassword(final PluginTask task, final Optional<String> uri)
{
try {
return retryExecutor()
.withRetryLimit(task.getMaxConnectionRetry())
.withInitialRetryWait(500)
.withMaxRetryWait(30 * 1000)
.runInterruptible(new Retryable<String>() {
@Override
public String call() throws URISyntaxException, IOException
{
log.info("Creating last_path from URI contains password in FileList.");
StandardFileSystemManager manager = initializeStandardFileSystemManager();

String prefix = new URI("sftp", initializeUserInfo(task), task.getHost(), task.getPort(), null, null, null).toString();
prefix = manager.resolveFile(prefix).toString();
// To avoid URI parse failure when password contains special characters
String newUri = uri.get().replace(prefix, "sftp://user:[email protected]/");

return new URI(newUri).getPath();
}

@Override
public boolean isRetryableException(Exception exception)
{
if (exception instanceof URISyntaxException) {
// Don't throw cause because URISyntaxException shows password
throw new ConfigException("Failed to generate last_path due to URI parse failure that contains invalid file path or password.");
}
return true;
}

@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException
{
String message = String.format("SFTP List request failed. Retrying %d/%d after %d seconds. Message: %s",
retryCount, retryLimit, retryWait / 1000, exception.getMessage());
if (retryCount % 3 == 0) {
log.warn(message, exception);
}
else {
log.warn(message);
}
}

@Override
public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException
{
}
});
}
catch (RetryGiveupException ex) {
throw new ConfigException("Failed to generate last_path due to FTP connection failure");
}
catch (InterruptedException ex) {
Throwables.propagate(ex);
}
return null;
}

public static FileList listFilesByPrefix(final PluginTask task)
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ public ConfigDiff resume(TaskSource taskSource,
FileInputPlugin.Control control)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
String lastPath = null;
if (task.getIncremental()) {
lastPath = SftpFileInput.getRelativePath(task, task.getFiles().getLastPath(task.getLastPath()));
}
control.run(taskSource, taskCount);

ConfigDiff configDiff = Exec.newConfigDiff();
if (task.getIncremental()) {
configDiff.set("last_path", SftpFileInput.getRelativePath(task.getFiles().getLastPath(task.getLastPath())));
if (task.getIncremental() && lastPath != null) {
configDiff.set("last_path", lastPath);
}

return configDiff;
Expand Down
19 changes: 18 additions & 1 deletion src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public List<TaskReport> run(TaskSource taskSource, int taskCount)

assertEquals(expected.get(0), actual.get(0));
assertEquals(expected.get(1), actual.get(1));
assertEquals(SftpFileInput.getRelativePath(Optional.of(expected.get(1).get(0))), configDiff.get(String.class, "last_path"));
assertEquals(SftpFileInput.getRelativePath(task, Optional.of(expected.get(1).get(0))), configDiff.get(String.class, "last_path"));
}

@Test
Expand Down Expand Up @@ -373,6 +373,23 @@ public void testSetProxyType() throws Exception
assertEquals(SftpFileSystemConfigBuilder.PROXY_STREAM, builder.getProxyType(fsOptions));
}

@Test
public void testGetRelativePath()
{
ConfigSource conf = config();
String expected = "/path/to/sample.csv";

conf.set("password", "ABCDE");
PluginTask task = config.loadConfig(PluginTask.class);
String uri = SftpFileInput.getSftpFileUri(task, "/path/to/sample.csv");
assertEquals(expected, SftpFileInput.getRelativePath(task, Optional.of(uri)));

conf.set("password", "ABCD#$¥!%'\"@?<>\\&/_^~|-=+-,{}[]()");
task = config.loadConfig(PluginTask.class);
uri = SftpFileInput.getSftpFileUri(task, "/path/to/sample.csv");
assertEquals(expected, SftpFileInput.getRelativePath(task, Optional.of(uri)));
}

private SshServer createSshServer(String host, int port, final String sshUsername, final String sshPassword)
{
// setup a mock sftp server
Expand Down

0 comments on commit 2e9faf8

Please sign in to comment.