Skip to content

Commit

Permalink
NIFI-13491 Added Response Header Request Attributes Prefix to InvokeH…
Browse files Browse the repository at this point in the history
…TTP (apache#9507)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
NissimShiman authored Nov 21, 2024
1 parent b421cca commit aa3a7c0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,22 @@ public class InvokeHTTP extends AbstractProcessor {

public static final PropertyDescriptor RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED = new PropertyDescriptor.Builder()
.name("Response Header Request Attributes Enabled")
.description("Enable adding HTTP response headers as attributes to FlowFiles transferred to the Original relationship.")
.description("Enable adding HTTP response headers as attributes to FlowFiles transferred to the Original, Retry or No Retry relationships.")
.required(false)
.defaultValue(Boolean.FALSE.toString())
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.build();

public static final PropertyDescriptor RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX = new PropertyDescriptor.Builder()
.name("Response Header Request Attributes Prefix")
.description("Prefix to HTTP response headers when included as attributes to FlowFiles transferred to the Original, Retry or No Retry relationships. "
+ "It is recommended to end with a separator character like '.' or '-'.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dependsOn(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED, Boolean.TRUE.toString())
.build();

public static final PropertyDescriptor RESPONSE_REDIRECTS_ENABLED = new PropertyDescriptor.Builder()
.name("Response Redirects Enabled")
.description("Enable following HTTP redirects sent with HTTP 300 series responses as described in RFC 7231 Section 6.4.")
Expand Down Expand Up @@ -508,6 +518,7 @@ public class InvokeHTTP extends AbstractProcessor {
RESPONSE_GENERATION_REQUIRED,
RESPONSE_FLOW_FILE_NAMING_STRATEGY,
RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED,
RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX,
RESPONSE_REDIRECTS_ENABLED
);

Expand Down Expand Up @@ -878,13 +889,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}

// If the property to add the response headers to the request flowfile is true then add them
if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp));
}

boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(RESPONSE_GENERATION_REQUIRED).asBoolean();
ResponseBody responseBody = responseHttp.body();
Expand Down Expand Up @@ -918,7 +922,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

// write the response headers as attributes
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp));
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp, ""));

// update FlowFile's filename attribute with an extracted value from the remote URL
if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && HttpMethod.GET.name().equals(httpRequest.method())) {
Expand Down Expand Up @@ -987,6 +991,16 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
}

// This needs to be done after the response flowFile has been created from the request flowFile
// as the added attribute headers may have a prefix added that doesn't make sense for the response flowFile.
if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
final String prefix = context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX).evaluateAttributeExpressions(requestFlowFile).getValue();

// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp, prefix));
}

route(requestFlowFile, responseFlowFile, session, context, statusCode);

}
Expand Down Expand Up @@ -1255,18 +1269,20 @@ private String getLogString(Map<String, List<String>> map) {

/**
* Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
* Prefix is passed in to allow differentiation for these new attributes.
*/
private Map<String, String> convertAttributesFromHeaders(final Response responseHttp) {
private Map<String, String> convertAttributesFromHeaders(final Response responseHttp, final String prefix) {
// create a new hashmap to store the values from the connection
final Map<String, String> attributes = new HashMap<>();
final String trimmedPrefix = trimToEmpty(prefix);
final Headers headers = responseHttp.headers();
headers.names().forEach((key) -> {
final List<String> values = headers.values(key);
// we ignore any headers with no actual values (rare)
if (!values.isEmpty()) {
// create a comma separated string from the values, this is stored in the map
final String value = StringUtils.join(values, MULTIPLE_HEADER_DELIMITER);
attributes.put(key, value);
attributes.put(trimmedPrefix + key, value);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,10 @@ public void testRunGetHttp200SuccessRequestHeaderAttributesAndDynamicProperties(

@Test
public void testRunGetHttp200SuccessResponseHeaderRequestAttributes() {
final String prefix = "response.";
setUrlProperty();
runner.setProperty(InvokeHTTP.RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED, Boolean.TRUE.toString());
runner.setProperty(InvokeHTTP.RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX, prefix);

final String firstHeader = String.class.getSimpleName();
final String secondHeader = Integer.class.getSimpleName();
Expand All @@ -470,10 +472,19 @@ public void testRunGetHttp200SuccessResponseHeaderRequestAttributes() {
assertRelationshipStatusCodeEquals(InvokeHTTP.RESPONSE, HTTP_OK);

final MockFlowFile requestFlowFile = getRequestFlowFile();
requestFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0));
requestFlowFile.assertAttributeEquals(prefix + CONTENT_LENGTH_HEADER, Integer.toString(0));
requestFlowFile.assertAttributeNotExists(CONTENT_LENGTH_HEADER);

final String repeatedHeaders = String.format("%s, %s", firstHeader, secondHeader);
requestFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
requestFlowFile.assertAttributeEquals(prefix + REPEATED_HEADER, repeatedHeaders);
requestFlowFile.assertAttributeNotExists(REPEATED_HEADER);

final MockFlowFile responseFlowFile = getResponseFlowFile();
responseFlowFile.assertAttributeNotExists(prefix + CONTENT_LENGTH_HEADER);
responseFlowFile.assertAttributeEquals(CONTENT_LENGTH_HEADER, Integer.toString(0));

responseFlowFile.assertAttributeNotExists(prefix + REPEATED_HEADER);
responseFlowFile.assertAttributeEquals(REPEATED_HEADER, repeatedHeaders);
}

@Test
Expand Down

0 comments on commit aa3a7c0

Please sign in to comment.