From db8dd864daffea5c79348f8f208ececb865c1d13 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 4 Dec 2024 11:48:08 +0100 Subject: [PATCH 1/3] MINIFICPP-2485 fix: Support Expression Language in InvokeHTTP "Request URL" property --- .../processors/InvokeHTTP.cpp | 87 ++++++- .../processors/InvokeHTTP.h | 58 ++++- .../tests/unit/HttpClientStoreTests.cpp | 223 ++++++++++++++++++ 3 files changed, 355 insertions(+), 13 deletions(-) create mode 100644 extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp b/extensions/standard-processors/processors/InvokeHTTP.cpp index 7104359a8c..8679615f88 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.cpp +++ b/extensions/standard-processors/processors/InvokeHTTP.cpp @@ -35,6 +35,66 @@ #include "range/v3/algorithm/any_of.hpp" namespace org::apache::nifi::minifi::processors { +namespace invoke_http { + +namespace { +template +void rotateRight(ForwardIt first, ForwardIt last, size_t n) { + if (first == last) return; + auto size = std::distance(first, last); + n = n % size; + std::rotate(first, std::prev(last, n), last); +} +} // namespace + +HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) { + std::unique_lock lock(clients_mutex_); + for (size_t i = 0; i < clients_created_; ++i) { + gsl_Assert(clients_[i].first); + auto& [client, in_use] = clients_[i]; + if (client->getURL() == url && !in_use) { + in_use = true; + std::rotate(clients_.begin() + gsl::narrow(i), clients_.begin() + gsl::narrow(i + 1), clients_.begin() + gsl::narrow(clients_created_)); + return {*this, *client}; + } + } + + if (clients_created_ < max_size_) { + gsl_Assert(!clients_[clients_created_].first); + auto client = create_client_function_(url); + clients_[clients_created_] = std::make_pair(std::move(client), true); + ++clients_created_; + return {*this, *clients_[clients_created_ - 1].first}; + } else { + cv_.wait(lock, [this] { return !clients_[0].second; }); + gsl_Assert(!clients_[0].second); + auto client = create_client_function_(url); + clients_[0] = std::make_pair(std::move(client), true); + std::rotate(clients_.begin(), clients_.begin() + 1, clients_.begin() + gsl::narrow(clients_created_)); + return {*this, *clients_[clients_created_ - 1].first}; + } +} + +void HttpClientStore::returnClient(minifi::http::HTTPClient& client) { + std::unique_lock lock(clients_mutex_); + int64_t last_unused = -1; + for (size_t i = 0; i < clients_created_; ++i) { + if (clients_[i].first.get() != &client) { + if (!clients_[i].second) { + last_unused = gsl::narrow(i); + } + continue; + } + clients_[i].second = false; + rotateRight(clients_.begin() + last_unused + 1, clients_.begin() + gsl::narrow(i) + 1, 1); + lock.unlock(); + cv_.notify_one(); + return; + } + logger_->log_error("Couldn't find HTTP client in client store to be returned"); +} + +} // namespace invoke_http std::string InvokeHTTP::DefaultContentType = "application/octet-stream"; @@ -64,8 +124,9 @@ void setupClientTransferEncoding(http::HTTPClient& client, bool use_chunked_enco } // namespace void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) { - if (!context.getProperty(URL, url_)) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid"); + std::string url; + if (!context.getProperty(URL, url) || url.empty()) + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or empty"); method_ = utils::parseEnumProperty(context, Method); @@ -121,9 +182,9 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) } } -std::unique_ptr InvokeHTTP::createHTTPClientFromMembers() const { +gsl::not_null> InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const { auto client = std::make_unique(); - client->initialize(method_, url_, ssl_context_service_); + client->initialize(method_, url, ssl_context_service_); setupClientTimeouts(*client, connect_timeout_, read_timeout_); client->setHTTPProxy(proxy_); client->setFollowRedirects(follow_redirects_); @@ -133,18 +194,18 @@ std::unique_ptr InvokeHTTP::createHTTPClientFromMember client->setMaximumUploadSpeed(maximum_upload_speed_.getValue()); client->setMaximumDownloadSpeed(maximum_download_speed_.getValue()); - return client; + return gsl::make_not_null(std::move(client)); } void InvokeHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { setupMembersFromProperties(context); - auto create_client = [this]() -> std::unique_ptr { - return createHTTPClientFromMembers(); + auto create_client = [this](const std::string& url) -> gsl::not_null> { + return createHTTPClientFromMembers(url); }; - client_queue_ = utils::ResourceQueue::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_); + client_queue_ = std::make_unique(getMaxConcurrentTasks() * 2, create_client); } bool InvokeHTTP::shouldEmitFlowFile() const { @@ -202,9 +263,15 @@ void InvokeHTTP::onTrigger(core::ProcessContext& context, core::ProcessSession& logger_->log_debug("InvokeHTTP -- Received flowfile"); } - auto client = client_queue_->getResource(); + auto url = context.getProperty(URL, flow_file.get()); + if (!url || url->empty()) { + logger_->log_error("InvokeHTTP -- URL is empty, transferring to failure"); + session.transfer(flow_file, RelFailure); + return; + } - onTriggerWithClient(context, session, flow_file, *client); + auto client = client_queue_->getClient(*url); + onTriggerWithClient(context, session, flow_file, client.get()); } void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session, diff --git a/extensions/standard-processors/processors/InvokeHTTP.h b/extensions/standard-processors/processors/InvokeHTTP.h index bb5e9d2837..a1636dd1a8 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.h +++ b/extensions/standard-processors/processors/InvokeHTTP.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "Core.h" #include "FlowFileRecord.h" @@ -41,9 +42,60 @@ #include "utils/Enum.h" #include "utils/RegexUtils.h" +namespace org::apache::nifi::minifi::test { +class HttpClientStoreTestAccessor; +} + namespace org::apache::nifi::minifi::processors { namespace invoke_http { +class HttpClientStore { + public: + HttpClientStore(const size_t size, std::function>(const std::string&)> create_client_function) + : max_size_(size), + create_client_function_(std::move(create_client_function)) { + clients_.resize(size); + } + HttpClientStore(const HttpClientStore&) = delete; + HttpClientStore& operator=(const HttpClientStore&) = delete; + HttpClientStore(HttpClientStore&&) = delete; + HttpClientStore& operator=(HttpClientStore&&) = delete; + ~HttpClientStore() = default; + + class HttpClientWrapper { + public: + HttpClientWrapper(HttpClientStore& client_store, minifi::http::HTTPClient& client) : client_(client), client_store_(client_store) { + } + HttpClientWrapper(HttpClientWrapper&& src) = default; + HttpClientWrapper(const HttpClientWrapper&) = delete; + ~HttpClientWrapper() { + client_store_.returnClient(client_); + } + + minifi::http::HTTPClient& get() const { + return client_; + } + + private: + minifi::http::HTTPClient& client_; + HttpClientStore& client_store_; + }; + + [[nodiscard]] HttpClientWrapper getClient(const std::string& url); + + private: + friend class ::org::apache::nifi::minifi::test::HttpClientStoreTestAccessor; + void returnClient(minifi::http::HTTPClient& client); + + std::mutex clients_mutex_; + std::condition_variable cv_; + const size_t max_size_; + size_t clients_created_{0}; + std::vector, bool>> clients_; + std::function>(const std::string&)> create_client_function_; + std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; +}; + enum class InvalidHTTPHeaderFieldHandlingOption { fail, transform, @@ -276,7 +328,7 @@ class InvokeHTTP : public core::Processor { void setupMembersFromProperties(const core::ProcessContext& context); - std::unique_ptr createHTTPClientFromMembers() const; + gsl::not_null> createHTTPClientFromMembers(const std::string& url) const; http::HttpRequestMethod method_{}; std::optional attributes_to_send_; @@ -289,7 +341,6 @@ class InvokeHTTP : public core::Processor { core::DataTransferSpeedValue maximum_upload_speed_{0}; core::DataTransferSpeedValue maximum_download_speed_{0}; - std::string url_; std::shared_ptr ssl_context_service_; std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)}; @@ -303,7 +354,8 @@ class InvokeHTTP : public core::Processor { invoke_http::InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_{}; std::shared_ptr logger_{core::logging::LoggerFactory::getLogger(uuid_)}; - std::shared_ptr> client_queue_; + + std::unique_ptr client_queue_; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp new file mode 100644 index 0000000000..0a456cbdb3 --- /dev/null +++ b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp @@ -0,0 +1,223 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "InvokeHTTP.h" +#include "http/BaseHTTPClient.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class HttpClientStoreTestAccessor { + public: + static const std::vector, bool>>& getClients(minifi::processors::invoke_http::HttpClientStore& store) { + return store.clients_; + } + + static size_t getMaxSize(minifi::processors::invoke_http::HttpClientStore& store) { + return store.max_size_; + } + + static size_t getClientsCreated(minifi::processors::invoke_http::HttpClientStore& store) { + return store.clients_created_; + } +}; + +TEST_CASE("HttpClientStore can create new client for a url and is returned after it's not used anymore") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + REQUIRE(HttpClientStoreTestAccessor::getMaxSize(store) == 2); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 0); + { + [[maybe_unused]] auto client = store.getClient("http://localhost:8080"); + const auto& clients = HttpClientStoreTestAccessor::getClients(store); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); + REQUIRE(clients[0].second == true); + } + + const auto& clients = HttpClientStoreTestAccessor::getClients(store); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); + REQUIRE(clients[0].second == false); +} + +TEST_CASE("A http client can be reused") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + minifi::http::HTTPClient* client_ptr = nullptr; + auto& clients = HttpClientStoreTestAccessor::getClients(store); + { + auto client = store.getClient("http://localhost:8080"); + client_ptr = &client.get(); + } + + { + auto client = store.getClient("http://localhost:8080"); + REQUIRE(&client.get() == client_ptr); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); + REQUIRE(clients[0].second); + } + REQUIRE_FALSE(clients[0].second); +} + +TEST_CASE("A new url always creates a new client") { + minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + auto& clients = HttpClientStoreTestAccessor::getClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 3); + REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); + REQUIRE(clients[2].first->getURL() == "http://localhost:8082"); + } + REQUIRE(clients[0].first->getURL() == "http://localhost:8082"); + REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); + REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); +} + +TEST_CASE("If a store is full, the first unused client is replaced by the newly requested one") { + minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + auto& clients = HttpClientStoreTestAccessor::getClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + { + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + } + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 3); + REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); + REQUIRE_FALSE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8082"); + REQUIRE(clients[2].second); + + [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); + REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); + REQUIRE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); + REQUIRE(clients[2].second); + } + REQUIRE(clients[0].first->getURL() == "http://localhost:8083"); + REQUIRE_FALSE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); + REQUIRE_FALSE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); + REQUIRE_FALSE(clients[2].second); +} + +TEST_CASE("Multiple unused clients are present the oldest one is replaced") { + minifi::processors::invoke_http::HttpClientStore store(4, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + auto& clients = HttpClientStoreTestAccessor::getClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + { + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + } + [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); + REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 4); + REQUIRE(clients[0].first->getURL() == "http://localhost:8082"); + REQUIRE_FALSE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); + REQUIRE_FALSE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[2].second); + REQUIRE(clients[3].first->getURL() == "http://localhost:8083"); + REQUIRE(clients[3].second); + + [[maybe_unused]] auto client5 = store.getClient("http://localhost:8084"); + REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); + REQUIRE_FALSE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); + REQUIRE(clients[2].second); + REQUIRE(clients[3].first->getURL() == "http://localhost:8084"); + REQUIRE(clients[3].second); + } + REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); + REQUIRE_FALSE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8084"); + REQUIRE_FALSE(clients[1].second); + REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); + REQUIRE_FALSE(clients[2].second); + REQUIRE(clients[3].first->getURL() == "http://localhost:8080"); + REQUIRE_FALSE(clients[3].second); +} + +TEST_CASE("If all clients are in use, the call will block until a client is returned") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + bool client2_created{false}; + std::mutex mutex; + std::condition_variable client2_created_cv; + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + + std::thread thread1([&store, &mutex, &client2_created, &client2_created_cv] { + std::unique_lock lock(mutex); + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + client2_created = true; + lock.unlock(); + client2_created_cv.notify_one(); + std::this_thread::sleep_for(300ms); + }); + + std::thread thread2([&store, &mutex, &client2_created, &client2_created_cv] { + std::unique_lock lock(mutex); + client2_created_cv.wait(lock, [&client2_created] { return client2_created; }); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + auto& clients = HttpClientStoreTestAccessor::getClients(store); + REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); + REQUIRE(clients[0].second); + REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); + REQUIRE(clients[1].second); + }); + + thread1.join(); + thread2.join(); +} + +} // namespace org::apache::nifi::minifi::test From 028d6b5e699dca283b7b0d706bacbc79887dcb8a Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 6 Jan 2025 14:26:30 +0100 Subject: [PATCH 2/3] Review update --- .../processors/InvokeHTTP.cpp | 51 +++---- .../processors/InvokeHTTP.h | 5 +- .../tests/unit/HttpClientStoreTests.cpp | 135 +++++++++--------- 3 files changed, 87 insertions(+), 104 deletions(-) diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp b/extensions/standard-processors/processors/InvokeHTTP.cpp index 8679615f88..31c0ab5a98 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.cpp +++ b/extensions/standard-processors/processors/InvokeHTTP.cpp @@ -37,56 +37,45 @@ namespace org::apache::nifi::minifi::processors { namespace invoke_http { -namespace { -template -void rotateRight(ForwardIt first, ForwardIt last, size_t n) { - if (first == last) return; - auto size = std::distance(first, last); - n = n % size; - std::rotate(first, std::prev(last, n), last); -} -} // namespace - HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) { std::unique_lock lock(clients_mutex_); - for (size_t i = 0; i < clients_created_; ++i) { - gsl_Assert(clients_[i].first); - auto& [client, in_use] = clients_[i]; + for (auto it = clients_.begin(); it != clients_.end(); ++it) { + gsl_Assert(it->first); + auto& [client, in_use] = *it; if (client->getURL() == url && !in_use) { in_use = true; - std::rotate(clients_.begin() + gsl::narrow(i), clients_.begin() + gsl::narrow(i + 1), clients_.begin() + gsl::narrow(clients_created_)); + clients_.splice(clients_.end(), clients_, it); return {*this, *client}; } } - if (clients_created_ < max_size_) { - gsl_Assert(!clients_[clients_created_].first); + if (clients_.size() < max_size_) { auto client = create_client_function_(url); - clients_[clients_created_] = std::make_pair(std::move(client), true); - ++clients_created_; - return {*this, *clients_[clients_created_ - 1].first}; + clients_.push_back(std::make_pair(std::move(client), true)); + return {*this, *clients_.back().first}; } else { - cv_.wait(lock, [this] { return !clients_[0].second; }); - gsl_Assert(!clients_[0].second); + cv_.wait(lock, [this] { return !clients_.front().second; }); + gsl_Assert(!clients_.front().second); auto client = create_client_function_(url); - clients_[0] = std::make_pair(std::move(client), true); - std::rotate(clients_.begin(), clients_.begin() + 1, clients_.begin() + gsl::narrow(clients_created_)); - return {*this, *clients_[clients_created_ - 1].first}; + clients_.front() = std::make_pair(std::move(client), true); + clients_.splice(clients_.end(), clients_, clients_.begin()); + return {*this, *clients_.back().first}; } } void HttpClientStore::returnClient(minifi::http::HTTPClient& client) { std::unique_lock lock(clients_mutex_); - int64_t last_unused = -1; - for (size_t i = 0; i < clients_created_; ++i) { - if (clients_[i].first.get() != &client) { - if (!clients_[i].second) { - last_unused = gsl::narrow(i); + auto last_unused = clients_.end(); + for (auto it = clients_.begin(); it != clients_.end(); ++it) { + if (it->first.get() != &client) { + if (!it->second) { + last_unused = it; } continue; } - clients_[i].second = false; - rotateRight(clients_.begin() + last_unused + 1, clients_.begin() + gsl::narrow(i) + 1, 1); + it->second = false; + auto insert_point = last_unused == clients_.end() ? clients_.begin() : std::next(last_unused, 1); + clients_.splice(insert_point, clients_, it); lock.unlock(); cv_.notify_one(); return; diff --git a/extensions/standard-processors/processors/InvokeHTTP.h b/extensions/standard-processors/processors/InvokeHTTP.h index a1636dd1a8..ad565da058 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.h +++ b/extensions/standard-processors/processors/InvokeHTTP.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "Core.h" #include "FlowFileRecord.h" @@ -54,7 +55,6 @@ class HttpClientStore { HttpClientStore(const size_t size, std::function>(const std::string&)> create_client_function) : max_size_(size), create_client_function_(std::move(create_client_function)) { - clients_.resize(size); } HttpClientStore(const HttpClientStore&) = delete; HttpClientStore& operator=(const HttpClientStore&) = delete; @@ -90,8 +90,7 @@ class HttpClientStore { std::mutex clients_mutex_; std::condition_variable cv_; const size_t max_size_; - size_t clients_created_{0}; - std::vector, bool>> clients_; + std::list, bool>> clients_; std::function>(const std::string&)> create_client_function_; std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; }; diff --git a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp index 0a456cbdb3..a09b0d082d 100644 --- a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp +++ b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp @@ -28,17 +28,13 @@ namespace org::apache::nifi::minifi::test { class HttpClientStoreTestAccessor { public: - static const std::vector, bool>>& getClients(minifi::processors::invoke_http::HttpClientStore& store) { + static const std::list, bool>>& getClients(minifi::processors::invoke_http::HttpClientStore& store) { return store.clients_; } static size_t getMaxSize(minifi::processors::invoke_http::HttpClientStore& store) { return store.max_size_; } - - static size_t getClientsCreated(minifi::processors::invoke_http::HttpClientStore& store) { - return store.clients_created_; - } }; TEST_CASE("HttpClientStore can create new client for a url and is returned after it's not used anymore") { @@ -48,17 +44,16 @@ TEST_CASE("HttpClientStore can create new client for a url and is returned after return gsl::make_not_null(std::move(client)); }); REQUIRE(HttpClientStoreTestAccessor::getMaxSize(store) == 2); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 0); + const auto& clients = HttpClientStoreTestAccessor::getClients(store); + REQUIRE(clients.size() == 0); { [[maybe_unused]] auto client = store.getClient("http://localhost:8080"); - const auto& clients = HttpClientStoreTestAccessor::getClients(store); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); - REQUIRE(clients[0].second == true); + REQUIRE(clients.size() == 1); + REQUIRE(clients.front().second == true); } - const auto& clients = HttpClientStoreTestAccessor::getClients(store); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); - REQUIRE(clients[0].second == false); + REQUIRE(clients.size() == 1); + REQUIRE(clients.front().second == false); } TEST_CASE("A http client can be reused") { @@ -77,10 +72,10 @@ TEST_CASE("A http client can be reused") { { auto client = store.getClient("http://localhost:8080"); REQUIRE(&client.get() == client_ptr); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 1); - REQUIRE(clients[0].second); + REQUIRE(clients.size() == 1); + REQUIRE(clients.front().second); } - REQUIRE_FALSE(clients[0].second); + REQUIRE_FALSE(clients.front().second); } TEST_CASE("A new url always creates a new client") { @@ -94,14 +89,14 @@ TEST_CASE("A new url always creates a new client") { [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 3); - REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); - REQUIRE(clients[2].first->getURL() == "http://localhost:8082"); + REQUIRE(clients.size() == 3); + CHECK(clients.front().first->getURL() == "http://localhost:8080"); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8082"); } - REQUIRE(clients[0].first->getURL() == "http://localhost:8082"); - REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); - REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); + CHECK(clients.front().first->getURL() == "http://localhost:8082"); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); } TEST_CASE("If a store is full, the first unused client is replaced by the newly requested one") { @@ -117,28 +112,28 @@ TEST_CASE("If a store is full, the first unused client is replaced by the newly [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); } [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 3); - REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); - REQUIRE_FALSE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8082"); - REQUIRE(clients[2].second); + REQUIRE(clients.size() == 3); + CHECK(clients.front().first->getURL() == "http://localhost:8081"); + CHECK_FALSE(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8080"); + CHECK(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8082"); + CHECK(std::next(clients.begin(), 2)->second); [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); - REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); - REQUIRE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); - REQUIRE(clients[2].second); + CHECK(clients.front().first->getURL() == "http://localhost:8080"); + CHECK(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); + CHECK(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); + CHECK(std::next(clients.begin(), 2)->second); } - REQUIRE(clients[0].first->getURL() == "http://localhost:8083"); - REQUIRE_FALSE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); - REQUIRE_FALSE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); - REQUIRE_FALSE(clients[2].second); + CHECK(clients.front().first->getURL() == "http://localhost:8083"); + CHECK_FALSE(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); + CHECK_FALSE(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); + CHECK_FALSE(std::next(clients.begin(), 2)->second); } TEST_CASE("Multiple unused clients are present the oldest one is replaced") { @@ -155,34 +150,34 @@ TEST_CASE("Multiple unused clients are present the oldest one is replaced") { [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); } [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); - REQUIRE(HttpClientStoreTestAccessor::getClientsCreated(store) == 4); - REQUIRE(clients[0].first->getURL() == "http://localhost:8082"); - REQUIRE_FALSE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8081"); - REQUIRE_FALSE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[2].second); - REQUIRE(clients[3].first->getURL() == "http://localhost:8083"); - REQUIRE(clients[3].second); + REQUIRE(clients.size() == 4); + CHECK(clients.front().first->getURL() == "http://localhost:8082"); + CHECK_FALSE(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); + CHECK_FALSE(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); + CHECK(std::next(clients.begin(), 2)->second); + CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8083"); + CHECK(std::next(clients.begin(), 3)->second); [[maybe_unused]] auto client5 = store.getClient("http://localhost:8084"); - REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); - REQUIRE_FALSE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); - REQUIRE(clients[2].second); - REQUIRE(clients[3].first->getURL() == "http://localhost:8084"); - REQUIRE(clients[3].second); + CHECK(clients.front().first->getURL() == "http://localhost:8081"); + CHECK_FALSE(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8080"); + CHECK(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); + CHECK(std::next(clients.begin(), 2)->second); + CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8084"); + CHECK(std::next(clients.begin(), 3)->second); } - REQUIRE(clients[0].first->getURL() == "http://localhost:8081"); - REQUIRE_FALSE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8084"); - REQUIRE_FALSE(clients[1].second); - REQUIRE(clients[2].first->getURL() == "http://localhost:8083"); - REQUIRE_FALSE(clients[2].second); - REQUIRE(clients[3].first->getURL() == "http://localhost:8080"); - REQUIRE_FALSE(clients[3].second); + CHECK(clients.front().first->getURL() == "http://localhost:8081"); + CHECK_FALSE(clients.front().second); + CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8084"); + CHECK_FALSE(std::next(clients.begin(), 1)->second); + CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); + CHECK_FALSE(std::next(clients.begin(), 2)->second); + CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8080"); + CHECK_FALSE(std::next(clients.begin(), 3)->second); } TEST_CASE("If all clients are in use, the call will block until a client is returned") { @@ -210,10 +205,10 @@ TEST_CASE("If all clients are in use, the call will block until a client is retu client2_created_cv.wait(lock, [&client2_created] { return client2_created; }); [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); auto& clients = HttpClientStoreTestAccessor::getClients(store); - REQUIRE(clients[0].first->getURL() == "http://localhost:8080"); - REQUIRE(clients[0].second); - REQUIRE(clients[1].first->getURL() == "http://localhost:8082"); - REQUIRE(clients[1].second); + REQUIRE(clients.front().first->getURL() == "http://localhost:8080"); + REQUIRE(clients.front().second); + REQUIRE(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); + REQUIRE(std::next(clients.begin(), 1)->second); }); thread1.join(); From 97387df21a6e2c7ec275f2cad7cfbbdb071a8d98 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 6 Jan 2025 17:03:36 +0100 Subject: [PATCH 3/3] Split used and unused clients into separate lists --- .../processors/InvokeHTTP.cpp | 38 ++--- .../processors/InvokeHTTP.h | 3 +- .../tests/unit/HttpClientStoreTests.cpp | 143 +++++++++--------- 3 files changed, 88 insertions(+), 96 deletions(-) diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp b/extensions/standard-processors/processors/InvokeHTTP.cpp index 31c0ab5a98..b9eeaf9618 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.cpp +++ b/extensions/standard-processors/processors/InvokeHTTP.cpp @@ -39,43 +39,33 @@ namespace invoke_http { HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) { std::unique_lock lock(clients_mutex_); - for (auto it = clients_.begin(); it != clients_.end(); ++it) { - gsl_Assert(it->first); - auto& [client, in_use] = *it; - if (client->getURL() == url && !in_use) { - in_use = true; - clients_.splice(clients_.end(), clients_, it); - return {*this, *client}; + for (auto it = unused_clients_.begin(); it != unused_clients_.end(); ++it) { + if ((*it)->getURL() == url) { + used_clients_.splice(used_clients_.end(), unused_clients_, it); + return {*this, **it}; } } - if (clients_.size() < max_size_) { + if (used_clients_.size() + unused_clients_.size() < max_size_) { auto client = create_client_function_(url); - clients_.push_back(std::make_pair(std::move(client), true)); - return {*this, *clients_.back().first}; + used_clients_.push_back(std::move(client)); + return {*this, *used_clients_.back()}; } else { - cv_.wait(lock, [this] { return !clients_.front().second; }); - gsl_Assert(!clients_.front().second); + cv_.wait(lock, [this] { return !unused_clients_.empty(); }); auto client = create_client_function_(url); - clients_.front() = std::make_pair(std::move(client), true); - clients_.splice(clients_.end(), clients_, clients_.begin()); - return {*this, *clients_.back().first}; + unused_clients_.front() = std::move(client); + used_clients_.splice(used_clients_.end(), unused_clients_, unused_clients_.begin()); + return {*this, *used_clients_.back()}; } } void HttpClientStore::returnClient(minifi::http::HTTPClient& client) { std::unique_lock lock(clients_mutex_); - auto last_unused = clients_.end(); - for (auto it = clients_.begin(); it != clients_.end(); ++it) { - if (it->first.get() != &client) { - if (!it->second) { - last_unused = it; - } + for (auto it = used_clients_.begin(); it != used_clients_.end(); ++it) { + if (it->get() != &client) { continue; } - it->second = false; - auto insert_point = last_unused == clients_.end() ? clients_.begin() : std::next(last_unused, 1); - clients_.splice(insert_point, clients_, it); + unused_clients_.splice(unused_clients_.end(), used_clients_, it); lock.unlock(); cv_.notify_one(); return; diff --git a/extensions/standard-processors/processors/InvokeHTTP.h b/extensions/standard-processors/processors/InvokeHTTP.h index ad565da058..bf482fd703 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.h +++ b/extensions/standard-processors/processors/InvokeHTTP.h @@ -90,7 +90,8 @@ class HttpClientStore { std::mutex clients_mutex_; std::condition_variable cv_; const size_t max_size_; - std::list, bool>> clients_; + std::list>> used_clients_; + std::list>> unused_clients_; std::function>(const std::string&)> create_client_function_; std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; }; diff --git a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp index a09b0d082d..c3bc7f9851 100644 --- a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp +++ b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp @@ -28,8 +28,12 @@ namespace org::apache::nifi::minifi::test { class HttpClientStoreTestAccessor { public: - static const std::list, bool>>& getClients(minifi::processors::invoke_http::HttpClientStore& store) { - return store.clients_; + static const std::list>>& getUsedClients(minifi::processors::invoke_http::HttpClientStore& store) { + return store.used_clients_; + } + + static const std::list>>& getUnusedClients(minifi::processors::invoke_http::HttpClientStore& store) { + return store.unused_clients_; } static size_t getMaxSize(minifi::processors::invoke_http::HttpClientStore& store) { @@ -44,16 +48,18 @@ TEST_CASE("HttpClientStore can create new client for a url and is returned after return gsl::make_not_null(std::move(client)); }); REQUIRE(HttpClientStoreTestAccessor::getMaxSize(store) == 2); - const auto& clients = HttpClientStoreTestAccessor::getClients(store); - REQUIRE(clients.size() == 0); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.empty()); { [[maybe_unused]] auto client = store.getClient("http://localhost:8080"); - REQUIRE(clients.size() == 1); - REQUIRE(clients.front().second == true); + REQUIRE(used_clients.size() == 1); + REQUIRE(unused_clients.empty()); } - REQUIRE(clients.size() == 1); - REQUIRE(clients.front().second == false); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 1); } TEST_CASE("A http client can be reused") { @@ -63,7 +69,8 @@ TEST_CASE("A http client can be reused") { return gsl::make_not_null(std::move(client)); }); minifi::http::HTTPClient* client_ptr = nullptr; - auto& clients = HttpClientStoreTestAccessor::getClients(store); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); { auto client = store.getClient("http://localhost:8080"); client_ptr = &client.get(); @@ -72,10 +79,11 @@ TEST_CASE("A http client can be reused") { { auto client = store.getClient("http://localhost:8080"); REQUIRE(&client.get() == client_ptr); - REQUIRE(clients.size() == 1); - REQUIRE(clients.front().second); + REQUIRE(used_clients.size() == 1); + REQUIRE(unused_clients.empty()); } - REQUIRE_FALSE(clients.front().second); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 1); } TEST_CASE("A new url always creates a new client") { @@ -84,19 +92,23 @@ TEST_CASE("A new url always creates a new client") { client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); return gsl::make_not_null(std::move(client)); }); - auto& clients = HttpClientStoreTestAccessor::getClients(store); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); { [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); - REQUIRE(clients.size() == 3); - CHECK(clients.front().first->getURL() == "http://localhost:8080"); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8082"); + REQUIRE(used_clients.size() == 3); + REQUIRE(unused_clients.empty()); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8082"); } - CHECK(clients.front().first->getURL() == "http://localhost:8082"); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 3); + CHECK(unused_clients.front()->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080"); } TEST_CASE("If a store is full, the first unused client is replaced by the newly requested one") { @@ -105,35 +117,32 @@ TEST_CASE("If a store is full, the first unused client is replaced by the newly client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); return gsl::make_not_null(std::move(client)); }); - auto& clients = HttpClientStoreTestAccessor::getClients(store); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); { [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); { [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); } [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); - REQUIRE(clients.size() == 3); - CHECK(clients.front().first->getURL() == "http://localhost:8081"); - CHECK_FALSE(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8080"); - CHECK(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8082"); - CHECK(std::next(clients.begin(), 2)->second); + REQUIRE(used_clients.size() == 2); + REQUIRE(unused_clients.size() == 1); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); - CHECK(clients.front().first->getURL() == "http://localhost:8080"); - CHECK(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); - CHECK(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); - CHECK(std::next(clients.begin(), 2)->second); + REQUIRE(used_clients.size() == 3); + REQUIRE(unused_clients.empty()); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8083"); } - CHECK(clients.front().first->getURL() == "http://localhost:8083"); - CHECK_FALSE(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); - CHECK_FALSE(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); - CHECK_FALSE(std::next(clients.begin(), 2)->second); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 3); + CHECK(unused_clients.front()->getURL() == "http://localhost:8083"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080"); } TEST_CASE("Multiple unused clients are present the oldest one is replaced") { @@ -142,7 +151,8 @@ TEST_CASE("Multiple unused clients are present the oldest one is replaced") { client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); return gsl::make_not_null(std::move(client)); }); - auto& clients = HttpClientStoreTestAccessor::getClients(store); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); { [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); { @@ -150,34 +160,26 @@ TEST_CASE("Multiple unused clients are present the oldest one is replaced") { [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); } [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); - REQUIRE(clients.size() == 4); - CHECK(clients.front().first->getURL() == "http://localhost:8082"); - CHECK_FALSE(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8081"); - CHECK_FALSE(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8080"); - CHECK(std::next(clients.begin(), 2)->second); - CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8083"); - CHECK(std::next(clients.begin(), 3)->second); + REQUIRE(used_clients.size() == 2); + REQUIRE(unused_clients.size() == 2); + CHECK(unused_clients.front()->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083"); [[maybe_unused]] auto client5 = store.getClient("http://localhost:8084"); - CHECK(clients.front().first->getURL() == "http://localhost:8081"); - CHECK_FALSE(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8080"); - CHECK(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); - CHECK(std::next(clients.begin(), 2)->second); - CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8084"); - CHECK(std::next(clients.begin(), 3)->second); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8084"); } - CHECK(clients.front().first->getURL() == "http://localhost:8081"); - CHECK_FALSE(clients.front().second); - CHECK(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8084"); - CHECK_FALSE(std::next(clients.begin(), 1)->second); - CHECK(std::next(clients.begin(), 2)->first->getURL() == "http://localhost:8083"); - CHECK_FALSE(std::next(clients.begin(), 2)->second); - CHECK(std::next(clients.begin(), 3)->first->getURL() == "http://localhost:8080"); - CHECK_FALSE(std::next(clients.begin(), 3)->second); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 4); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8084"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8083"); + CHECK((*std::next(unused_clients.begin(), 3))->getURL() == "http://localhost:8080"); } TEST_CASE("If all clients are in use, the call will block until a client is returned") { @@ -204,11 +206,10 @@ TEST_CASE("If all clients are in use, the call will block until a client is retu std::unique_lock lock(mutex); client2_created_cv.wait(lock, [&client2_created] { return client2_created; }); [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); - auto& clients = HttpClientStoreTestAccessor::getClients(store); - REQUIRE(clients.front().first->getURL() == "http://localhost:8080"); - REQUIRE(clients.front().second); - REQUIRE(std::next(clients.begin(), 1)->first->getURL() == "http://localhost:8082"); - REQUIRE(std::next(clients.begin(), 1)->second); + auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + REQUIRE(used_clients.size() == 2); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); }); thread1.join();