Autonomy Software C++ 24.5.1
Welcome to the Autonomy Software repository of the Mars Rover Design Team (MRDT) at Missouri University of Science and Technology (Missouri S&T)! API reference contains the source code and other resources for the development of the autonomy software for our Mars rover. The Autonomy Software project aims to compete in the University Rover Challenge (URC) by demonstrating advanced autonomous capabilities and robust navigation algorithms.
Loading...
Searching...
No Matches
BS::thread_pool Class Reference

A fast, lightweight, and easy-to-use C++17 thread pool class. More...

#include <BS_thread_pool.hpp>

Collaboration diagram for BS::thread_pool:

Classes

class  blocks
 A helper class to divide a range into blocks. Used by detach_blocks(), submit_blocks(), detach_loop(), and submit_loop(). More...
 

Public Member Functions

 thread_pool ()
 Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
 
 thread_pool (const concurrency_t num_threads)
 Construct a new thread pool with the specified number of threads.
 
 thread_pool (const std::function< void()> &init_task)
 Construct a new thread pool with the specified initialization function.
 
 thread_pool (const concurrency_t num_threads, const std::function< void()> &init_task)
 Construct a new thread pool with the specified number of threads and initialization function.
 
 thread_pool (const thread_pool &)=delete
 
 thread_pool (thread_pool &&)=delete
 
thread_pooloperator= (const thread_pool &)=delete
 
thread_pooloperator= (thread_pool &&)=delete
 
 ~thread_pool ()
 Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
 
size_t get_tasks_queued () const
 Get the number of tasks currently waiting in the queue to be executed by the threads.
 
size_t get_tasks_running () const
 Get the number of tasks currently being executed by the threads.
 
size_t get_tasks_total () const
 Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
 
concurrency_t get_thread_count () const
 Get the number of threads in the pool.
 
std::vector< std::thread::id > get_thread_ids () const
 Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id().
 
void purge ()
 Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.
 
template<typename F >
void detach_task (F &&task BS_THREAD_POOL_PRIORITY_INPUT)
 Submit a function with no arguments and no return value into the task queue, with the specified priority. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use wait() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
 
template<typename T , typename F >
void detach_blocks (const T first_index, const T index_after_last, F &&block, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
 Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Does not return a multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
 
template<typename T , typename F >
void detach_loop (const T first_index, const T index_after_last, F &&loop, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
 Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
 
template<typename T , typename F >
void detach_sequence (const T first_index, const T index_after_last, F &&sequence BS_THREAD_POOL_PRIORITY_INPUT)
 Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Does not return a multi_future, so the user must use wait() or some other method to ensure that the sequence finishes executing, otherwise bad things will happen.
 
void reset ()
 Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
 
void reset (const concurrency_t num_threads)
 Reset the pool with a new number of threads. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
 
void reset (const std::function< void()> &init_task)
 Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
 
void reset (const concurrency_t num_threads, const std::function< void()> &init_task)
 Reset the pool with a new number of threads and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
 
template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
std::future< R > submit_task (F &&task BS_THREAD_POOL_PRIORITY_INPUT)
 Submit a function with no arguments into the task queue, with the specified priority. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes.
 
template<typename T , typename F , typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
multi_future< R > submit_blocks (const T first_index, const T index_after_last, F &&block, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
 Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Returns a multi_future that contains the futures for all of the blocks.
 
template<typename T , typename F >
multi_future< void > submit_loop (const T first_index, const T index_after_last, F &&loop, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
 Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a multi_future that contains the futures for all of the blocks.
 
template<typename T , typename F , typename R = std::invoke_result_t<std::decay_t<F>, T>>
multi_future< R > submit_sequence (const T first_index, const T index_after_last, F &&sequence BS_THREAD_POOL_PRIORITY_INPUT)
 Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Returns a multi_future that contains the futures for all of the tasks.
 
void wait ()
 Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit_task() instead, and call the wait() member function of the generated future.
 
template<typename R , typename P >
bool wait_for (const std::chrono::duration< R, P > &duration)
 Wait for tasks to be completed, but stop waiting after the specified duration has passed.
 
template<typename C , typename D >
bool wait_until (const std::chrono::time_point< C, D > &timeout_time)
 Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
 

Private Member Functions

void create_threads (const std::function< void()> &init_task)
 Create the threads in the pool and assign a worker to each thread.
 
void destroy_threads ()
 Destroy the threads in the pool.
 
void worker (const concurrency_t idx, const std::function< void()> &init_task)
 A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait() in case it is waiting.
 

Static Private Member Functions

static concurrency_t determine_thread_count (const concurrency_t num_threads)
 Determine how many threads the pool should have, based on the parameter passed to the constructor or reset().
 

Private Attributes

std::condition_variable task_available_cv = {}
 A condition variable to notify worker() that a new task has become available.
 
std::condition_variable tasks_done_cv = {}
 A condition variable to notify wait() that the tasks are done.
 
std::queue< std::function< void()> > tasks = {}
 A queue of tasks to be executed by the threads.
 
size_t tasks_running = 0
 A counter for the total number of currently running tasks.
 
std::mutex tasks_mutex = {}
 A mutex to synchronize access to the task queue by different threads.
 
concurrency_t thread_count = 0
 The number of threads in the pool.
 
std::unique_ptr< std::thread[]> threads = nullptr
 A smart pointer to manage the memory allocated for the threads.
 
bool waiting = false
 A flag indicating that wait() is active and expects to be notified whenever a task is done.
 
bool workers_running = false
 A flag indicating to the workers to keep running. When set to false, the workers terminate permanently.
 

Detailed Description

A fast, lightweight, and easy-to-use C++17 thread pool class.

Constructor & Destructor Documentation

◆ thread_pool() [1/4]

BS::thread_pool::thread_pool ( )
inline

Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.

298: thread_pool(0, [] {}) {}
thread_pool()
Construct a new thread pool. The number of threads will be the total number of hardware threads avail...
Definition BS_thread_pool.hpp:298

◆ thread_pool() [2/4]

BS::thread_pool::thread_pool ( const concurrency_t  num_threads)
inlineexplicit

Construct a new thread pool with the specified number of threads.

Parameters
num_threadsThe number of threads to use.
305: thread_pool(num_threads, [] {}) {}

◆ thread_pool() [3/4]

BS::thread_pool::thread_pool ( const std::function< void()> &  init_task)
inlineexplicit

Construct a new thread pool with the specified initialization function.

Parameters
init_taskAn initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
312: thread_pool(0, init_task) {}

◆ thread_pool() [4/4]

BS::thread_pool::thread_pool ( const concurrency_t  num_threads,
const std::function< void()> &  init_task 
)
inline

Construct a new thread pool with the specified number of threads and initialization function.

Parameters
num_threadsThe number of threads to use.
init_taskAn initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
320 : thread_count(determine_thread_count(num_threads)), threads(std::make_unique<std::thread[]>(determine_thread_count(num_threads)))
321 {
322 create_threads(init_task);
323 }
static concurrency_t determine_thread_count(const concurrency_t num_threads)
Determine how many threads the pool should have, based on the parameter passed to the constructor or ...
Definition BS_thread_pool.hpp:892
concurrency_t thread_count
The number of threads in the pool.
Definition BS_thread_pool.hpp:1138
void create_threads(const std::function< void()> &init_task)
Create the threads in the pool and assign a worker to each thread.
Definition BS_thread_pool.hpp:857
std::unique_ptr< std::thread[]> threads
A smart pointer to manage the memory allocated for the threads.
Definition BS_thread_pool.hpp:1143

◆ ~thread_pool()

BS::thread_pool::~thread_pool ( )
inline

Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.

335 {
336 wait();
338 }
void destroy_threads()
Destroy the threads in the pool.
Definition BS_thread_pool.hpp:873
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:761

Member Function Documentation

◆ get_tasks_queued()

size_t BS::thread_pool::get_tasks_queued ( ) const
inline

Get the number of tasks currently waiting in the queue to be executed by the threads.

Returns
The number of queued tasks.
367 {
368 const std::scoped_lock tasks_lock(tasks_mutex);
369 return tasks.size();
370 }
std::mutex tasks_mutex
A mutex to synchronize access to the task queue by different threads.
Definition BS_thread_pool.hpp:1133
std::queue< std::function< void()> > tasks
A queue of tasks to be executed by the threads.
Definition BS_thread_pool.hpp:1122
Here is the caller graph for this function:

◆ get_tasks_running()

size_t BS::thread_pool::get_tasks_running ( ) const
inline

Get the number of tasks currently being executed by the threads.

Returns
The number of running tasks.
378 {
379 const std::scoped_lock tasks_lock(tasks_mutex);
380 return tasks_running;
381 }
size_t tasks_running
A counter for the total number of currently running tasks.
Definition BS_thread_pool.hpp:1128
Here is the caller graph for this function:

◆ get_tasks_total()

size_t BS::thread_pool::get_tasks_total ( ) const
inline

Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().

Returns
The total number of tasks.
389 {
390 const std::scoped_lock tasks_lock(tasks_mutex);
391 return tasks_running + tasks.size();
392 }
Here is the caller graph for this function:

◆ get_thread_count()

concurrency_t BS::thread_pool::get_thread_count ( ) const
inline

Get the number of threads in the pool.

Returns
The number of threads.
400 {
401 return thread_count;
402 }
Here is the caller graph for this function:

◆ get_thread_ids()

std::vector< std::thread::id > BS::thread_pool::get_thread_ids ( ) const
inline

Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id().

Returns
The unique thread identifiers.
410 {
411 std::vector<std::thread::id> thread_ids(thread_count);
412 for (concurrency_t i = 0; i < thread_count; ++i)
413 {
414 thread_ids[i] = threads[i].get_id();
415 }
416 return thread_ids;
417 }
std::invoke_result_t< decltype(std::thread::hardware_concurrency)> concurrency_t
A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsign...
Definition BS_thread_pool.hpp:60
Here is the caller graph for this function:

◆ purge()

void BS::thread_pool::purge ( )
inline

Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.

445 {
446 const std::scoped_lock tasks_lock(tasks_mutex);
447 while (!tasks.empty())
448 tasks.pop();
449 }
Here is the caller graph for this function:

◆ detach_task()

template<typename F >
void BS::thread_pool::detach_task ( F &&task  BS_THREAD_POOL_PRIORITY_INPUT)
inline

Submit a function with no arguments and no return value into the task queue, with the specified priority. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use wait() or some other method to ensure that the task finishes executing, otherwise bad things will happen.

Template Parameters
FThe type of the function.
Parameters
taskThe function to push.
priorityThe priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
460 {
461 {
462 const std::scoped_lock tasks_lock(tasks_mutex);
463 tasks.emplace(std::forward<F>(task) BS_THREAD_POOL_PRIORITY_OUTPUT);
464 }
465 task_available_cv.notify_one();
466 }
std::condition_variable task_available_cv
A condition variable to notify worker() that a new task has become available.
Definition BS_thread_pool.hpp:1109
Here is the caller graph for this function:

◆ detach_blocks()

template<typename T , typename F >
void BS::thread_pool::detach_blocks ( const T  first_index,
const T  index_after_last,
F &&  block,
const size_t  num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Does not return a multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function to loop through.
Parameters
first_indexThe first index in the loop.
index_after_lastThe index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no blocks will be submitted.
blockA function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. block(start, end) should typically involve a loop of the form for (T i = start; i < end; ++i).
num_blocksThe maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
481 {
482 if (index_after_last > first_index)
483 {
484 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
485 for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk)
487 [block = std::forward<F>(block), start = blks.start(blk), end = blks.end(blk)]
488 {
489 block(start, end);
490 } BS_THREAD_POOL_PRIORITY_OUTPUT);
491 }
492 }
void detach_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:459
Here is the call graph for this function:
Here is the caller graph for this function:

◆ detach_loop()

template<typename T , typename F >
void BS::thread_pool::detach_loop ( const T  first_index,
const T  index_after_last,
F &&  loop,
const size_t  num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function to loop through.
Parameters
first_indexThe first index in the loop.
index_after_lastThe index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no blocks will be submitted.
loopThe function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index.
num_blocksThe maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
507 {
508 if (index_after_last > first_index)
509 {
510 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
511 for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk)
513 [loop = std::forward<F>(loop), start = blks.start(blk), end = blks.end(blk)]
514 {
515 for (T i = start; i < end; ++i)
516 loop(i);
517 } BS_THREAD_POOL_PRIORITY_OUTPUT);
518 }
519 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ detach_sequence()

template<typename T , typename F >
void BS::thread_pool::detach_sequence ( const T  first_index,
const T  index_after_last,
F &&sequence  BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Does not return a multi_future, so the user must use wait() or some other method to ensure that the sequence finishes executing, otherwise bad things will happen.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function used to define the sequence.
Parameters
first_indexThe first index in the sequence.
index_after_lastThe index after the last index in the sequence. The sequence will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no tasks will be submitted.
sequenceThe function used to define the sequence. Will be called once per index. Should take exactly one argument, the index.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
533 {
534 for (T i = first_index; i < index_after_last; ++i)
536 [sequence = std::forward<F>(sequence), i]
537 {
538 sequence(i);
539 } BS_THREAD_POOL_PRIORITY_OUTPUT);
540 }
Here is the caller graph for this function:

◆ reset() [1/4]

void BS::thread_pool::reset ( )
inline

Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.

546 {
547 reset(0, [] {});
548 }
void reset()
Reset the pool with the total number of hardware threads available, as reported by the implementation...
Definition BS_thread_pool.hpp:545
Here is the caller graph for this function:

◆ reset() [2/4]

void BS::thread_pool::reset ( const concurrency_t  num_threads)
inline

Reset the pool with a new number of threads. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.

Parameters
num_threadsThe number of threads to use.
556 {
557 reset(num_threads, [] {});
558 }

◆ reset() [3/4]

void BS::thread_pool::reset ( const std::function< void()> &  init_task)
inline

Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.

Parameters
init_taskAn initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
566 {
567 reset(0, init_task);
568 }

◆ reset() [4/4]

void BS::thread_pool::reset ( const concurrency_t  num_threads,
const std::function< void()> &  init_task 
)
inline

Reset the pool with a new number of threads and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.

Parameters
num_threadsThe number of threads to use.
init_taskAn initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed.
577 {
578#ifdef BS_THREAD_POOL_ENABLE_PAUSE
579 std::unique_lock tasks_lock(tasks_mutex);
580 const bool was_paused = paused;
581 paused = true;
582 tasks_lock.unlock();
583#endif
584 wait();
587 threads = std::make_unique<std::thread[]>(thread_count);
588 create_threads(init_task);
589#ifdef BS_THREAD_POOL_ENABLE_PAUSE
590 tasks_lock.lock();
591 paused = was_paused;
592#endif
593 }

◆ submit_task()

template<typename F , typename R = std::invoke_result_t<std::decay_t<F>>>
std::future< R > BS::thread_pool::submit_task ( F &&task  BS_THREAD_POOL_PRIORITY_INPUT)
inline

Submit a function with no arguments into the task queue, with the specified priority. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes.

Template Parameters
FThe type of the function.
RThe return type of the function (can be void).
Parameters
taskThe function to submit.
priorityThe priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
Returns
A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one.
606 {
607 const std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
609 [task = std::forward<F>(task), task_promise]
610 {
611#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
612 try
613 {
614#endif
615 if constexpr (std::is_void_v<R>)
616 {
617 task();
618 task_promise->set_value();
619 }
620 else
621 {
622 task_promise->set_value(task());
623 }
624#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
625 }
626 catch (...)
627 {
628 try
629 {
630 task_promise->set_exception(std::current_exception());
631 }
632 catch (...)
633 {
634 }
635 }
636#endif
637 } BS_THREAD_POOL_PRIORITY_OUTPUT);
638 return task_promise->get_future();
639 }
Here is the caller graph for this function:

◆ submit_blocks()

template<typename T , typename F , typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
multi_future< R > BS::thread_pool::submit_blocks ( const T  first_index,
const T  index_after_last,
F &&  block,
const size_t  num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Returns a multi_future that contains the futures for all of the blocks.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function to loop through.
RThe return type of the function to loop through (can be void).
Parameters
first_indexThe first index in the loop.
index_after_lastThe index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no blocks will be submitted, and an empty multi_future will be returned.
blockA function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. block(start, end) should typically involve a loop of the form for (T i = start; i < end; ++i).
num_blocksThe maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
Returns
A multi_future that can be used to wait for all the blocks to finish. If the block function returns a value, the multi_future can also be used to obtain the values returned by each block.
656 {
657 if (index_after_last > first_index)
658 {
659 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
660 multi_future<R> future;
661 future.reserve(blks.get_num_blocks());
662 for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk)
663 future.push_back(submit_task(
664 [block = std::forward<F>(block), start = blks.start(blk), end = blks.end(blk)]
665 {
666 return block(start, end);
667 } BS_THREAD_POOL_PRIORITY_OUTPUT));
668 return future;
669 }
670 return {};
671 }
std::future< R > submit_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:605
Here is the call graph for this function:
Here is the caller graph for this function:

◆ submit_loop()

template<typename T , typename F >
multi_future< void > BS::thread_pool::submit_loop ( const T  first_index,
const T  index_after_last,
F &&  loop,
const size_t  num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a multi_future that contains the futures for all of the blocks.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function to loop through.
Parameters
first_indexThe first index in the loop.
index_after_lastThe index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no tasks will be submitted, and an empty multi_future will be returned.
loopThe function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index. It cannot have a return value.
num_blocksThe maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
Returns
A multi_future that can be used to wait for all the blocks to finish.
687 {
688 if (index_after_last > first_index)
689 {
690 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
691 multi_future<void> future;
692 future.reserve(blks.get_num_blocks());
693 for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk)
694 future.push_back(submit_task(
695 [loop = std::forward<F>(loop), start = blks.start(blk), end = blks.end(blk)]
696 {
697 for (T i = start; i < end; ++i)
698 loop(i);
699 } BS_THREAD_POOL_PRIORITY_OUTPUT));
700 return future;
701 }
702 return {};
703 }
Here is the call graph for this function:
Here is the caller graph for this function:

◆ submit_sequence()

template<typename T , typename F , typename R = std::invoke_result_t<std::decay_t<F>, T>>
multi_future< R > BS::thread_pool::submit_sequence ( const T  first_index,
const T  index_after_last,
F &&sequence  BS_THREAD_POOL_PRIORITY_INPUT 
)
inline

Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Returns a multi_future that contains the futures for all of the tasks.

Template Parameters
TThe type of the indices. Should be a signed or unsigned integer.
FThe type of the function used to define the sequence.
RThe return type of the function used to define the sequence (can be void).
Parameters
first_indexThe first index in the sequence.
index_after_lastThe index after the last index in the sequence. The sequence will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to for (T i = first_index; i < index_after_last; ++i). Note that if index_after_last <= first_index, no tasks will be submitted, and an empty multi_future will be returned.
sequenceThe function used to define the sequence. Will be called once per index. Should take exactly one argument, the index.
priorityThe priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if BS_THREAD_POOL_ENABLE_PRIORITY is defined.
Returns
A multi_future that can be used to wait for all the tasks to finish. If the sequence function returns a value, the multi_future can also be used to obtain the values returned by each task.
719 {
720 if (index_after_last > first_index)
721 {
722 multi_future<R> future;
723 future.reserve(static_cast<size_t>(index_after_last - first_index));
724 for (T i = first_index; i < index_after_last; ++i)
725 future.push_back(submit_task(
726 [sequence = std::forward<F>(sequence), i]
727 {
728 return sequence(i);
729 } BS_THREAD_POOL_PRIORITY_OUTPUT));
730 return future;
731 }
732 return {};
733 }
Here is the caller graph for this function:

◆ wait()

void BS::thread_pool::wait ( )
inline

Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit_task() instead, and call the wait() member function of the generated future.

Exceptions
`wait_deadlock`if called from within a thread of the same pool, which would result in a deadlock. Only enabled if BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is defined.
762 {
763#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
764 if (this_thread::get_pool() == this)
765 throw wait_deadlock();
766#endif
767 std::unique_lock tasks_lock(tasks_mutex);
768 waiting = true;
769 tasks_done_cv.wait(tasks_lock,
770 [this]
771 {
772 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
773 });
774 waiting = false;
775 }
std::condition_variable tasks_done_cv
A condition variable to notify wait() that the tasks are done.
Definition BS_thread_pool.hpp:1114
bool waiting
A flag indicating that wait() is active and expects to be notified whenever a task is done.
Definition BS_thread_pool.hpp:1148
thread_local thread_info_pool get_pool
A thread_local object used to obtain information about the thread pool that owns the current thread.
Definition BS_thread_pool.hpp:159
Here is the caller graph for this function:

◆ wait_for()

template<typename R , typename P >
bool BS::thread_pool::wait_for ( const std::chrono::duration< R, P > &  duration)
inline

Wait for tasks to be completed, but stop waiting after the specified duration has passed.

Template Parameters
RAn arithmetic type representing the number of ticks to wait.
PAn std::ratio representing the length of each tick in seconds.
Parameters
durationThe amount of time to wait.
Returns
true if all tasks finished running, false if the duration expired but some tasks are still running.
Exceptions
`wait_deadlock`if called from within a thread of the same pool, which would result in a deadlock. Only enabled if BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is defined.
789 {
790#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
791 if (this_thread::get_pool() == this)
792 throw wait_deadlock();
793#endif
794 std::unique_lock tasks_lock(tasks_mutex);
795 waiting = true;
796 const bool status = tasks_done_cv.wait_for(tasks_lock, duration,
797 [this]
798 {
799 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
800 });
801 waiting = false;
802 return status;
803 }
Here is the caller graph for this function:

◆ wait_until()

template<typename C , typename D >
bool BS::thread_pool::wait_until ( const std::chrono::time_point< C, D > &  timeout_time)
inline

Wait for tasks to be completed, but stop waiting after the specified time point has been reached.

Template Parameters
CThe type of the clock used to measure time.
DAn std::chrono::duration type used to indicate the time point.
Parameters
timeout_timeThe time point at which to stop waiting.
Returns
true if all tasks finished running, false if the time point was reached but some tasks are still running.
Exceptions
`wait_deadlock`if called from within a thread of the same pool, which would result in a deadlock. Only enabled if BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is defined.
817 {
818#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
819 if (this_thread::get_pool() == this)
820 throw wait_deadlock();
821#endif
822 std::unique_lock tasks_lock(tasks_mutex);
823 waiting = true;
824 const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time,
825 [this]
826 {
827 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
828 });
829 waiting = false;
830 return status;
831 }
Here is the caller graph for this function:

◆ create_threads()

void BS::thread_pool::create_threads ( const std::function< void()> &  init_task)
inlineprivate

Create the threads in the pool and assign a worker to each thread.

Parameters
init_taskAn initialization function to run in each thread before it starts to execute any submitted tasks.
858 {
859 {
860 const std::scoped_lock tasks_lock(tasks_mutex);
862 workers_running = true;
863 }
864 for (concurrency_t i = 0; i < thread_count; ++i)
865 {
866 threads[i] = std::thread(&thread_pool::worker, this, i, init_task);
867 }
868 }
bool workers_running
A flag indicating to the workers to keep running. When set to false, the workers terminate permanentl...
Definition BS_thread_pool.hpp:1153
void worker(const concurrency_t idx, const std::function< void()> &init_task)
A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_tas...
Definition BS_thread_pool.hpp:907

◆ destroy_threads()

void BS::thread_pool::destroy_threads ( )
inlineprivate

Destroy the threads in the pool.

874 {
875 {
876 const std::scoped_lock tasks_lock(tasks_mutex);
877 workers_running = false;
878 }
879 task_available_cv.notify_all();
880 for (concurrency_t i = 0; i < thread_count; ++i)
881 {
882 threads[i].join();
883 }
884 }

◆ determine_thread_count()

static concurrency_t BS::thread_pool::determine_thread_count ( const concurrency_t  num_threads)
inlinestaticprivate

Determine how many threads the pool should have, based on the parameter passed to the constructor or reset().

Parameters
num_threadsThe parameter passed to the constructor or reset(). If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns zero for some reason, then the pool will be created with just one thread.
Returns
The number of threads to use for constructing the pool.
893 {
894 if (num_threads > 0)
895 return num_threads;
896 if (std::thread::hardware_concurrency() > 0)
897 return std::thread::hardware_concurrency();
898 return 1;
899 }

◆ worker()

void BS::thread_pool::worker ( const concurrency_t  idx,
const std::function< void()> &  init_task 
)
inlineprivate

A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait() in case it is waiting.

Parameters
idxThe index of this thread.
init_taskAn initialization function to run in this thread before it starts to execute any submitted tasks.
908 {
911 init_task();
912 std::unique_lock tasks_lock(tasks_mutex);
913 while (true)
914 {
916 tasks_lock.unlock();
917 if (waiting && (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY)
918 tasks_done_cv.notify_all();
919 tasks_lock.lock();
920 task_available_cv.wait(tasks_lock,
921 [this]
922 {
923 return !BS_THREAD_POOL_PAUSED_OR_EMPTY || !workers_running;
924 });
925 if (!workers_running)
926 break;
927 {
928#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
929 const std::function<void()> task = std::move(std::remove_const_t<pr_task&>(tasks.top()).task);
930 tasks.pop();
931#else
932 const std::function<void()> task = std::move(tasks.front());
933 tasks.pop();
934#endif
936 tasks_lock.unlock();
937 task();
938 }
939 tasks_lock.lock();
940 }
941 this_thread::get_index.index = std::nullopt;
942 this_thread::get_pool.pool = std::nullopt;
943 }
optional_index index
The index of the current thread.
Definition BS_thread_pool.hpp:123
optional_pool pool
A pointer to the thread pool that owns the current thread.
Definition BS_thread_pool.hpp:148
thread_local thread_info_index get_index
A thread_local object used to obtain information about the index of the current thread.
Definition BS_thread_pool.hpp:154

Member Data Documentation

◆ task_available_cv

std::condition_variable BS::thread_pool::task_available_cv = {}
private

A condition variable to notify worker() that a new task has become available.

1109{};

◆ tasks_done_cv

std::condition_variable BS::thread_pool::tasks_done_cv = {}
private

A condition variable to notify wait() that the tasks are done.

1114{};

◆ tasks

std::queue<std::function<void()> > BS::thread_pool::tasks = {}
private

A queue of tasks to be executed by the threads.

1122{};

◆ tasks_mutex

std::mutex BS::thread_pool::tasks_mutex = {}
mutableprivate

A mutex to synchronize access to the task queue by different threads.

1133{};

The documentation for this class was generated from the following file: