Skip to content

Commit

Permalink
NIFI-13714 Fixed RecordTransform Python Processor Partition Handling (#…
Browse files Browse the repository at this point in the history
…9253)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
lordgamez authored Sep 21, 2024
1 parent 7292479 commit 426b8fe
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,14 @@ def getRelationship(self):
return self.processor_result.relationship

def getPartition(self):
    """
    Return the partition of the wrapped processor result as a Java map.

    Returns None when the processor result carries no partition. Otherwise the
    partition's key/value pairs are copied into a new java.util.HashMap created
    through the JVM held by JvmHolder.
    """
    partition = self.processor_result.partition
    if partition is None:
        return None

    # Copy entries into a JVM-side HashMap instead of returning the Python mapping
    # directly. NOTE(review): presumably the Java caller requires a java.util.Map
    # here - confirm against the Java-side interface.
    java_map = JvmHolder.jvm.java.util.HashMap()
    for key, value in partition.items():
        java_map.put(key, value)

    return java_map


class RecordTransformResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,91 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter
assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
}

/**
 * Runs GenerateFlowFile -> SetRecordField (Python) -> TerminateFlowFile with a two-record CSV input
 * and verifies that two FlowFiles arrive on the "success" connection, each holding one of the
 * input records with its "name" and "group" values intact.
 * NOTE(review): the split into two FlowFiles is presumably driven by the processor partitioning
 * on the "group" field - confirm against the SetRecordField implementation.
 */
@Test
public void testRecordTransformPartitioning() throws NiFiClientException, IOException, InterruptedException {
    final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
    final ProcessorEntity setRecordField = getClientUtil().createPythonProcessor("SetRecordField");
    final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");

    // Add Reader and Writer
    final ControllerServiceEntity csvReader = getClientUtil().createControllerService("MockCSVReader");
    final ControllerServiceEntity csvWriter = getClientUtil().createControllerService("MockCSVWriter");

    getClientUtil().enableControllerService(csvReader);
    getClientUtil().enableControllerService(csvWriter);

    // Configure the SetRecordField properties
    final Map<String, String> fieldMap = new HashMap<>();
    fieldMap.put("Record Reader", csvReader.getId());
    fieldMap.put("Record Writer", csvWriter.getId());
    getClientUtil().updateProcessorProperties(setRecordField, fieldMap);
    getClientUtil().setAutoTerminatedRelationships(setRecordField, new HashSet<>(Arrays.asList("original", "failure")));

    // Set contents of GenerateFlowFile: a header line followed by two records with different groups
    getClientUtil().updateProcessorProperties(generate,
            Collections.singletonMap("Text", "name, group\nJane Doe, default\nJake Doe, other"));

    // Connect flow
    getClientUtil().createConnection(generate, setRecordField, "success");
    final ConnectionEntity outputConnection = getClientUtil().createConnection(setRecordField, terminate, "success");

    // Wait for processor validation to complete
    getClientUtil().waitForValidProcessor(generate.getId());
    getClientUtil().waitForValidProcessor(setRecordField.getId());

    // Run the flow
    getClientUtil().startProcessor(generate);
    getClientUtil().startProcessor(setRecordField);

    // Wait for output data: one FlowFile per input record is expected
    waitForQueueCount(outputConnection.getId(), 2);

    // Verify output contents. We don't know the order that the fields will be in,
    // but we know that we should get back 2 fields per record: name, group.
    final String ff1Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
    assertSingleRecord(ff1Contents, "Jane Doe", "default");

    final String ff2Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 1);
    assertSingleRecord(ff2Contents, "Jake Doe", "other");
}

/**
 * Asserts that the given CSV text has a header line containing "name" and "group" and that the
 * first record line carries the expected values in the matching columns, whatever the column order.
 *
 * @param flowFileContents CSV text with a header line followed by at least one record line
 * @param expectedName expected value of the "name" column in the first record
 * @param expectedGroup expected value of the "group" column in the first record
 */
private static void assertSingleRecord(final String flowFileContents, final String expectedName, final String expectedGroup) {
    final String[] lines = flowFileContents.split("\n");
    // Fail with an assertion rather than an ArrayIndexOutOfBoundsException when the output is too short
    assertTrue(lines.length >= 2);

    final List<String> headers = splitCsvLine(lines[0]);
    assertTrue(headers.contains("name"));
    assertTrue(headers.contains("group"));

    final List<String> values = splitCsvLine(lines[1]);
    assertEquals(expectedName, values.get(headers.indexOf("name")));
    assertEquals(expectedGroup, values.get(headers.indexOf("group")));
}

/**
 * Splits one CSV line on commas and trims surrounding whitespace from every cell.
 *
 * @param line a single line of comma-separated text
 * @return the trimmed cell values in their original order
 */
private static List<String> splitCsvLine(final String line) {
    return Stream.of(line.split(","))
            .map(String::trim)
            .toList();
}

@Test
public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
final String messageContents = "Hello World";
Expand Down

0 comments on commit 426b8fe

Please sign in to comment.