diff --git a/CMakeLists.txt b/CMakeLists.txt index abe825f56a..de4085f862 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,10 +9,14 @@ option(WARNINGS_AS_ERRORS "Treat compiler warnings as errors" ON) option(BUILD_CURL_TRANSPORT "Build internal http transport implementation with CURL for HTTP Pipeline" OFF) option(BUILD_TESTING "Build test cases" OFF) option(BUILD_DOCUMENTATION "Create HTML based API documentation (requires Doxygen)" OFF) +option(RUN_LONG_UNIT_TESTS "Tests that takes more than 5 minutes to complete. No effect if BUILD_TESTING is OFF" OFF) if(BUILD_TESTING) # define a symbol that enables some test hooks in code add_compile_definitions(TESTING_BUILD) + if(RUN_LONG_UNIT_TESTS) + add_compile_definitions(RUN_LONG_UNIT_TESTS) + endif() endif() # VCPKG Integration diff --git a/eng/pipelines/templates/jobs/archetype-sdk-client.yml b/eng/pipelines/templates/jobs/archetype-sdk-client.yml index 3cc8698024..d1ce13e6ed 100644 --- a/eng/pipelines/templates/jobs/archetype-sdk-client.yml +++ b/eng/pipelines/templates/jobs/archetype-sdk-client.yml @@ -35,26 +35,26 @@ jobs: OSVmImage: 'ubuntu-18.04' VcpkgInstall: 'curl[ssl] libxml2 openssl' VCPKG_DEFAULT_TRIPLET: 'x64-linux' - CmakeArgs: ' -DBUILD_TESTING=ON' + CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON' Win_x86_with_unit_test: OSVmImage: 'windows-2019' VcpkgInstall: 'curl[winssl] libxml2' VCPKG_DEFAULT_TRIPLET: 'x86-windows-static' CMAKE_GENERATOR: 'Visual Studio 16 2019' CMAKE_GENERATOR_PLATFORM: Win32 - CmakeArgs: ' -DBUILD_TESTING=ON' + CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON' Win_x64_with_unit_test: OSVmImage: 'windows-2019' VcpkgInstall: 'curl[winssl] libxml2' VCPKG_DEFAULT_TRIPLET: 'x64-windows-static' CMAKE_GENERATOR: 'Visual Studio 16 2019' CMAKE_GENERATOR_PLATFORM: x64 - CmakeArgs: ' -DBUILD_TESTING=ON' + CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON' MacOS_x64_with_unit_test: OSVmImage: 'macOS-10.14' VcpkgInstall: 'curl[ssl] libxml2 openssl' VCPKG_DEFAULT_TRIPLET: 'x64-osx' - CmakeArgs: ' -DBUILD_TESTING=ON' + CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON' pool: vmImage: $(OSVmImage) variables: diff --git a/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp b/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp index 0f54205ae7..ec8ec49286 100644 --- a/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp +++ b/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp @@ -8,6 +8,7 @@ #include "azure/core/http/http.hpp" #include "azure/core/http/policy.hpp" +#include #include #include #include @@ -15,6 +16,12 @@ #include #include +// Define the class name that reads from ConnectionPool private members +namespace Azure { namespace Core { namespace Test { + class TransportAdapter_ConnectionPoolCleaner_Test; + class TransportAdapter_getMultiThread_Test; +}}} // namespace Azure::Core::Test + namespace Azure { namespace Core { namespace Http { namespace Details { @@ -26,38 +33,48 @@ namespace Azure { namespace Core { namespace Http { constexpr static const char* c_DefaultFailedToGetNewConnectionTemplate = "Fail to get a new connection for: "; constexpr static int c_DefaultMaxOpenNewConnectionIntentsAllowed = 10; + // 90 sec -> cleaner wait time before next clean routine + constexpr static int c_DefaultCleanerIntervalMilliseconds = 1000 * 90; + // 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used + constexpr static int c_DefaultConnectionExpiredMilliseconds = 1000 * 60; } // namespace Details - /** - * @brief Statefull component that controls sending an HTTP Request with libcurl thru the wire and - * parsing and building an HTTP RawResponse. - * This session supports the classic libcurl easy interface to send and receive bytes from network - * using callbacks. - * This session also supports working with the custom HTTP protocol option from libcurl to - * manually upload and download bytes using a network socket. This implementation is used when - * working with streams so customers can lazily pull data from netwok using an stream abstraction. - * - * @remarks This component is expected to be used by an HTTP Transporter to ensure that - * transporter to be re usuable in multiple pipelines while every call to network is unique. - */ - class CurlSession : public BodyStream { - // connection handle. It will be taken from a pool - class CurlConnection { - private: - CURL* m_handle; - std::string m_host; + class CurlConnection { + private: + CURL* m_handle; + std::string m_host; + std::chrono::steady_clock::time_point m_lastUseTime; - public: - CurlConnection(std::string const& host) : m_handle(curl_easy_init()), m_host(host) {} + public: + CurlConnection(std::string const& host) : m_handle(curl_easy_init()), m_host(host) {} - ~CurlConnection() { curl_easy_cleanup(this->m_handle); } + ~CurlConnection() { curl_easy_cleanup(this->m_handle); } - CURL* GetHandle() { return this->m_handle; } + CURL* GetHandle() { return this->m_handle; } - std::string GetHost() const { return this->m_host; } - }; + std::string GetHost() const { return this->m_host; } + + void updateLastUsageTime() { this->m_lastUseTime = std::chrono::steady_clock::now(); } + + bool isExpired() + { + auto connectionOnWaitingTimeMs = std::chrono::duration_cast( + std::chrono::steady_clock::now() - this->m_lastUseTime); + return connectionOnWaitingTimeMs.count() >= Details::c_DefaultConnectionExpiredMilliseconds; + } + }; + + struct CurlConnectionPool + { + // Give access to private to this tests class + friend class Azure::Core::Test::TransportAdapter_getMultiThread_Test; + friend class Azure::Core::Test::TransportAdapter_ConnectionPoolCleaner_Test; + + /* + * Mutex for accessing connection pool for thread-safe reading and writing + */ + static std::mutex s_connectionPoolMutex; - // TODO: Mutex for this code to access connectionPoolIndex /* * Keeps an unique key for each host and creates a connection pool for each key. * This way getting a connection for an specific host can be done in O(1) instead of looping a @@ -66,13 +83,67 @@ namespace Azure { namespace Core { namespace Http { * There might be multiple connections for each host. */ static std::map>> s_connectionPoolIndex; + + /* + * Finds a connection to be re-used from the connection pool. If there is not any available + * connection, a new connection is created. + */ static std::unique_ptr GetCurlConnection(Request& request); - static void MoveConnectionBackToPool(std::unique_ptr connection); - static std::mutex s_connectionPoolMutex; + + /** + * Moves a connection back to the pool to be re-used + */ + static void MoveConnectionBackToPool(std::unique_ptr connection); + + // Class can't have instances. + CurlConnectionPool() = delete; private: /** - * @brief Enum used by ResponseBufferParser to control the parsing internal state while building + * Review all connections in the pool and removes old connections that might be already + * expired and closed its connection on server side. + */ + static void CleanUp(); + + static int32_t s_connectionCounter; + static bool s_isCleanConnectionsRunning; + + // Makes possible to know the number of current connections in the connection pool for an + // index + static int64_t ConnectionsOnPool(std::string const& host) + { + auto& pool = CurlConnectionPool::s_connectionPoolIndex[host]; + return pool.size(); + }; + + // Makes possible to know the number indexes in the pool + static int64_t ConnectionsIndexOnPool() + { + return CurlConnectionPool::s_connectionPoolIndex.size(); + }; + }; + + /** + * @brief Stateful component that controls sending an HTTP Request with libcurl thru the wire + * and parsing and building an HTTP RawResponse. This session supports the classic libcurl easy + * interface to send and receive bytes from network using callbacks. This session also supports + * working with the custom HTTP protocol option from libcurl to manually upload and download + * bytes using a network socket. This implementation is used when working with streams so + * customers can lazily pull data from network using an stream abstraction. + * + * @remarks This component is expected to be used by an HTTP Transporter to ensure that + * transporter to be reusable in multiple pipelines while every call to network is unique. + */ + class CurlSession : public BodyStream { + private: + /* + * Static Connection pool for the application. Multiple CurlSessions will use the connection + * pool for getting a curl handle connection. + * + */ + + /* + * Enum used by ResponseBufferParser to control the parsing internal state while building * the HTTP RawResponse * */ @@ -87,13 +158,13 @@ namespace Azure { namespace Core { namespace Http { * @brief stateful component used to read and parse a buffer to construct a valid HTTP * RawResponse. * - * It uses an internal string as buffers to accumulate a response token (version, code, header, - * etc) until the next delimiter is found. Then it uses this string to keep building the HTTP - * RawResponse. + * It uses an internal string as buffers to accumulate a response token (version, code, + * header, etc) until the next delimiter is found. Then it uses this string to keep building + * the HTTP RawResponse. * - * @remark Only status line and headers are parsed and built. Body is ignored by this component. - * A libcurl session will use this component to build and return the HTTP RawResponse with a - * body stream to the pipeline. + * @remark Only status line and headers are parsed and built. Body is ignored by this + * component. A libcurl session will use this component to build and return the HTTP + * RawResponse with a body stream to the pipeline. */ class ResponseBufferParser { private: @@ -104,8 +175,8 @@ namespace Azure { namespace Core { namespace Http { ResponseParserState state; /** * @brief Unique ptr to a response. Parser will create an Initial-valid HTTP RawResponse and - * then it will append headers to it. This response is moved to a different owner once parsing - * is completed. + * then it will append headers to it. This response is moved to a different owner once + * parsing is completed. * */ std::unique_ptr m_response; @@ -124,16 +195,17 @@ namespace Azure { namespace Core { namespace Http { * the token for the HTTP RawResponse is taken from this internal sting if it contains data. * * @remark This buffer allows a libcurl session to use any size of buffer to read from a - * socket while constructing an initial valid HTTP RawResponse. No matter if the response from - * wire contains hundreds of headers, we can use only one fixed size buffer to parse it all. + * socket while constructing an initial valid HTTP RawResponse. No matter if the response + * from wire contains hundreds of headers, we can use only one fixed size buffer to parse it + * all. * */ std::string m_internalBuffer; /** - * @brief This method is invoked by the Parsing process if the internal state is set to status - * code. Function will get the status-line expected tokens until finding the end of status - * line delimiter. + * @brief This method is invoked by the Parsing process if the internal state is set to + * status code. Function will get the status-line expected tokens until finding the end of + * status line delimiter. * * @remark When the end of status line delimiter is found, this method will create the HTTP * RawResponse. The HTTP RawResponse is constructed by default with body type as Stream. @@ -151,8 +223,9 @@ namespace Azure { namespace Core { namespace Http { * * @param buffer Points to a memory address with all or some part of a HTTP header. * @param bufferSize Indicates the size of the buffer. - * @return Returns the index of the last parsed position from buffer. When the returned value - * is smaller than the body size, means there is part of the body response in the buffer. + * @return Returns the index of the last parsed position from buffer. When the returned + * value is smaller than the body size, means there is part of the body response in the + * buffer. */ int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize); @@ -170,17 +243,18 @@ namespace Azure { namespace Core { namespace Http { } // Parse contents of buffer to construct HttpResponse. Returns the index of the last parsed - // possition. Return bufferSize when all buffer was used to parse + // position. Return bufferSize when all buffer was used to parse /** - * @brief Parses the content of a buffer to constuct a valid HTTP RawResponse. This method is - * expected to be called over and over until it returns 0, indicating there is nothing more to - * parse to build the HTTP RawResponse. + * @brief Parses the content of a buffer to construct a valid HTTP RawResponse. This method + * is expected to be called over and over until it returns 0, indicating there is nothing + * more to parse to build the HTTP RawResponse. * - * @param buffer points to a memory area that contains, all or some part of an HTTP response. + * @param buffer points to a memory area that contains, all or some part of an HTTP + * response. * @param bufferSize Indicates the size of the buffer. * @return Returns the index of the last parsed position. Returning a 0 means nothing was - * parsed and it is likely that the HTTP RawResponse is completed. Returning the same value as - * the buffer size means all buffer was parsed and the HTTP might be completed or not. + * parsed and it is likely that the HTTP RawResponse is completed. Returning the same value + * as the buffer size means all buffer was parsed and the HTTP might be completed or not. * Returning a value smaller than the buffer size will likely indicate that the HTTP * RawResponse is completed and that the rest of the buffer contains part of the response * body. @@ -197,8 +271,8 @@ namespace Azure { namespace Core { namespace Http { /** * @brief Moves the internal response to a different owner. * - * @return Will move the response only if parsing is completed and if the HTTP RawResponse was - * not moved before. + * @return Will move the response only if parsing is completed and if the HTTP RawResponse + * was not moved before. */ std::unique_ptr GetResponse() { @@ -232,8 +306,8 @@ namespace Azure { namespace Core { namespace Http { Request& m_request; /** - * @brief Controls the progress of a body buffer upload when using libcurl callbacks. Woks like - * an offset to move the pointer to read the body from the HTTP Request on each callback. + * @brief Controls the progress of a body buffer upload when using libcurl callbacks. Woks + * like an offset to move the pointer to read the body from the HTTP Request on each callback. * */ int64_t m_uploadedBytes; @@ -257,9 +331,9 @@ namespace Azure { namespace Core { namespace Http { bool m_isChunkedResponseType; /** - * @brief This is a copy of the value of an HTTP response header `content-length`. The value is - * received as string and parsed to size_t. This field avoid parsing the string header everytime - * from HTTP RawResponse. + * @brief This is a copy of the value of an HTTP response header `content-length`. The value + * is received as string and parsed to size_t. This field avoid parsing the string header + * every time from HTTP RawResponse. * * @remark This value is also used to avoid trying to read more data from network than what we * are expecting to. @@ -268,8 +342,8 @@ namespace Azure { namespace Core { namespace Http { int64_t m_contentLength; /** - * @brief For chunked responses, this field knows the size of the current chuck size server will - * de sending + * @brief For chunked responses, this field knows the size of the current chuck size server + * will de sending * */ int64_t m_chunkSize; @@ -285,8 +359,8 @@ namespace Azure { namespace Core { namespace Http { uint8_t m_readBuffer[Details::c_DefaultLibcurlReaderSize]; // to work with libcurl custom read. /** - * @brief convenient function that indicates when the HTTP Request will need to upload a payload - * or not. + * @brief convenient function that indicates when the HTTP Request will need to upload a + * payload or not. * * @return true if the HTTP Request will need to upload bytes to wire. * @@ -328,12 +402,12 @@ namespace Azure { namespace Core { namespace Http { CURLcode SetReadRequest(); /** - * @brief Function used when working with Streams to manually write from the HTTP Request to the - * wire. + * @brief Function used when working with Streams to manually write from the HTTP Request to + * the wire. * * @return CURL_OK when response is sent successfully. */ - CURLcode HttpRawSend(Context const& context); + CURLcode SendRawHttp(Context const& context); CURLcode UploadBody(Context const& context); /** @@ -372,7 +446,7 @@ namespace Azure { namespace Core { namespace Http { * @return return the numbers of bytes pulled from socket. It can be less than what it was * requested. */ - int64_t ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize); + int64_t ReadFromSocket(uint8_t* buffer, int64_t bufferSize); bool IsEOF() { @@ -381,15 +455,6 @@ namespace Azure { namespace Core { namespace Http { } public: -#ifdef TESTING_BUILD - // Makes possible to know the number of current connections in the connection pool - static int64_t s_ConnectionsOnPool(std::string const& host) - { - auto& pool = s_connectionPoolIndex[host]; - return pool.size(); - }; -#endif - /** * @brief Construct a new Curl Session object. Init internal libcurl handler. * @@ -397,7 +462,7 @@ namespace Azure { namespace Core { namespace Http { */ CurlSession(Request& request) : m_request(request) { - this->m_connection = GetCurlConnection(this->m_request); + this->m_connection = CurlConnectionPool::GetCurlConnection(this->m_request); this->m_bodyStartInBuffer = -1; this->m_innerBufferSize = Details::c_DefaultLibcurlReaderSize; this->m_isChunkedResponseType = false; @@ -414,7 +479,7 @@ namespace Azure { namespace Core { namespace Http { // destructor to clean libcurl handle and close the connection. if (IsEOF()) { - MoveConnectionBackToPool(std::move(this->m_connection)); + CurlConnectionPool::MoveConnectionBackToPool(std::move(this->m_connection)); } } diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp index e4f5a25b05..d4cc27e3a9 100644 --- a/sdk/core/azure-core/src/http/curl/curl.cpp +++ b/sdk/core/azure-core/src/http/curl/curl.cpp @@ -1,11 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // SPDX-License-Identifier: MIT -#include "azure/core/azure.hpp" #include "azure/core/http/curl/curl.hpp" +#include "azure/core/azure.hpp" #include "azure/core/http/http.hpp" #include +#include using namespace Azure::Core::Http; @@ -92,7 +93,7 @@ CURLcode CurlSession::Perform(Context const& context) // Send request. If the connection assigned to this curlSession is closed or the socket is // somehow lost, libcurl will return CURLE_UNSUPPORTED_PROTOCOL // (https://curl.haxx.se/libcurl/c/curl_easy_send.html). Return the error back. - result = HttpRawSend(context); + result = SendRawHttp(context); if (result != CURLE_OK) { return result; @@ -265,7 +266,7 @@ CURLcode CurlSession::UploadBody(Context const& context) } // custom sending to wire an http request -CURLcode CurlSession::HttpRawSend(Context const& context) +CURLcode CurlSession::SendRawHttp(Context const& context) { // something like GET /path HTTP1.0 \r\nheaders\r\n auto rawRequest = this->m_request.GetHTTPMessagePreBody(); @@ -310,7 +311,7 @@ void CurlSession::ParseChunkSize() if (index + 1 == this->m_innerBufferSize) { // on last index. Whatever we read is the BodyStart here this->m_innerBufferSize - = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); this->m_bodyStartInBuffer = 0; } else @@ -325,7 +326,7 @@ void CurlSession::ParseChunkSize() if (keepPolling) { // Read all internal buffer and \n was not found, pull from wire this->m_innerBufferSize - = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); this->m_bodyStartInBuffer = 0; } } @@ -343,7 +344,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse() { // Try to fill internal buffer from socket. // If response is smaller than buffer, we will get back the size of the response - bufferSize = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); // returns the number of bytes parsed up to the body Start auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast(bufferSize)); @@ -398,7 +399,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse() if (this->m_bodyStartInBuffer == -1) { // if nothing on inner buffer, pull from wire this->m_innerBufferSize - = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); this->m_bodyStartInBuffer = 0; } @@ -438,7 +439,7 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer, else { // end of buffer, pull data from wire this->m_innerBufferSize - = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n) } } @@ -495,14 +496,14 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer, // Read from socket when no more data on internal buffer // For chunk request, read a chunk based on chunk size - totalRead = ReadSocketToBuffer(buffer, static_cast(readRequestLength)); + totalRead = ReadFromSocket(buffer, static_cast(readRequestLength)); this->m_sessionTotalRead += totalRead; return totalRead; } // Read from socket and return the number of bytes taken from socket -int64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize) +int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize) { // loop until read result is not CURLE_AGAIN size_t readBytes = 0; @@ -779,29 +780,41 @@ int64_t CurlSession::ResponseBufferParser::BuildHeader( return indexOfEndOfStatusLine + 1 - buffer; } -std::mutex CurlSession::s_connectionPoolMutex; -std::map>> - CurlSession::s_connectionPoolIndex; +std::mutex CurlConnectionPool::s_connectionPoolMutex; +std::map>> + CurlConnectionPool::s_connectionPoolIndex; +int32_t CurlConnectionPool::s_connectionCounter = 0; +bool CurlConnectionPool::s_isCleanConnectionsRunning = false; -std::unique_ptr CurlSession::GetCurlConnection(Request& request) +std::unique_ptr CurlConnectionPool::GetCurlConnection(Request& request) { std::string const& host = request.GetHost(); { // Critical section. Needs to own s_connectionPoolMutex before executing // Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope - std::lock_guard lock(s_connectionPoolMutex); + std::lock_guard lock(CurlConnectionPool::s_connectionPoolMutex); // get a ref to the pool from the map of pools - auto& hostPool = s_connectionPoolIndex[host]; - if (hostPool.size() > 0) + auto hostPoolIndex = CurlConnectionPool::s_connectionPoolIndex.find(host); + if (hostPoolIndex != CurlConnectionPool::s_connectionPoolIndex.end() + && hostPoolIndex->second.size() > 0) { // get ref to first connection - auto fistConnectionIterator = hostPool.begin(); + auto fistConnectionIterator = hostPoolIndex->second.begin(); // move the connection ref to temp ref auto connection = std::move(*fistConnectionIterator); // Remove the connection ref from list - hostPool.erase(fistConnectionIterator); + hostPoolIndex->second.erase(fistConnectionIterator); + // reduce number of connections on the pool + CurlConnectionPool::s_connectionCounter -= 1; + + // Remove index if there are no more connections + if (CurlConnectionPool::s_connectionPoolIndex.size() == 0) + { + CurlConnectionPool::s_connectionPoolIndex.erase(hostPoolIndex); + } + // return connection ref return connection; } @@ -850,10 +863,88 @@ std::unique_ptr CurlSession::GetCurlConnection(Requ // Move the connection back to the connection pool. Push it to the front so it becomes the first // connection to be picked next time some one ask for a connection to the pool (LIFO) -void CurlSession::MoveConnectionBackToPool(std::unique_ptr connection) +void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr connection) { // Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope - std::lock_guard lock(s_connectionPoolMutex); - auto& hostPool = s_connectionPoolIndex[connection->GetHost()]; + std::lock_guard lock(CurlConnectionPool::s_connectionPoolMutex); + auto& hostPool = CurlConnectionPool::s_connectionPoolIndex[connection->GetHost()]; + // update the time when connection was moved back to pool + connection->updateLastUsageTime(); hostPool.push_front(std::move(connection)); + CurlConnectionPool::s_connectionCounter += 1; + // Check if there's no cleaner running and started + if (!CurlConnectionPool::s_isCleanConnectionsRunning) + { + CurlConnectionPool::s_isCleanConnectionsRunning = true; + CurlConnectionPool::CleanUp(); + } +} + +// spawn a thread for cleaning old connections. +// Thread will keep running while there are at least one connection in the pool +void CurlConnectionPool::CleanUp() +{ + std::thread backgroundCleanerThread([]() { + for (;;) + { + // wait before trying to clean + std::this_thread::sleep_for( + std::chrono::milliseconds(Details::c_DefaultCleanerIntervalMilliseconds)); + + { + // take mutex for reading the pool + std::lock_guard lock(CurlConnectionPool::s_connectionPoolMutex); + + if (CurlConnectionPool::s_connectionCounter == 0) + { + // stop the cleaner since there are no connections + CurlConnectionPool::s_isCleanConnectionsRunning = false; + return; + } + + // loop the connection pool index + for (auto index = CurlConnectionPool::s_connectionPoolIndex.begin(); + index != CurlConnectionPool::s_connectionPoolIndex.end(); + index++) + { + if (index->second.size() == 0) + { + // Move the next pool index + continue; + } + + // Pool index with waiting connections. Loop the connection pool backwards until + // a connection that is not expired is found or until all connections are removed. + for (auto connection = index->second.end();;) + { + // loop starts at end(), go back to previous possition. We know the list is size() > 0 + // so we are safe to go end() - 1 and find the last element in the list + connection--; + if (connection->get()->isExpired()) + { + // remove connection from the pool and update the connection to the next one which + // is going to be list.end() + connection = index->second.erase(connection); + CurlConnectionPool::s_connectionCounter -= 1; + + // Connection removed, break if there are no more connections to check + if (index->second.size() == 0) + { + break; + } + } + else + { + // Got a non-expired connection, all connections before this one are not expired. + // Break the loop and continue looping the Pool index + break; + } + } + } + } + } + }); + + // let thread run independent. It will be done once ther is not connections in the pool + backgroundCleanerThread.detach(); } diff --git a/sdk/core/azure-core/test/ut/transport_adapter.cpp b/sdk/core/azure-core/test/ut/transport_adapter.cpp index 5609646e58..1a2971ca41 100644 --- a/sdk/core/azure-core/test/ut/transport_adapter.cpp +++ b/sdk/core/azure-core/test/ut/transport_adapter.cpp @@ -4,6 +4,7 @@ #include "transport_adapter.hpp" #include #include +#include #include #include @@ -43,7 +44,7 @@ namespace Azure { namespace Core { namespace Test { CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13); } - // multiThread test requires `s_ConnectionsOnPool` hook which is only available when building + // multiThread test requires `ConnectionsOnPool` hook which is only available when building // TESTING_BUILD. This test cases are only built when that case is true.` TEST_F(TransportAdapter, getMultiThread) { @@ -61,7 +62,8 @@ namespace Azure { namespace Core { namespace Test { std::thread t2(threadRoutine); t1.join(); t2.join(); - auto connectionsNow = Http::CurlSession::s_ConnectionsOnPool("httpbin.org"); + auto connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"); + // 2 connections must be available at this point EXPECT_EQ(connectionsNow, 2); @@ -71,11 +73,67 @@ namespace Azure { namespace Core { namespace Test { t3.join(); t4.join(); t5.join(); - connectionsNow = Http::CurlSession::s_ConnectionsOnPool("httpbin.org"); + connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"); // Two connections re-used plus one connection created EXPECT_EQ(connectionsNow, 3); } +#ifdef RUN_LONG_UNIT_TESTS + TEST_F(TransportAdapter, ConnectionPoolCleaner) + { + std::string host("http://httpbin.org/get"); + + auto threadRoutine = [host]() { + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + auto response = pipeline.Send(context, request); + checkResponseCode(response->GetStatusCode()); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromBuffer(*response, expectedResponseBodySize); + }; + + // one index expected from previous tests + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + + std::cout + << "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete." + << std::endl + << "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this " + "test." + << std::endl; + + // Wait for 100 secs to make sure any previous connection is removed by the cleaner + std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); + + std::cout << "First wait time done. Validating state." << std::endl; + + // index is not affected by cleaner. It does not remove index + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + // cleaner should have remove connections + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); + + // Let cleaner finish + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + std::thread t1(threadRoutine); + std::thread t2(threadRoutine); + t1.join(); + t2.join(); + + // 2 connections must be available at this point and one index + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2); + + // At this point, cleaner should be ON and will clean connections after on second. + // After 5 seconds connection pool should have been cleaned + std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); + + std::cout << "Second wait time done. Validating state." << std::endl; + + // EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0); + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); + } +#endif + TEST_F(TransportAdapter, get204) { std::string host("http://mt3.google.com/generate_204");