Skip to content

Commit

Permalink
review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
martinzink committed Oct 31, 2024
1 parent 2ae7246 commit f7f0fe3
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 41 deletions.
6 changes: 1 addition & 5 deletions extensions/librdkafka/KafkaProcessorBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@

namespace org::apache::nifi::minifi::processors {

std::optional<utils::net::SslData> KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
return utils::net::getSslData(context, SSLContextService, logger_);
}

void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
security_protocol_ = utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, SecurityProtocol);
utils::setKafkaConfigurationField(*config, "security.protocol", std::string{magic_enum::enum_name(security_protocol_)});
logger_->log_debug("Kafka security.protocol [{}]", magic_enum::enum_name(security_protocol_));
if (security_protocol_ == kafka::SecurityProtocolOption::ssl || security_protocol_ == kafka::SecurityProtocolOption::sasl_ssl) {
auto ssl_data = getSslData(context);
if (ssl_data) {
if (auto ssl_data = utils::net::getSslData(context, SSLContextService, logger_)) {
if (ssl_data->ca_loc.empty() && ssl_data->cert_loc.empty() && ssl_data->key_loc.empty() && ssl_data->key_pw.empty()) {
logger_->log_warn("Security protocol is set to {}, but no valid security parameters are set in the properties or in the SSL Context Service.",
magic_enum::enum_name(security_protocol_));
Expand Down
1 change: 0 additions & 1 deletion extensions/librdkafka/KafkaProcessorBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class KafkaProcessorBase : public core::Processor {
}

protected:
virtual std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const;
void setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config);

kafka::SecurityProtocolOption security_protocol_{};
Expand Down
9 changes: 0 additions & 9 deletions extensions/librdkafka/PublishKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,15 +630,6 @@ bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::stri
return true;
}

std::optional<utils::net::SslData> PublishKafka::getSslData(core::ProcessContext& context) const {
if (auto result = KafkaProcessorBase::getSslData(context); result) {
return result;
}

utils::net::SslData ssl_data;
return ssl_data;
}

void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
// Check whether we have been interrupted
if (interrupted_) {
Expand Down
1 change: 0 additions & 1 deletion extensions/librdkafka/PublishKafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ class PublishKafka : public KafkaProcessorBase {
protected:
bool configureNewConnection(core::ProcessContext& context);
bool createNewTopic(core::ProcessContext& context, const std::string& topic_name, const std::shared_ptr<core::FlowFile>& flow_file);
std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const override;

private:
KafkaConnectionKey key_;
Expand Down
15 changes: 9 additions & 6 deletions extensions/librdkafka/migrators/PublishKafkaMigrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ void migrateKafkaPropertyToSSLContextService(
const std::string_view ssl_context_service_property,
core::flow::Node& publish_kafka_properties,
core::flow::Node& ssl_controller_service_properties) {
const auto security_ca = publish_kafka_properties.getMember(deprecated_publish_kafka_property);
if (const auto security_ca_str = security_ca ? security_ca.getString() : std::nullopt) {
ssl_controller_service_properties.addMember(ssl_context_service_property, *security_ca_str);
const auto property_value = publish_kafka_properties.getMember(deprecated_publish_kafka_property);
if (const auto property_value_str = property_value ? property_value.getString() : std::nullopt) {
ssl_controller_service_properties.addMember(ssl_context_service_property, *property_value_str);
}

std::ignore = publish_kafka_properties.remove(deprecated_publish_kafka_property);
Expand All @@ -57,8 +57,11 @@ void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow
publish_kafka_properties.contains(DEPRECATED_SECURITY_CERT) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PRIVATE_KEY) ||
publish_kafka_properties.contains(DEPRECATED_SECURITY_PASS_PHRASE)) {
std::string publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString().value_or(std::string{utils::IdGenerator::getIdGenerator()->generate().to_string()});
auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", publish_kafka_id_str);
auto publish_kafka_id_str = publish_kafka_processor[schema.identifier].getString();
if (!publish_kafka_id_str) {
throw Exception(PROCESSOR_EXCEPTION, "Missing identifier for PublishKafka");
}
auto ssl_context_service_name = fmt::format("GeneratedSSLContextServiceFor_{}", *publish_kafka_id_str);
auto root_group = root_node[schema.root_group];
auto controller_services = root_group[schema.controller_services];
auto ssl_controller_service = *controller_services.pushBack();
Expand All @@ -74,7 +77,7 @@ void PublishKafkaMigrator::migrate(core::flow::Node& root_node, const core::flow
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PRIVATE_KEY, controllers::SSLContextService::PrivateKey.name, publish_kafka_properties, *ssl_controller_service_properties);
migrateKafkaPropertyToSSLContextService(DEPRECATED_SECURITY_PASS_PHRASE, controllers::SSLContextService::Passphrase.name, publish_kafka_properties, *ssl_controller_service_properties);

logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_processor[schema.identifier].getString());
logger_->log_warn("Removed deprecated Security Properties from {} and replaced them with SSLContextService", *publish_kafka_id_str);
}
}
}
Expand Down
27 changes: 15 additions & 12 deletions extensions/librdkafka/tests/PublishKafkaMigratorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include "PublishKafka.h"
#include "unit/ConfigurationTestController.h"
#include "core/flow/AdaptiveConfiguration.h"
#include "utils/crypto/property_encryption/PropertyEncryptionUtils.h"

#include "yaml-cpp/yaml.h"

namespace org::apache::nifi::minifi::test {

using minifi::utils::crypto::property_encryption::decrypt;

TEST_CASE("PublishKafkaMigratorTest yaml") {
static constexpr std::string_view ORIGINAL_YAML = R"(
MiNiFi Config Version: 3
Expand Down Expand Up @@ -91,14 +94,14 @@ Remote Processing Groups: []
CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert");
CHECK(migrated_flow["Controller Services"][0]["Properties"]["Client Certificate"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.pem");
CHECK(migrated_flow["Controller Services"][0]["Properties"]["Private Key"].as<std::string>() == "/tmp/resources/certs/client_test_client_client.key");
CHECK(migrated_flow["Controller Services"][0]["Properties"]["CA Certificate"].as<std::string>() == "/tmp/resources/certs/ca-cert");
CHECK(decrypt(migrated_flow["Controller Services"][0]["Properties"]["Passphrase"].as<std::string>(), test_controller.sensitive_values_encryptor_) == "abcdefgh");

CHECK(!migrated_flow["Processors"][1]["Properties"]["Message Key Field"].IsDefined());
CHECK_FALSE(migrated_flow["Processors"][1]["Properties"]["Message Key Field"].IsDefined());

CHECK(!migrated_flow["Processors"][1]["Properties"]["Security CA"].IsDefined());
CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Cert"].IsDefined());
CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Pass Phrase"].IsDefined());
CHECK(!migrated_flow["Processors"][1]["Properties"]["Security Private Key"].IsDefined());
CHECK_FALSE(migrated_flow["Processors"][1]["Properties"]["Security CA"].IsDefined());
CHECK_FALSE(migrated_flow["Processors"][1]["Properties"]["Security Cert"].IsDefined());
CHECK_FALSE(migrated_flow["Processors"][1]["Properties"]["Security Pass Phrase"].IsDefined());
CHECK_FALSE(migrated_flow["Processors"][1]["Properties"]["Security Private Key"].IsDefined());

CHECK(migrated_flow["Processors"][1]["Properties"]["SSL Context Service"].as<std::string>() == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
}
Expand Down Expand Up @@ -181,14 +184,14 @@ TEST_CASE("PublishKafkaMigratorTest json") {
CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["CA Certificate"] == "/tmp/resources/certs/ca-cert");
CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["Client Certificate"] == "/tmp/resources/certs/client_test_client_client.pem");
CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["Private Key"] == "/tmp/resources/certs/client_test_client_client.key");
CHECK(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["CA Certificate"] == "/tmp/resources/certs/ca-cert");
CHECK(decrypt(migrated_flow["rootGroup"]["controllerServices"][0]["properties"]["Passphrase"].GetString(), test_controller.sensitive_values_encryptor_) == "abcdefgh");

CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Message Key Field"));
CHECK_FALSE(migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Message Key Field"));

CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security CA"));
CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Cert"));
CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Pass Phrase"));
CHECK(!migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Private Key"));
CHECK_FALSE(migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security CA"));
CHECK_FALSE(migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Cert"));
CHECK_FALSE(migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Pass Phrase"));
CHECK_FALSE(migrated_flow["rootGroup"]["processors"][1]["properties"].HasMember("Security Private Key"));

CHECK(migrated_flow["rootGroup"]["processors"][1]["properties"]["SSL Context Service"] == "GeneratedSSLContextServiceFor_8a534b4a-2b4a-4e1e-ab07-8a09fa08f848");
}
Expand Down
11 changes: 5 additions & 6 deletions libminifi/include/core/flow/FlowMigrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FlowMigrator : public CoreComponent {
virtual void migrate(Node& flow_root, const FlowSchema& schema) = 0;

protected:
void doOnProcessGroup(Node& process_group, const FlowSchema& schema, auto func) const {
void doOnProcessGroup(const Node& process_group, const FlowSchema& schema, auto func) const {
func(process_group, schema);
const auto process_group_children = process_group[schema.process_groups];
if (process_group.isSequence()) {
Expand All @@ -42,11 +42,10 @@ class FlowMigrator : public CoreComponent {

[[nodiscard]] std::vector<Node> getProcessors(const Node& root_node, const FlowSchema& schema, const std::string_view processor_to_get) const {
std::vector<Node> processors;
auto root_group = root_node[schema.root_group];
doOnProcessGroup(root_group, schema, [&processors, processor_to_get](auto process_group, const FlowSchema& schema) {
for (auto processor_node : process_group[schema.processors]) {
auto processor_type_str = processor_node[schema.type].getString();
if (auto processor_type = processor_node[schema.type].getString(); processor_type && processor_type->find(processor_to_get) != std::string::npos) {
const Node& root_group = root_node[schema.root_group];
doOnProcessGroup(root_group, schema, [&processors, processor_to_get](const Node& process_group, const FlowSchema& flow_schema) {
for (const auto& processor_node : process_group[flow_schema.processors]) {
if (const auto processor_type_str = processor_node[flow_schema.type].getString(); processor_type_str && processor_type_str->find(processor_to_get) != std::string::npos) {
processors.push_back(processor_node);
}
}
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/flow/StructuredConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ void StructuredConfiguration::migrate(Node& root_node, const FlowSchema& schema)
logger_->log_error("Caught Exception during flow {}::migration, type: {}, what: {}", flow_migrator->getName(), typeid(exception).name(), exception.what());
}
} else {
logger_->log_error("ResourceType::FlowMigrator is not a core::flow::FlowMigrator");
logger_->log_critical("{} is registered as a flow migrator, but it is not a subclass of core::flow::FlowMigrator", flow_migrator_class);
}
}
}
Expand Down

0 comments on commit f7f0fe3

Please sign in to comment.