diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FileTransfer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FileTransfer.java index 56f6f1bfa72a..884b17de17f6 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FileTransfer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FileTransfer.java @@ -29,6 +29,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.list.AbstractListProcessor; public interface FileTransfer extends Closeable { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java index 1106b27fa37c..af10d592d9d3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java @@ -31,7 +31,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -124,13 +123,7 @@ protected List performListing(final ProcessContext context, final Long return listing; } - final Iterator itr = listing.iterator(); - while (itr.hasNext()) { - final FileInfo next = itr.next(); - if (next.getLastModifiedTime() < minTimestamp) { - itr.remove(); - } - } + listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp); return listing; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 16a4e8e14e24..1d3e498a0c26 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -100,6 +100,7 @@ protected List getSupportedPropertyDescriptors() { properties.add(SFTPTransfer.FILE_FILTER_REGEX); properties.add(SFTPTransfer.PATH_FILTER_REGEX); properties.add(SFTPTransfer.IGNORE_DOTTED_FILES); + properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE); properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); properties.add(SFTPTransfer.HOST_KEY_FILE); properties.add(SFTPTransfer.CONNECTION_TIMEOUT); @@ -176,7 +177,7 @@ private Predicate createFileFilter(final ProcessContext context) { final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); return (attributes) -> { - if(attributes.isDirectory()) { + if (attributes.isDirectory()) { return true; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 1dd640809ab3..13d6ad8b86ae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -317,6 +317,12 @@ protected void getListing(final String path, final int depth, final int maxResul final RemoteResourceFilter filter = (entry) -> { final String entryFilename = entry.getName(); + // Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add + // below, otherwise we would keep adding to the listings since returning false here doesn't break + if (listing.size() >= maxResults) { + return false; + } + // skip over 'this directory' and 'parent directory' special files regardless of ignoring dot files if (RELATIVE_CURRENT_DIRECTORY.equals(entryFilename) || RELATIVE_PARENT_DIRECTORY.equals(entryFilename)) { return false; @@ -332,12 +338,6 @@ protected void getListing(final String path, final int depth, final int maxResul return false; } - // Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add - // below, otherwise we would keep adding to the listings since returning false here doesn't break - if (listing.size() >= maxResults) { - return false; - } - if (isIncludedFile(entry, symlink) && (filteringDisabled || pathMatched)) { if (filteringDisabled || fileFilterPattern == null || fileFilterPattern.matcher(entryFilename).matches()) { listing.add(newFileInfo(entry, path)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java index f2340238c12b..2ae2b783352b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.File; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -45,6 +35,16 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + import static org.junit.jupiter.api.Assertions.assertEquals; public class TestListSFTP { @@ -94,7 +94,7 @@ public void testRunFileFound() { runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); - runner.assertAllFlowFilesContainAttribute( "filename"); + runner.assertAllFlowFilesContainAttribute("filename"); final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername()); @@ -178,6 +178,21 @@ public void testRunFileNotFoundMinSizeFiltered() { runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0); } + @Test + public void testRemotePollBatchSizeEnforced() { + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING); + runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1"); + + runner.run(); + // Of 3 items only 1 returned due to batch size + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + + runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2"); + + runner.run(); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); + } + @Test public void testVerificationSuccessful() { final List results = ((VerifiableProcessor) runner.getProcessor())