Skip to content

Commit

Permalink
NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP
Browse files Browse the repository at this point in the history
  • Loading branch information
tombrisland committed Feb 12, 2024
1 parent e93fb17 commit cfef828
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;

public interface FileTransfer extends Closeable {

Expand Down Expand Up @@ -172,6 +173,8 @@ default String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOEx
+ "than normal.")
.defaultValue("5000")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
// Limiting to batches means that TIME_TRACKING and TIME_WINDOW strategies will leave files behind
.dependsOn(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES, AbstractListProcessor.NO_TRACKING)
.required(true)
.build();
public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -124,13 +123,7 @@ protected List<FileInfo> performListing(final ProcessContext context, final Long
return listing;
}

final Iterator<FileInfo> itr = listing.iterator();
while (itr.hasNext()) {
final FileInfo next = itr.next();
if (next.getLastModifiedTime() < minTimestamp) {
itr.remove();
}
}
listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp);

return listing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
properties.add(SFTPTransfer.FILE_FILTER_REGEX);
properties.add(SFTPTransfer.PATH_FILTER_REGEX);
properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.HOST_KEY_FILE);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
Expand Down Expand Up @@ -176,7 +177,7 @@ private Predicate<FileInfo> createFileFilter(final ProcessContext context) {
final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);

return (attributes) -> {
if(attributes.isDirectory()) {
if (attributes.isDirectory()) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ protected void getListing(final String path, final int depth, final int maxResul
final RemoteResourceFilter filter = (entry) -> {
final String entryFilename = entry.getName();

// SSHJ has no equivalent of the BREAK result that JSCH offered: returning false from this filter only skips
// the current entry and does not stop the listing. Check the limit first so nothing more is added once maxResults is reached.
if (listing.size() >= maxResults) {
return false;
}

// skip over 'this directory' and 'parent directory' special files regardless of ignoring dot files
if (RELATIVE_CURRENT_DIRECTORY.equals(entryFilename) || RELATIVE_PARENT_DIRECTORY.equals(entryFilename)) {
return false;
Expand All @@ -332,12 +338,6 @@ protected void getListing(final String path, final int depth, final int maxResul
return false;
}

// Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add
// below, otherwise we would keep adding to the listings since returning false here doesn't break
if (listing.size() >= maxResults) {
return false;
}

if (isIncludedFile(entry, symlink) && (filteringDisabled || pathMatched)) {
if (filteringDisabled || fileFilterPattern == null || fileFilterPattern.matcher(entryFilename).matches()) {
listing.add(newFileInfo(entry, path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
Expand All @@ -45,6 +35,16 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestListSFTP {
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testRunFileFound() {
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute( "filename");
runner.assertAllFlowFilesContainAttribute("filename");

final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
Expand Down Expand Up @@ -178,6 +178,21 @@ public void testRunFileNotFoundMinSizeFiltered() {
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}

/**
 * Checks that the Remote Poll Batch Size property caps the number of FlowFiles
 * that ListSFTP emits in a single run.
 */
@Test
public void testRemotePollBatchSizeEnforced() {
// Use the NO_TRACKING listing strategy; the batch size property is set alongside it for this test
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1");

runner.run();
// Of 3 items only 1 returned due to batch size
// NOTE(review): the 3 remote items are presumably created by the test setup, which is not visible here - confirm
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);

// Raise the batch size to 2 and list again
runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");

runner.run();
// NOTE(review): the expected 3 appears to be cumulative across both runs (1 from the first run + 2 from
// the second) - confirm the runner does not clear transfer state between runs
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
}

@Test
public void testVerificationSuccessful() {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
Expand Down

0 comments on commit cfef828

Please sign in to comment.