From 85a269d8815dc5a7c612ddbabec7300b38c5b530 Mon Sep 17 00:00:00 2001 From: Tom Brisland Date: Fri, 9 Feb 2024 23:13:35 +0000 Subject: [PATCH] NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP --- .../util/file/transfer/ListFileTransfer.java | 9 +---- .../nifi/processors/standard/ListSFTP.java | 3 +- .../processors/standard/TestListSFTP.java | 37 +++++++++++++------ 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java index 1106b27fa37c5..af10d592d9d3a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/ListFileTransfer.java @@ -31,7 +31,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -124,13 +123,7 @@ protected List performListing(final ProcessContext context, final Long return listing; } - final Iterator itr = listing.iterator(); - while (itr.hasNext()) { - final FileInfo next = itr.next(); - if (next.getLastModifiedTime() < minTimestamp) { - itr.remove(); - } - } + listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp); return listing; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 16a4e8e14e246..1d3e498a0c265 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -100,6 +100,7 @@ protected List getSupportedPropertyDescriptors() { properties.add(SFTPTransfer.FILE_FILTER_REGEX); properties.add(SFTPTransfer.PATH_FILTER_REGEX); properties.add(SFTPTransfer.IGNORE_DOTTED_FILES); + properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE); properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); properties.add(SFTPTransfer.HOST_KEY_FILE); properties.add(SFTPTransfer.CONNECTION_TIMEOUT); @@ -176,7 +177,7 @@ private Predicate createFileFilter(final ProcessContext context) { final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); return (attributes) -> { - if(attributes.isDirectory()) { + if (attributes.isDirectory()) { return true; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java index f2340238c12b3..2ae2b783352b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.File; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; -import java.util.UUID; - import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -45,6 +35,16 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + import static org.junit.jupiter.api.Assertions.assertEquals; public class TestListSFTP { @@ -94,7 +94,7 @@ public void testRunFileFound() { runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); - runner.assertAllFlowFilesContainAttribute( "filename"); + runner.assertAllFlowFilesContainAttribute("filename"); final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername()); @@ -178,6 +178,21 @@ public void testRunFileNotFoundMinSizeFiltered() { runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0); } + @Test + public void testRemotePollBatchSizeEnforced() { + runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING); + runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1"); + + runner.run(); + // Of 3 items only 1 returned due to batch size + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + + runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2"); + + runner.run(); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); + } + @Test public void testVerificationSuccessful() { final List results = ((VerifiableProcessor) runner.getProcessor())