Skip to content

Commit

Permalink
Merge pull request #1746 from neo1973/ThreadPool
Browse files Browse the repository at this point in the history
Add a simple thread pool
  • Loading branch information
CastagnaIT authored Jan 4, 2025
2 parents d789f97 + db450e9 commit 4b49c87
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/samplereader/SampleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "utils/CryptoUtils.h"
#include "utils/ThreadPool.h"

#include <bento4/Ap4.h>

Expand Down Expand Up @@ -96,7 +97,8 @@ class ATTR_DLL_LOCAL ISampleReader
*/
void ReadSampleAsync()
{
m_readSampleAsyncState = std::async(std::launch::async, &ISampleReader::ReadSample, this);
m_readSampleAsyncState =
UTILS::THREAD::GlobalThreadPool.Execute(&ISampleReader::ReadSample, this);
}

/*!
Expand Down
2 changes: 2 additions & 0 deletions src/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(SOURCES
FileUtils.cpp
JsonUtils.cpp
StringUtils.cpp
ThreadPool.cpp
UrlUtils.cpp
Utils.cpp
XMLUtils.cpp
Expand All @@ -21,6 +22,7 @@ set(HEADERS
JsonUtils.h
log.h
StringUtils.h
ThreadPool.h
UrlUtils.h
Utils.h
XMLUtils.h
Expand Down
72 changes: 72 additions & 0 deletions src/utils/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2025 Team Kodi
* This file is part of Kodi - https://kodi.tv
*
* SPDX-License-Identifier: GPL-2.0-or-later
* See LICENSES/README.md for more information.
*/

#include "ThreadPool.h"

UTILS::THREAD::ThreadPool::~ThreadPool()
{
Stop();
}

void UTILS::THREAD::ThreadPool::Stop()
{
{
std::lock_guard lock(m_mutex);
m_isStopped = true;
}

m_condVar.notify_all();

for (const auto& executor : m_executors)
executor->Join();
}

std::optional<std::function<void()>> UTILS::THREAD::ThreadPool::TakeTask()
{
std::unique_lock lock(m_mutex);

m_condVar.wait(lock, [this]() { return !m_taskQueue.empty() || m_isStopped; });

if (m_isStopped)
return {};

++m_activeExecutors;
auto func = std::move(m_taskQueue.front());
m_taskQueue.pop();
return {std::move(func)};
}

void UTILS::THREAD::ThreadPool::TaskFinished()
{
std::lock_guard lock(m_mutex);
--m_activeExecutors;
}

UTILS::THREAD::ThreadPool::Executor::Executor(ThreadPool& threadPool) : m_threadPool(&threadPool)
{
m_thread = std::thread(&Executor::Run, this);
}

void UTILS::THREAD::ThreadPool::Executor::Join()
{
m_thread.join();
}

void UTILS::THREAD::ThreadPool::Executor::Run()
{
while (true)
{
auto f = m_threadPool->TakeTask();
if (!f)
return;
(*f)();
m_threadPool->TaskFinished();
}
}

UTILS::THREAD::ThreadPool UTILS::THREAD::GlobalThreadPool;
117 changes: 117 additions & 0 deletions src/utils/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2025 Team Kodi
* This file is part of Kodi - https://kodi.tv
*
* SPDX-License-Identifier: GPL-2.0-or-later
* See LICENSES/README.md for more information.
*/

#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

#pragma once

namespace UTILS::THREAD
{

/*!
* \brief A simple thread pool
*
* The thread pool automatically grows if there are more concurrent tasks then
* threads. Automatic shrinking is not implemented but this could easily be
* added if desired.
*/
class ThreadPool
{
public:
ThreadPool() = default;
~ThreadPool();

/*!
* \brief Execute a callable on a thread pool thread
*
* \attention If the `std::future` obtained from this function is not moved
* from or bound to a reference, the destructor of the `std::future` will
* block at the end of the full expression until the asynchronous operation
* completes, essentially making the call synchronous.
*/
template<class F, class... Args>
[[nodiscard]] auto Execute(F&& f, Args&&... args) -> auto
{
using return_type = decltype(std::invoke(f, args...));

std::future<return_type> future;

{
std::lock_guard lock(m_mutex);

if (m_isStopped)
{
std::promise<return_type> p;
p.set_exception(
std::make_exception_ptr(std::runtime_error("ThreadPool has already been stopped")));
return p.get_future();
}

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
future = task->get_future();

m_taskQueue.emplace([task = std::move(task)]() { (*task)(); });

// Check if there are enough executors for the number of task, if not create more
if (m_executors.size() - m_activeExecutors < m_taskQueue.size())
m_executors.emplace_back(std::make_unique<Executor>(*this));
}

m_condVar.notify_one();

return future;
}

/*!
* \brief Don't allow execution of new tasks and block until all running tasks completed
*/
void Stop();

private:
class Executor
{
public:
Executor(ThreadPool& threadPool);
void Join();

private:
ThreadPool* m_threadPool;
std::thread m_thread;
void Run();
};

/*!
* \brief Returns a task or nothing, if nothing is returned the Executor should exit
*/
std::optional<std::function<void()>> TakeTask();
/*!
* \brief Informs the thread pool that a task is done and the Executor is available again
*/
void TaskFinished();

// Executors need to be on the heap for a stable `this` pointer
std::vector<std::unique_ptr<Executor>> m_executors;
std::queue<std::function<void()>> m_taskQueue;
std::mutex m_mutex;
std::condition_variable m_condVar;
size_t m_activeExecutors{0};
bool m_isStopped{false};
};

extern ThreadPool GlobalThreadPool;

} // namespace UTILS::THREAD

0 comments on commit 4b49c87

Please sign in to comment.