Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2485 fix: Support Expression Language in InvokeHTTP "Remote URL" property #1904

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions extensions/standard-processors/processors/InvokeHTTP.cpp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The used_clients_ list seems unnecessary. I'd just move the used client ownership into the local context (client wrapper?) of the thread while it's used, without also keeping another list of them that needs synchronization. This way only acquiring and releasing clients need to be synchronized across threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we make sure that the client returned to the HttpClientStore was a client managed by this client store in this case? Or should we just accept any clients? I'm not sure if we should manage clients not created by the same client store. Also if the used clients list is removed it should probably still needs to be replaced by at least a counter to see how many clients are created and in use to be compared with the defined max size.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,45 @@
#include "range/v3/algorithm/any_of.hpp"

namespace org::apache::nifi::minifi::processors {
namespace invoke_http {

HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) {
std::unique_lock<std::mutex> lock(clients_mutex_);
for (auto it = unused_clients_.begin(); it != unused_clients_.end(); ++it) {
if ((*it)->getURL() == url) {
used_clients_.splice(used_clients_.end(), unused_clients_, it);
return {*this, **it};
}
}

if (used_clients_.size() + unused_clients_.size() < max_size_) {
auto client = create_client_function_(url);
used_clients_.push_back(std::move(client));
return {*this, *used_clients_.back()};
} else {
cv_.wait(lock, [this] { return !unused_clients_.empty(); });
auto client = create_client_function_(url);
unused_clients_.front() = std::move(client);
used_clients_.splice(used_clients_.end(), unused_clients_, unused_clients_.begin());
return {*this, *used_clients_.back()};
}
}

void HttpClientStore::returnClient(minifi::http::HTTPClient& client) {
std::unique_lock<std::mutex> lock(clients_mutex_);
for (auto it = used_clients_.begin(); it != used_clients_.end(); ++it) {
if (it->get() != &client) {
continue;
}
unused_clients_.splice(unused_clients_.end(), used_clients_, it);
lock.unlock();
cv_.notify_one();
return;
}
logger_->log_error("Couldn't find HTTP client in client store to be returned");
}

} // namespace invoke_http

std::string InvokeHTTP::DefaultContentType = "application/octet-stream";

Expand Down Expand Up @@ -64,8 +103,9 @@ void setupClientTransferEncoding(http::HTTPClient& client, bool use_chunked_enco
} // namespace

void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) {
if (!context.getProperty(URL, url_))
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid");
std::string url;
if (!context.getProperty(URL, url) || url.empty())
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or empty");

method_ = utils::parseEnumProperty<http::HttpRequestMethod>(context, Method);

Expand Down Expand Up @@ -121,9 +161,9 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context)
}
}

std::unique_ptr<minifi::http::HTTPClient> InvokeHTTP::createHTTPClientFromMembers() const {
gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const {
auto client = std::make_unique<minifi::http::HTTPClient>();
client->initialize(method_, url_, ssl_context_service_);
client->initialize(method_, url, ssl_context_service_);
setupClientTimeouts(*client, connect_timeout_, read_timeout_);
client->setHTTPProxy(proxy_);
client->setFollowRedirects(follow_redirects_);
Expand All @@ -133,18 +173,18 @@ std::unique_ptr<minifi::http::HTTPClient> InvokeHTTP::createHTTPClientFromMember
client->setMaximumUploadSpeed(maximum_upload_speed_.getValue());
client->setMaximumDownloadSpeed(maximum_download_speed_.getValue());

return client;
return gsl::make_not_null(std::move(client));
}


void InvokeHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
setupMembersFromProperties(context);

auto create_client = [this]() -> std::unique_ptr<minifi::http::HTTPClient> {
return createHTTPClientFromMembers();
auto create_client = [this](const std::string& url) -> gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> {
return createHTTPClientFromMembers(url);
};

client_queue_ = utils::ResourceQueue<http::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
client_queue_ = std::make_unique<invoke_http::HttpClientStore>(getMaxConcurrentTasks() * 2, create_client);
}

bool InvokeHTTP::shouldEmitFlowFile() const {
Expand Down Expand Up @@ -202,9 +242,15 @@ void InvokeHTTP::onTrigger(core::ProcessContext& context, core::ProcessSession&
logger_->log_debug("InvokeHTTP -- Received flowfile");
}

auto client = client_queue_->getResource();
auto url = context.getProperty(URL, flow_file.get());
if (!url || url->empty()) {
logger_->log_error("InvokeHTTP -- URL is empty, transferring to failure");
session.transfer(flow_file, RelFailure);
return;
}

onTriggerWithClient(context, session, flow_file, *client);
auto client = client_queue_->getClient(*url);
onTriggerWithClient(context, session, flow_file, client.get());
}

void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session,
Expand Down
58 changes: 55 additions & 3 deletions extensions/standard-processors/processors/InvokeHTTP.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <string>
#include <unordered_map>
#include <utility>
#include <optional>
#include <list>

#include "Core.h"
#include "FlowFileRecord.h"
Expand All @@ -41,9 +43,59 @@
#include "utils/Enum.h"
#include "utils/RegexUtils.h"

namespace org::apache::nifi::minifi::test {
class HttpClientStoreTestAccessor;
}

namespace org::apache::nifi::minifi::processors {

namespace invoke_http {
class HttpClientStore {
public:
HttpClientStore(const size_t size, std::function<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>(const std::string&)> create_client_function)
: max_size_(size),
create_client_function_(std::move(create_client_function)) {
}
HttpClientStore(const HttpClientStore&) = delete;
HttpClientStore& operator=(const HttpClientStore&) = delete;
HttpClientStore(HttpClientStore&&) = delete;
HttpClientStore& operator=(HttpClientStore&&) = delete;
~HttpClientStore() = default;

class HttpClientWrapper {
public:
HttpClientWrapper(HttpClientStore& client_store, minifi::http::HTTPClient& client) : client_(client), client_store_(client_store) {
}
HttpClientWrapper(HttpClientWrapper&& src) = default;
HttpClientWrapper(const HttpClientWrapper&) = delete;
~HttpClientWrapper() {
client_store_.returnClient(client_);
}

minifi::http::HTTPClient& get() const {
return client_;
}

private:
minifi::http::HTTPClient& client_;
HttpClientStore& client_store_;
};

[[nodiscard]] HttpClientWrapper getClient(const std::string& url);

private:
friend class ::org::apache::nifi::minifi::test::HttpClientStoreTestAccessor;
void returnClient(minifi::http::HTTPClient& client);

std::mutex clients_mutex_;
std::condition_variable cv_;
const size_t max_size_;
std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>> used_clients_;
std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>> unused_clients_;
std::function<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>(const std::string&)> create_client_function_;
Comment on lines +93 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could do without the extra indirection of unique_ptr. List splicing doesn't move the data on the heap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, unfortunately we would still need to copy or move the HTTPClient on line 56 and line 176 and those constructors are delete from the HTTPClient class.

std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HttpClientWrapper>::getLogger()};
};

enum class InvalidHTTPHeaderFieldHandlingOption {
fail,
transform,
Expand Down Expand Up @@ -276,7 +328,7 @@ class InvokeHTTP : public core::Processor {


void setupMembersFromProperties(const core::ProcessContext& context);
std::unique_ptr<minifi::http::HTTPClient> createHTTPClientFromMembers() const;
gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> createHTTPClientFromMembers(const std::string& url) const;

http::HttpRequestMethod method_{};
std::optional<utils::Regex> attributes_to_send_;
Expand All @@ -289,7 +341,6 @@ class InvokeHTTP : public core::Processor {
core::DataTransferSpeedValue maximum_upload_speed_{0};
core::DataTransferSpeedValue maximum_download_speed_{0};

std::string url_;
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;

std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)};
Expand All @@ -303,7 +354,8 @@ class InvokeHTTP : public core::Processor {
invoke_http::InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_{};

std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger(uuid_)};
std::shared_ptr<utils::ResourceQueue<http::HTTPClient>> client_queue_;

std::unique_ptr<invoke_http::HttpClientStore> client_queue_;
};

} // namespace org::apache::nifi::minifi::processors
Loading