From eb03570d004aedf7c6fb4f31e4c1668be044e2dd Mon Sep 17 00:00:00 2001 From: sakama Date: Tue, 6 Sep 2016 19:38:13 +0900 Subject: [PATCH] Fix last_path generation logic --- .../org/embulk/input/sftp/SftpFileInput.java | 73 +++++++++++++++++-- .../input/sftp/SftpFileInputPlugin.java | 8 +- .../input/sftp/TestSftpFileInputPlugin.java | 19 ++++- 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/embulk/input/sftp/SftpFileInput.java b/src/main/java/org/embulk/input/sftp/SftpFileInput.java index 8ee8b25..b9e6fc8 100644 --- a/src/main/java/org/embulk/input/sftp/SftpFileInput.java +++ b/src/main/java/org/embulk/input/sftp/SftpFileInput.java @@ -143,19 +143,82 @@ public static String getSftpFileUri(PluginTask task, String path) } } - public static String getRelativePath(Optional path) + public static String getRelativePath(PluginTask task, Optional uri) { try { - if (path.isPresent()) { - return new URI(path.get()).getPath(); + if (!uri.isPresent()) { + return null; + } + else if (task.getPassword().isPresent()) { + return getRelativePathFromURIwithPassword(task, uri); } else { - return null; + return new URI(uri.get()).getPath(); } } catch (URISyntaxException ex) { - return null; + throw new ConfigException("Failed to generate last_path due to URI parse failure that contains invalid file path.", ex); + } + } + + private static String getRelativePathFromURIwithPassword(final PluginTask task, final Optional uri) + { + try { + return retryExecutor() + .withRetryLimit(task.getMaxConnectionRetry()) + .withInitialRetryWait(500) + .withMaxRetryWait(30 * 1000) + .runInterruptible(new Retryable() { + @Override + public String call() throws URISyntaxException, IOException + { + log.info("Creating last_path from URI contains password in FileList."); + StandardFileSystemManager manager = initializeStandardFileSystemManager(); + + String prefix = new URI("sftp", initializeUserInfo(task), task.getHost(), task.getPort(), null, null, null).toString(); + prefix = manager.resolveFile(prefix).toString(); + // To avoid URI parse failure when password contains special characters + String newUri = uri.get().replace(prefix, "sftp://user:password@example.com/"); + + return new URI(newUri).getPath(); + } + + @Override + public boolean isRetryableException(Exception exception) + { + if (exception instanceof URISyntaxException) { + // Don't throw cause because URISyntaxException shows password + throw new ConfigException("Failed to generate last_path due to URI parse failure that contains invalid file path or password."); + } + return true; + } + + @Override + public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) throws RetryGiveupException + { + String message = String.format("SFTP List request failed. Retrying %d/%d after %d seconds. Message: %s", + retryCount, retryLimit, retryWait / 1000, exception.getMessage()); + if (retryCount % 3 == 0) { + log.warn(message, exception); + } + else { + log.warn(message); + } + } + + @Override + public void onGiveup(Exception firstException, Exception lastException) throws RetryGiveupException + { + } + }); + } + catch (RetryGiveupException ex) { + throw new ConfigException("Failed to generate last_path due to FTP connection failure"); + } + catch (InterruptedException ex) { + Throwables.propagate(ex); } + return null; } public static FileList listFilesByPrefix(final PluginTask task) diff --git a/src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java b/src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java index ade8382..6ad77df 100644 --- a/src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java +++ b/src/main/java/org/embulk/input/sftp/SftpFileInputPlugin.java @@ -30,11 +30,15 @@ public ConfigDiff resume(TaskSource taskSource, FileInputPlugin.Control control) { PluginTask task = taskSource.loadTask(PluginTask.class); + String lastPath = null; + if (task.getIncremental()) { + lastPath = SftpFileInput.getRelativePath(task, task.getFiles().getLastPath(task.getLastPath())); + } control.run(taskSource, taskCount); ConfigDiff configDiff = Exec.newConfigDiff(); - if (task.getIncremental()) { - configDiff.set("last_path", SftpFileInput.getRelativePath(task.getFiles().getLastPath(task.getLastPath()))); + if (task.getIncremental() && lastPath != null) { + configDiff.set("last_path", lastPath); } return configDiff; diff --git a/src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java b/src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java index aa44b0e..88ca628 100644 --- a/src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java +++ b/src/test/java/org/embulk/input/sftp/TestSftpFileInputPlugin.java @@ -227,7 +227,7 @@ public List run(TaskSource taskSource, int taskCount) assertEquals(expected.get(0), actual.get(0)); assertEquals(expected.get(1), actual.get(1)); - assertEquals(SftpFileInput.getRelativePath(Optional.of(expected.get(1).get(0))), configDiff.get(String.class, "last_path")); + assertEquals(SftpFileInput.getRelativePath(task, Optional.of(expected.get(1).get(0))), configDiff.get(String.class, "last_path")); } @Test @@ -373,6 +373,23 @@ public void testSetProxyType() throws Exception assertEquals(SftpFileSystemConfigBuilder.PROXY_STREAM, builder.getProxyType(fsOptions)); } + @Test + public void testGetRelativePath() + { + ConfigSource conf = config(); + String expected = "/path/to/sample.csv"; + + conf.set("password", "ABCDE"); + PluginTask task = config.loadConfig(PluginTask.class); + String uri = SftpFileInput.getSftpFileUri(task, "/path/to/sample.csv"); + assertEquals(expected, SftpFileInput.getRelativePath(task, Optional.of(uri))); + + conf.set("password", "ABCD#$¥!%'\"@?<>\\&/_^~|-=+-,{}[]()"); + task = config.loadConfig(PluginTask.class); + uri = SftpFileInput.getSftpFileUri(task, "/path/to/sample.csv"); + assertEquals(expected, SftpFileInput.getRelativePath(task, Optional.of(uri))); + } + private SshServer createSshServer(String host, int port, final String sshUsername, final String sshPassword) { // setup a mock sftp server