Add enqueue and forget function for Thread Pool
lczech committed Jul 4, 2024
1 parent 9a7253a commit 64d8254
Showing 6 changed files with 67 additions and 10 deletions.
2 changes: 1 addition & 1 deletion lib/genesis/utils/containers/generic_input_stream.hpp
@@ -616,7 +616,7 @@ class GenericInputStream
// Despite its parallel nature, we can use a thread pool here, as we are only ever
// submitting a single read task to it, so there cannot be two reads of the same lambda
// iterator in the pool.
- *future_ = generator_->thread_pool_->enqueue(
+ *future_ = generator_->thread_pool_->enqueue_and_retrieve(
[ generator, buffer_block, block_size ](){
return read_block_( generator, buffer_block, block_size );
}
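For readers following the API change: this call is a pure rename, and the returned future works as before. A minimal usage sketch, assuming ProactiveFuture offers a std::future-like get(); read_one_block and the task body are hypothetical stand-ins for the stream's read_block_() machinery:

```cpp
#include "genesis/utils/threading/thread_pool.hpp"

#include <cstddef>
#include <vector>

using namespace genesis::utils;

std::vector<char> read_one_block( ThreadPool& pool, std::size_t block_size )
{
    // Submit the read task; the returned future transports the result
    // (and any exception thrown in the task) back to the caller.
    auto future = pool.enqueue_and_retrieve(
        [ block_size ](){
            // Hypothetical stand-in for the actual read_block_() call.
            return std::vector<char>( block_size );
        }
    );
    return future.get();
}
```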
7 changes: 5 additions & 2 deletions lib/genesis/utils/io/gzip_block_ostream.cpp
@@ -545,13 +545,16 @@ class GzipBlockOStreambuf

// Assert that all pointers are where they should be
assert( pbase() == cur_block.block.get_input_buffer().first );
- assert( epptr() == cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second );
+ assert(
+     epptr() ==
+     cur_block.block.get_input_buffer().first + cur_block.block.get_input_buffer().second
+ );

// Send block to a compression worker thread, using all bytes that have been written to it.
// The thread pool will pick up the task once a thread is available.
assert( thread_pool_ );
auto const avail_in = pptr() - pbase();
- cur_block.future = thread_pool_->enqueue(
+ cur_block.future = thread_pool_->enqueue_and_retrieve(
[&]( size_t av_in ){
cur_block.block.compress( av_in );
},
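This call site also illustrates that trailing arguments are forwarded to the task callable. A hedged sketch of that pattern; compress_bytes and submit_compression are hypothetical stand-ins for the block compression above:

```cpp
#include "genesis/utils/threading/thread_pool.hpp"

#include <cstddef>

using namespace genesis::utils;

// Hypothetical stand-in for cur_block.block.compress().
void compress_bytes( std::size_t avail_in )
{
    // ... compress the first avail_in bytes of some buffer ...
    (void) avail_in;
}

void submit_compression( ThreadPool& pool, std::size_t avail_in )
{
    // Trailing arguments are forwarded to the callable, so the byte count is
    // passed as a task argument rather than captured by reference.
    auto future = pool.enqueue_and_retrieve( compress_bytes, avail_in );

    // Wait until the task has run; any exception it threw is rethrown here.
    future.get();
}
```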
2 changes: 1 addition & 1 deletion lib/genesis/utils/io/input_reader.hpp
@@ -199,7 +199,7 @@ class AsynchronousReader

// We capture the target by value, meaning that the caller has to stay alive until the
// task is finished, so that we don't get a memory access violation for the buffer.
- future_ = thread_pool_->enqueue(
+ future_ = thread_pool_->enqueue_and_retrieve(
[=](){
return input_source->read( target_buffer, target_size );
}
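The capture-by-value comment above is the crucial contract here: the lambda copies the pointer, not the buffer behind it. A sketch of how a caller has to honor that lifetime; InputSource and async_read are hypothetical stand-ins for the reader's internals:

```cpp
#include "genesis/utils/threading/thread_pool.hpp"

#include <cstddef>
#include <vector>

using namespace genesis::utils;

// Hypothetical source type with a blocking read, standing in for the
// input_source used above.
struct InputSource
{
    std::size_t read( char* target, std::size_t size )
    {
        // ... fill `target` and return the number of bytes read ...
        (void) target;
        return size;
    }
};

std::size_t async_read( ThreadPool& pool, InputSource& source, std::vector<char>& buffer )
{
    // The lambda captures the raw pointer and size by value, so `buffer`
    // (and `source`) must stay alive until the future is resolved.
    auto future = pool.enqueue_and_retrieve(
        [ &source, target = buffer.data(), size = buffer.size() ](){
            return source.read( target, size );
        }
    );
    return future.get(); // synchronize before touching the buffer again
}
```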
2 changes: 1 addition & 1 deletion lib/genesis/utils/threading/thread_functions.hpp
@@ -203,7 +203,7 @@ MultiFuture<R> parallel_block(
auto const e = begin_t + static_cast<T>( current_start + l );
assert( l > 0 );
assert( b < e );
- result[i] = thread_pool->enqueue( std::forward<F>( body ), b, e );
+ result[i] = thread_pool->enqueue_and_retrieve( std::forward<F>( body ), b, e );

// Our next block will start where this one ended.
current_start += l;
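For context, parallel_block() splits a range into blocks and enqueues each block via the renamed call, collecting the per-block futures in a MultiFuture. A hedged usage sketch; the summing body is made up, and the exact parallel_block() signature and MultiFuture accessors are assumptions, not taken from this commit:

```cpp
#include "genesis/utils/threading/thread_functions.hpp"

#include <cstddef>
#include <memory>
#include <numeric>
#include <vector>

using namespace genesis::utils;

double parallel_sum( std::shared_ptr<ThreadPool> pool, std::vector<double> const& data )
{
    // Each block [b, e) of the index range becomes one pool task.
    auto futures = parallel_block(
        std::size_t{ 0 }, data.size(),
        [ &data ]( std::size_t b, std::size_t e ){
            return std::accumulate( data.begin() + b, data.begin() + e, 0.0 );
        },
        pool
    );

    // Wait for all blocks; the accessor pattern used here is an assumption
    // about the MultiFuture interface.
    double total = 0.0;
    for( auto& partial : futures.get() ) {
        total += partial;
    }
    return total;
}
```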
60 changes: 57 additions & 3 deletions lib/genesis/utils/threading/thread_pool.hpp
@@ -423,20 +423,23 @@ class ThreadPool
// -------------------------------------------------------------

/**
- * @brief Enqueue a new task, using a function to call and its arguments.
+ * @brief Enqueue a new task, using a function to call and its arguments, and returning
+ * a future to receive the result of the task.
*
* The enqueue function returns a future that can be used to check whether the task has
* finished, or to wait for it to be finished. This also allows the task to send its result
* back to the caller, if needed, by simply returning it from the task function.
*
* We internally use a std::packaged_task, so that any exception thrown in the function will
* be caught and trapped inside of the future, until its get() function is called.
+ * See enqueue_detached() for an alternative function that skips the creation of the
+ * packaged_task and future, and hence has 50% less overhead.
*
* If enqueuing the task would exceed the max queue size, we instead first process existing
* tasks until there is enough space in the queue. This makes the caller wait and do work in the meantime.
*/
template<class F, class... Args>
- auto enqueue( F&& f, Args&&... args )
+ auto enqueue_and_retrieve( F&& f, Args&&... args )
-> ProactiveFuture<typename std::result_of<F(Args...)>::type>
{
// Check that we can enqueue a task at the moment, or if we need to wait and work first.
@@ -490,6 +493,57 @@
return future_result;
}

+ /**
+ * @brief Enqueue a new task, using a function to call and its arguments, without a std::future.
+ *
+ * This function simply submits the task to the pool, but does not create a std::future for the
+ * caller to wait for the result. Hence, this mostly makes sense for tasks that do not return a
+ * result that is needed. Thus, the task function itself needs to take care of propagating its
+ * result, if needed. This has 50% less overhead compared to enqueue_and_retrieve().
+ *
+ * If enqueuing the task would exceed the max queue size, we instead first process existing
+ * tasks until there is enough space in the queue. This makes the caller wait and do work in the meantime.
+ */
+ template<class F, class... Args>
+ void enqueue_detached( F&& f, Args&&... args )
+ {
+ // Check that we can enqueue a task at the moment, or if we need to wait and work first.
+ // In a high-contention situation, this could of course fail: by the time the loop condition
+ // has been checked, some other task might already have finished the work. But that doesn't
+ // matter, the call to try_run_pending_task will catch that and just do nothing. The other
+ // way round could also happen, and the queue could in theory be overloaded if many threads
+ // try to enqueue at exactly the same time. But we probably never have enough threads for
+ // that to be a real issue - worst case, we exceed the max queue size by the number of
+ // threads, which is fine. All we want to avoid is an infinitely growing queue.
+ while( max_queue_size_ > 0 && currently_enqueued_tasks() >= max_queue_size_ ) {
+ try_run_pending_task();
+ }
+
+ // Prepare the task that we want to submit, by wrapping the function to be called.
+ // All this wrapping should be completely transparent to the compiler, and removed.
+ // Unlike enqueue_and_retrieve(), there is no promise or future here; the task
+ // simply wraps the function call itself.
+ WrappedTask wrapped_task;
+ auto task_function = std::bind( std::forward<F>(f), std::forward<Args>(args)... );
+ wrapped_task.function = [task_function, this]()
+ {
+ // Run the actual work task here.
+ // Once done, we signal this via the unfinished tasks counter.
+ task_function();
+ assert( this->unfinished_tasks_.load() > 0 );
+ --this->unfinished_tasks_;
+ };
+
+ // We add the task, incrementing the unfinished counter, and only decrementing it once the
+ // task has been fully processed. That way, the counter always tells us if there is still
+ // work going on. We capture a reference to `this` in the task above, which could be
+ // dangerous if the threads survive the lifetime of the pool, but given that their exit
+ // condition is only called from the pool destructor, this should never happen.
+ ++unfinished_tasks_;
+ task_queue_.enqueue(
+ std::move( wrapped_task )
+ );
+ }

/**
* @brief Helper function to run a pending task from outside the pool.
*
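Taken together, the old enqueue() is now split into two flavors. A hedged sketch contrasting them; the pool handling and the atomic counter are illustrative assumptions, not code from this commit:

```cpp
#include "genesis/utils/threading/thread_pool.hpp"

#include <atomic>

using namespace genesis::utils;

void contrast_enqueue_flavors( ThreadPool& pool )
{
    // With retrieval: a ProactiveFuture transports the result, and any
    // exception stays trapped in it until get() is called.
    auto future = pool.enqueue_and_retrieve( []( int x ){ return x * x; }, 7 );
    int const squared = future.get();
    (void) squared;

    // Detached: no future is created (the documented 50% overhead saving),
    // so the task has to propagate its own result, e.g., through state that
    // outlives it, such as an atomic counter.
    std::atomic<int> sum{ 0 };
    pool.enqueue_detached( [ &sum ](){ sum.fetch_add( 42 ); } );

    // Note: with the detached flavor there is nothing to wait on here, so
    // the caller needs some other synchronization point before reading
    // `sum`; how best to arrange that is left open in this sketch.
}
```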
4 changes: 2 additions & 2 deletions test/src/utils/threading/thread_pool.cpp
@@ -62,7 +62,7 @@ void thread_pool_work_(size_t i)

// LOG_DBG << "sub B " << i;
auto pool = Options::get().global_thread_pool();
- auto res = pool->enqueue([=](){
+ auto res = pool->enqueue_and_retrieve([=](){
// LOG_DBG << "beg B " << i;
thread_pool_sleep_();
// LOG_DBG << "end B " << i;
@@ -81,7 +81,7 @@ TEST( ThreadPool, Nested )
for (size_t i = 0; i < 4; ++i) {
// LOG_DBG << "sub A " << i;
auto pool = Options::get().global_thread_pool();
- tasks.emplace_back( pool->enqueue( [=](){
+ tasks.emplace_back( pool->enqueue_and_retrieve( [=](){
// LOG_DBG << "beg A " << i;
thread_pool_sleep_();
thread_pool_work_(i);
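The renamed calls here exercise nesting: a pooled task submits and waits on further tasks in the same pool. A reduced sketch of that pattern, placed alongside the test above so its existing includes apply; that waiting inside a worker does not deadlock is inferred from this test's structure, not stated in the commit:

```cpp
// Reduced nesting sketch, modeled on the test above.
TEST( ThreadPool, NestedSketch )
{
    auto pool = Options::get().global_thread_pool();
    auto outer = pool->enqueue_and_retrieve( [=](){
        // A worker submits and waits on a nested task of the same pool.
        auto inner = pool->enqueue_and_retrieve( [](){ return 1; });
        return inner.get() + 1;
    });
    EXPECT_EQ( 2, outer.get() );
}
```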
