From fa9b63ee2c9e92c30082cd5becff04074fefd2f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20H=C3=A4rer?= Date: Mon, 30 Dec 2024 15:08:40 +0100 Subject: [PATCH 1/2] [ThreadPool] Initial commit --- src/utils/CMakeLists.txt | 2 + src/utils/ThreadPool.cpp | 72 ++++++++++++++++++++++++ src/utils/ThreadPool.h | 117 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 src/utils/ThreadPool.cpp create mode 100644 src/utils/ThreadPool.h diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 9b0d1abe9..c4a2a5ccf 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -6,6 +6,7 @@ set(SOURCES FileUtils.cpp JsonUtils.cpp StringUtils.cpp + ThreadPool.cpp UrlUtils.cpp Utils.cpp XMLUtils.cpp @@ -21,6 +22,7 @@ set(HEADERS JsonUtils.h log.h StringUtils.h + ThreadPool.h UrlUtils.h Utils.h XMLUtils.h diff --git a/src/utils/ThreadPool.cpp b/src/utils/ThreadPool.cpp new file mode 100644 index 000000000..e6fdfe4da --- /dev/null +++ b/src/utils/ThreadPool.cpp @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2025 Team Kodi + * This file is part of Kodi - https://kodi.tv + * + * SPDX-License-Identifier: GPL-2.0-or-later + * See LICENSES/README.md for more information. + */ + +#include "ThreadPool.h" + +UTILS::THREAD::ThreadPool::~ThreadPool() +{ + Stop(); +} + +void UTILS::THREAD::ThreadPool::Stop() +{ + { + std::lock_guard lock(m_mutex); + m_isStopped = true; + } + + m_condVar.notify_all(); + + for (const auto& executor : m_executors) + executor->Join(); +} + +std::optional> UTILS::THREAD::ThreadPool::TakeTask() +{ + std::unique_lock lock(m_mutex); + + m_condVar.wait(lock, [this]() { return !m_taskQueue.empty() || m_isStopped; }); + + if (m_isStopped) + return {}; + + ++m_activeExecutors; + auto func = std::move(m_taskQueue.front()); + m_taskQueue.pop(); + return {std::move(func)}; +} + +void UTILS::THREAD::ThreadPool::TaskFinished() +{ + std::lock_guard lock(m_mutex); + --m_activeExecutors; +} + +UTILS::THREAD::ThreadPool::Executor::Executor(ThreadPool& threadPool) : m_threadPool(&threadPool) +{ + m_thread = std::thread(&Executor::Run, this); +} + +void UTILS::THREAD::ThreadPool::Executor::Join() +{ + m_thread.join(); +} + +void UTILS::THREAD::ThreadPool::Executor::Run() +{ + while (true) + { + auto f = m_threadPool->TakeTask(); + if (!f) + return; + (*f)(); + m_threadPool->TaskFinished(); + } +} + +UTILS::THREAD::ThreadPool UTILS::THREAD::GlobalThreadPool; diff --git a/src/utils/ThreadPool.h b/src/utils/ThreadPool.h new file mode 100644 index 000000000..cf6368d1f --- /dev/null +++ b/src/utils/ThreadPool.h @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2025 Team Kodi + * This file is part of Kodi - https://kodi.tv + * + * SPDX-License-Identifier: GPL-2.0-or-later + * See LICENSES/README.md for more information. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#pragma once + +namespace UTILS::THREAD +{ + +/*! + * \brief A simple thread pool + * + * The thread pool automatically grows if there are more concurrent tasks then + * threads. Automatic shrinking is not implemented but this could easily be + * added if desired. + */ +class ThreadPool +{ +public: + ThreadPool() = default; + ~ThreadPool(); + + /*! + * \brief Execute a callable on a thread pool thread + * + * \attention If the `std::future` obtained from this function is not moved + * from or bound to a reference, the destructor of the `std::future` will + * block at the end of the full expression until the asynchronous operation + * completes, essentially making the call synchronous. + */ + template + [[nodiscard]] auto Execute(F&& f, Args&&... args) -> auto + { + using return_type = decltype(std::invoke(f, args...)); + + std::future future; + + { + std::lock_guard lock(m_mutex); + + if (m_isStopped) + { + std::promise p; + p.set_exception( + std::make_exception_ptr(std::runtime_error("ThreadPool has already been stopped"))); + return p.get_future(); + } + + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...)); + future = task->get_future(); + + m_taskQueue.emplace([task = std::move(task)]() { (*task)(); }); + + // Check if there are enough executors for the number of task, if not create more + if (m_executors.size() - m_activeExecutors < m_taskQueue.size()) + m_executors.emplace_back(std::make_unique(*this)); + } + + m_condVar.notify_one(); + + return future; + } + + /*! + * \brief Don't allow execution of new tasks and block until all running tasks completed + */ + void Stop(); + +private: + class Executor + { + public: + Executor(ThreadPool& threadPool); + void Join(); + + private: + ThreadPool* m_threadPool; + std::thread m_thread; + void Run(); + }; + + /*! + * \brief Returns a task or nothing, if nothing is returned the Executor should exit + */ + std::optional> TakeTask(); + /*! + * \brief Informs the thread pool that a task is done and the Executor is available again + */ + void TaskFinished(); + + // Executors need to be on the heap for a stable `this` pointer + std::vector> m_executors; + std::queue> m_taskQueue; + std::mutex m_mutex; + std::condition_variable m_condVar; + size_t m_activeExecutors{0}; + bool m_isStopped{false}; +}; + +extern ThreadPool GlobalThreadPool; + +} // namespace UTILS::THREAD From db450e9f66be1332d4ce58a9c3941c47d35d8345 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20H=C3=A4rer?= Date: Mon, 30 Dec 2024 15:12:25 +0100 Subject: [PATCH 2/2] [SampleReader] Use the global `ThreadPool` --- src/samplereader/SampleReader.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/samplereader/SampleReader.h b/src/samplereader/SampleReader.h index 680a04bb0..747f36a57 100644 --- a/src/samplereader/SampleReader.h +++ b/src/samplereader/SampleReader.h @@ -9,6 +9,7 @@ #pragma once #include "utils/CryptoUtils.h" +#include "utils/ThreadPool.h" #include @@ -96,7 +97,8 @@ class ATTR_DLL_LOCAL ISampleReader */ void ReadSampleAsync() { - m_readSampleAsyncState = std::async(std::launch::async, &ISampleReader::ReadSample, this); + m_readSampleAsyncState = + UTILS::THREAD::GlobalThreadPool.Execute(&ISampleReader::ReadSample, this); } /*!