Skip to content

Commit

Permalink
MINIFICPP-2294 Flow migration
Browse files Browse the repository at this point in the history
  -Removed deprecated "Send Body" and "Disable Peer Verification" properties from InvokeHTTP
  -Removed deprecated "Message Key Field" property from PublishKafka
  -Removed deprecated SSL related properties from PublishKafka and replaced them with a generated SSLContextService
  • Loading branch information
martinzink committed Aug 14, 2024
1 parent e3fdae6 commit 810d7e2
Show file tree
Hide file tree
Showing 26 changed files with 653 additions and 176 deletions.
7 changes: 0 additions & 7 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -1265,10 +1265,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| invokehttp-proxy-username | | | Username to set when authenticating against proxy |
| invokehttp-proxy-password | | | Password to set when authenticating against proxy<br/>**Sensitive Property: true** |
| Content-type | application/octet-stream | | The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. In the case of an empty value after evaluating an expression language expression, Content-Type defaults to |
| send-message-body | true | true<br/>false | DEPRECATED. Only kept for backwards compatibility, no functionality is included. |
| Send Message Body | true | true<br/>false | If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, suppresses the message body and content-type header for these requests. |
| Use Chunked Encoding | false | true<br/>false | When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header and instead send 'Transfer-Encoding' with a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown lengths in chunks. |
| Disable Peer Verification | false | true<br/>false | DEPRECATED. The value is ignored, peer and host verification are always performed when using SSL/TLS. |
| Put Response Body in Attribute | | | If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate FlowFile. The attribute key to put to is determined by evaluating value of this property. |
| Always Output Response | false | true<br/>false | Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is |
| Penalize on "No Retry" | false | true<br/>false | Enabling this property will penalize FlowFiles that are routed to the "No Retry" relationship. |
Expand Down Expand Up @@ -2075,12 +2073,7 @@ In the list below, the names of required properties appear in bold. Any other pr
| Queue Max Message | 1000 | | Maximum number of messages allowed on the producer queue |
| Compress Codec | none | none<br/>gzip<br/>snappy | compression codec to use for compressing message sets |
| Max Flow Segment Size | 0 B | | Maximum flow content payload segment size for the kafka record. 0 B means unlimited. |
| Security CA | | | DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key |
| Security Cert | | | DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication |
| Security Private Key | | | DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication |
| Security Pass Phrase | | | DEPRECATED in favor of SSL Context Service.Private key passphrase<br/>**Sensitive Property: true** |
| Kafka Key | | | The key to use for the message. If not specified, the UUID of the flow file is used as the message key.<br/>**Supports Expression Language: true** |
| Message Key Field | | | DEPRECATED, does not work -- use Kafka Key instead |
| Debug contexts | | | A comma-separated list of debug contexts to enable.Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all |
| Fail empty flow files | true | true<br/>false | Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is deprecated. Use connections to drop empty flow files! |

Expand Down
1 change: 0 additions & 1 deletion docker/test/integration/features/kafka.feature
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
And the Minifi logs contain the following message: "PublishKafka: client.id [client_no_42]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: Message Key [unique_message_key_123]" in less than 10 seconds
And the Minifi logs contain the following message: "PublishKafka: DynamicProperty: [retry.backoff.ms] -> [150]" in less than 10 seconds
And the Minifi logs contain the following message: "The Message Key Field property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead." in less than 10 seconds

Scenario: PublishKafka sends flowfiles to failure when the broker is not available
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
Expand Down
2 changes: 1 addition & 1 deletion extensions/librdkafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use_bundled_librdkafka(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})

include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)

file(GLOB SOURCES "*.cpp")
file(GLOB SOURCES "*.cpp" "migrators/*.cpp")

add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES})

Expand Down
13 changes: 0 additions & 13 deletions extensions/librdkafka/PublishKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,6 @@ void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
conn_ = std::make_unique<KafkaConnection>(key_);
configureNewConnection(context);

std::string message_key_field;
if (context.getProperty(MessageKeyField, message_key_field) && !message_key_field.empty()) {
logger_->log_error("The {} property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead.", MessageKeyField.name);
}

logger_->log_debug("Successfully configured PublishKafka");
}

Expand Down Expand Up @@ -641,14 +636,6 @@ std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext
}

utils::net::SslData ssl_data;
if (auto security_ca = context.getProperty(SecurityCA))
ssl_data.ca_loc = *security_ca;
if (auto security_cert = context.getProperty(SecurityCert))
ssl_data.cert_loc = *security_cert;
if (auto security_private_key = context.getProperty(SecurityPrivateKey))
ssl_data.key_loc = *security_private_key;
if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord))
ssl_data.key_pw = *security_private_key_pass;
return ssl_data;
}

Expand Down
23 changes: 1 addition & 22 deletions extensions/librdkafka/PublishKafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,10 @@ class PublishKafka : public KafkaProcessorBase {
.withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
.withDefaultValue("0 B")
.build();
EXTENSIONAPI static constexpr auto SecurityCA = core::PropertyDefinitionBuilder<>::createProperty("Security CA")
.withDescription("DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key")
.build();
EXTENSIONAPI static constexpr auto SecurityCert = core::PropertyDefinitionBuilder<>::createProperty("Security Cert")
.withDescription("DEPRECATED in favor of SSL Context Service.Path to client's public key (PEM) used for authentication")
.build();
EXTENSIONAPI static constexpr auto SecurityPrivateKey = core::PropertyDefinitionBuilder<>::createProperty("Security Private Key")
.withDescription("DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) used for authentication")
.build();
EXTENSIONAPI static constexpr auto SecurityPrivateKeyPassWord = core::PropertyDefinitionBuilder<>::createProperty("Security Pass Phrase")
.withDescription("DEPRECATED in favor of SSL Context Service.Private key passphrase")
.isSensitive(true)
.build();
EXTENSIONAPI static constexpr auto KafkaKey = core::PropertyDefinitionBuilder<>::createProperty("Kafka Key")
.withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto MessageKeyField = core::PropertyDefinitionBuilder<>::createProperty("Message Key Field")
.withDescription("DEPRECATED, does not work -- use Kafka Key instead")
.build();
EXTENSIONAPI static constexpr auto DebugContexts = core::PropertyDefinitionBuilder<>::createProperty("Debug contexts")
.withDescription("A comma-separated list of debug contexts to enable."
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all")
Expand All @@ -179,7 +163,7 @@ class PublishKafka : public KafkaProcessorBase {
.withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
.withDefaultValue("true")
.build();
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(KafkaProcessorBase::Properties, std::array<core::PropertyReference, 23>{
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(KafkaProcessorBase::Properties, std::array<core::PropertyReference, 18>{
SeedBrokers,
Topic,
DeliveryGuarantee,
Expand All @@ -195,12 +179,7 @@ class PublishKafka : public KafkaProcessorBase {
QueueBufferMaxMessage,
CompressCodec,
MaxFlowSegSize,
SecurityCA,
SecurityCert,
SecurityPrivateKey,
SecurityPrivateKeyPassWord,
KafkaKey,
MessageKeyField,
DebugContexts,
FailEmptyFlowFiles
});
Expand Down
83 changes: 83 additions & 0 deletions extensions/librdkafka/migrators/PublishKafkaMigrator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "PublishKafkaMigrator.h"

#include "core/Resource.h"
#include "core/flow/FlowSchema.h"
#include "controllers/SSLContextService.h"
#include "../PublishKafka.h"


namespace org::apache::nifi::minifi::kafka::migration {

namespace {
constexpr std::string_view DEPRECATED_MESSAGE_KEY_FIELD = "Message Key Field";
constexpr std::string_view DEPRECATED_SECURITY_CA = "Security CA";
constexpr std::string_view DEPRECATED_SECURITY_CERT = "Security Cert";
constexpr std::string_view DEPRECATED_SECURITY_PRIVATE_KEY = "Security Private Key";
constexpr std::string_view DEPRECATED_SECURITY_PASS_PHRASE = "Security Pass Phrase";

void migrateKafkaPropertyToSSLContextService(
const std::string_view deprecated_publish_kafka_property,
const std::string_view ssl_context_service_property,
core::flow::Node& publish_kafka_properties,
core::flow::Node& ssl_controller_service_properties) {
const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property);
if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) {
ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str);
}

std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property);
}
} // namespace

void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow::FlowSchema& schema) {
auto publish_kafka_processors = getProcessors(root_node, schema, "PublishKafka");
for (auto& publish_kafka_processor : publish_kafka_processors) {
auto publish_kafka_properties = publish_kafka_processor[schema.processor_properties];
if (publish_kafka_properties.remove(DEPRECATED_MESSAGE_KEY_FIELD)) {
logger_->log_warn("Removed deprecated property \"{}\" from {}", DEPRECATED_MESSAGE_KEY_FIELD, *publish_kafka_processor[schema.identifier].getString());
}
if (publish_kafka_properties.contains(DEPRECATED_SECURITY_CA) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) {
std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()});
auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", publish_kafka_id_str);
auto root_group = root_node[schema.root_group];
auto controller_services = root_group[schema.controller_services];
auto ssl_controller_service = *controller_services.pushBack();
ssl_controller_service.addMember(schema.name[0], ssl_context_service_name);
ssl_controller_service.addMember(schema.identifier[0], utils::IdGenerator::getIdGenerator()->generate().to_string().c_str());
ssl_controller_service.addMember(schema.type[0], "SSLContextService");

publish_kafka_properties.addMember(processors::PublishKafka::SSLContextService.name, ssl_context_service_name);
auto ssl_controller_service_properties = ssl_controller_service.addObject(schema.controller_service_properties[0]);

migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CA, controllers::SSLContextService::CACertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_CERT, controllers::SSLContextService::ClientCertificate.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PRIVATE_KEY, controllers::SSLContextService::PrivateKey.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PASS_PHRASE, controllers::SSLContextService::Passphrase.name, publish_kafka_properties, *ssl_controller_service_properties);

logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_processor[schema.identifier].getString());
}
}
}

REGISTER_RESOURCE(PublishKafkaMigrator, FlowMigrator);
} // namespace org::apache::nifi::minifi::kafka::migration
Loading

0 comments on commit 810d7e2

Please sign in to comment.