1#ifndef BS_THREAD_POOL_HPP
2#define BS_THREAD_POOL_HPP
13#ifndef __cpp_exceptions
14 #define BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
15 #undef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
19#include <condition_variable>
21#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
24#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
33#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
// Library version, split into components: major 4, minor 1, patch 0.
46#define BS_THREAD_POOL_VERSION_MAJOR 4
47#define BS_THREAD_POOL_VERSION_MINOR 1
48#define BS_THREAD_POOL_VERSION_PATCH 0
/**
 * @brief Alias for the type returned by std::thread::hardware_concurrency(). Used in this file for thread counts and thread indices (e.g. the `num_threads` constructor parameter and the worker's `idx`).
 */
60using concurrency_t = std::invoke_result_t<
decltype(std::thread::hardware_concurrency)>;
62#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
/**
 * @brief Integer type used for task priorities: the smallest signed integer type with at least 16 bits.
 */
66using priority_t = std::int_least16_t;
// Named priority levels. `highest` and `lowest` are the limits of a 16-bit signed integer; `normal` (0) matches the default priority supplied by BS_THREAD_POOL_PRIORITY_INPUT.
72 constexpr priority_t highest = 32767;
73 constexpr priority_t high = 16383;
74 constexpr priority_t normal = 0;
75 constexpr priority_t low = -16384;
76 constexpr priority_t lowest = -32768;
// Priority-enabled variants: _INPUT appends an optional `priority` parameter (default 0) to a function's parameter list, and _OUTPUT appends `, priority` to a call so the value is forwarded.
80 #define BS_THREAD_POOL_PRIORITY_INPUT , const priority_t priority = 0
81 #define BS_THREAD_POOL_PRIORITY_OUTPUT , priority
// Alternate variants that expand to nothing, so the same signatures and calls compile without a priority argument. NOTE(review): the #else/#endif separating the two pairs is not visible in this listing.
83 #define BS_THREAD_POOL_PRIORITY_INPUT
84 #define BS_THREAD_POOL_PRIORITY_OUTPUT
90namespace this_thread {
172 using std::vector<std::future<T>>::vector;
187 [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>>
get()
189 if constexpr (std::is_void_v<T>)
191 for (std::future<T>& future : *
this)
197 std::vector<T> results;
198 results.reserve(this->
size());
199 for (std::future<T>& future : *
this)
200 results.push_back(future.get());
213 for (
const std::future<T>& future : *
this)
215 if (future.wait_for(std::chrono::duration<double>::zero()) == std::future_status::ready)
228 bool is_valid =
true;
229 for (
const std::future<T>& future : *
this)
230 is_valid = is_valid && future.valid();
239 for (
const std::future<T>& future : *
this)
251 template <
typename R,
typename P>
252 bool wait_for(
const std::chrono::duration<R, P>& duration)
const
254 const std::chrono::time_point<std::chrono::steady_clock> start_time = std::chrono::steady_clock::now();
255 for (
const std::future<T>& future : *
this)
257 future.wait_for(duration - (std::chrono::steady_clock::now() - start_time));
258 if (duration < std::chrono::steady_clock::now() - start_time)
272 template <
typename C,
typename D>
273 bool wait_until(
const std::chrono::time_point<C, D>& timeout_time)
const
275 for (
const std::future<T>& future : *
this)
277 future.wait_until(timeout_time);
278 if (timeout_time < std::chrono::steady_clock::now())
320 thread_pool(
const concurrency_t num_threads,
const std::function<
void()>& init_task) : thread_count(determine_thread_count(num_threads)), threads(
std::make_unique<
std::thread[]>(determine_thread_count(num_threads)))
322 create_threads(init_task);
344#ifdef BS_THREAD_POOL_ENABLE_NATIVE_HANDLES
350 [[nodiscard]] std::vector<std::thread::native_handle_type> get_native_handles()
const
352 std::vector<std::thread::native_handle_type> native_handles(thread_count);
355 native_handles[i] = threads[i].native_handle();
357 return native_handles;
368 const std::scoped_lock tasks_lock(tasks_mutex);
379 const std::scoped_lock tasks_lock(tasks_mutex);
380 return tasks_running;
390 const std::scoped_lock tasks_lock(tasks_mutex);
391 return tasks_running + tasks.size();
411 std::vector<std::thread::id> thread_ids(thread_count);
414 thread_ids[i] = threads[i].get_id();
419#ifdef BS_THREAD_POOL_ENABLE_PAUSE
425 [[nodiscard]]
bool is_paused()
const
427 const std::scoped_lock tasks_lock(tasks_mutex);
436 const std::scoped_lock tasks_lock(tasks_mutex);
446 const std::scoped_lock tasks_lock(tasks_mutex);
447 while (!tasks.empty())
458 template <
typename F>
462 const std::scoped_lock tasks_lock(tasks_mutex);
463 tasks.emplace(std::forward<F>(task) BS_THREAD_POOL_PRIORITY_OUTPUT);
465 task_available_cv.notify_one();
479 template <
typename T,
typename F>
480 void detach_blocks(
const T first_index,
const T index_after_last, F&& block,
const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT)
482 if (index_after_last > first_index)
484 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
487 [block = std::forward<F>(block), start = blks.
start(blk), end = blks.
end(blk)]
490 } BS_THREAD_POOL_PRIORITY_OUTPUT);
505 template <
typename T,
typename F>
506 void detach_loop(
const T first_index,
const T index_after_last, F&& loop,
const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT)
508 if (index_after_last > first_index)
510 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
513 [loop = std::forward<F>(loop), start = blks.
start(blk), end = blks.
end(blk)]
515 for (T i = start; i < end; ++i)
517 } BS_THREAD_POOL_PRIORITY_OUTPUT);
531 template <
typename T,
typename F>
532 void detach_sequence(
const T first_index,
const T index_after_last, F&& sequence BS_THREAD_POOL_PRIORITY_INPUT)
534 for (T i = first_index; i < index_after_last; ++i)
536 [sequence = std::forward<F>(sequence), i]
539 } BS_THREAD_POOL_PRIORITY_OUTPUT);
557 reset(num_threads, [] {});
565 void reset(
const std::function<
void()>& init_task)
578#ifdef BS_THREAD_POOL_ENABLE_PAUSE
579 std::unique_lock tasks_lock(tasks_mutex);
580 const bool was_paused = paused;
586 thread_count = determine_thread_count(num_threads);
587 threads = std::make_unique<std::thread[]>(thread_count);
588 create_threads(init_task);
589#ifdef BS_THREAD_POOL_ENABLE_PAUSE
604 template <
typename F,
typename R = std::invoke_result_t<std::decay_t<F>>>
605 [[nodiscard]] std::future<R>
submit_task(F&& task BS_THREAD_POOL_PRIORITY_INPUT)
607 const std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
609 [task = std::forward<F>(task), task_promise]
611#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
615 if constexpr (std::is_void_v<R>)
618 task_promise->set_value();
622 task_promise->set_value(task());
624#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
630 task_promise->set_exception(std::current_exception());
637 } BS_THREAD_POOL_PRIORITY_OUTPUT);
638 return task_promise->get_future();
654 template <
typename T,
typename F,
typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
655 [[nodiscard]]
multi_future<R> submit_blocks(
const T first_index,
const T index_after_last, F&& block,
const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT)
657 if (index_after_last > first_index)
659 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
663 future.push_back(submit_task(
664 [block = std::forward<F>(block), start = blks.
start(blk), end = blks.
end(blk)]
666 return block(start, end);
667 } BS_THREAD_POOL_PRIORITY_OUTPUT));
685 template <
typename T,
typename F>
686 [[nodiscard]]
multi_future<void> submit_loop(
const T first_index,
const T index_after_last, F&& loop,
const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT)
688 if (index_after_last > first_index)
690 const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
694 future.push_back(submit_task(
695 [loop = std::forward<F>(loop), start = blks.
start(blk), end = blks.
end(blk)]
697 for (T i = start; i < end; ++i)
699 } BS_THREAD_POOL_PRIORITY_OUTPUT));
717 template <
typename T,
typename F,
typename R = std::invoke_result_t<std::decay_t<F>, T>>
720 if (index_after_last > first_index)
723 future.reserve(
static_cast<size_t>(index_after_last - first_index));
724 for (T i = first_index; i < index_after_last; ++i)
725 future.push_back(submit_task(
726 [sequence = std::forward<F>(sequence), i]
729 } BS_THREAD_POOL_PRIORITY_OUTPUT));
735#ifdef BS_THREAD_POOL_ENABLE_PAUSE
742 const std::scoped_lock tasks_lock(tasks_mutex);
745 task_available_cv.notify_all();
750#ifdef BS_THREAD_POOL_ENABLE_PAUSE
// Condition used by the wait predicates and by the worker loop. With pausing enabled: true when the pool is paused or the task queue is empty.
751 #define BS_THREAD_POOL_PAUSED_OR_EMPTY (paused || tasks.empty())
// Alternate definition: true when the task queue is empty. NOTE(review): the #else between the two definitions is not visible in this listing.
753 #define BS_THREAD_POOL_PAUSED_OR_EMPTY tasks.empty()
763#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
764 if (this_thread::get_pool() ==
this)
765 throw wait_deadlock();
767 std::unique_lock tasks_lock(tasks_mutex);
769 tasks_done_cv.wait(tasks_lock,
772 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
787 template <
typename R,
typename P>
788 bool wait_for(
const std::chrono::duration<R, P>& duration)
790#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
791 if (this_thread::get_pool() ==
this)
792 throw wait_deadlock();
794 std::unique_lock tasks_lock(tasks_mutex);
796 const bool status = tasks_done_cv.wait_for(tasks_lock, duration,
799 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
815 template <
typename C,
typename D>
816 bool wait_until(
const std::chrono::time_point<C, D>& timeout_time)
818#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
819 if (this_thread::get_pool() ==
this)
820 throw wait_deadlock();
822 std::unique_lock tasks_lock(tasks_mutex);
824 const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time,
827 return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY;
833#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
841 struct wait_deadlock :
public std::runtime_error
// Construct the exception with the fixed message "BS::thread_pool::wait_deadlock". It is thrown by wait(), wait_for() and wait_until() when this_thread::get_pool() equals the pool being waited on (only when BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is defined).
843 wait_deadlock() :
std::runtime_error(
"BS::thread_pool::wait_deadlock"){};
860 const std::scoped_lock tasks_lock(tasks_mutex);
861 tasks_running = thread_count;
862 workers_running =
true;
866 threads[i] = std::thread(&thread_pool::worker,
this, i, init_task);
876 const std::scoped_lock tasks_lock(tasks_mutex);
877 workers_running =
false;
879 task_available_cv.notify_all();
896 if (std::thread::hardware_concurrency() > 0)
897 return std::thread::hardware_concurrency();
909 this_thread::get_index.index = idx;
910 this_thread::get_pool.pool =
this;
912 std::unique_lock tasks_lock(tasks_mutex);
917 if (waiting && (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY)
918 tasks_done_cv.notify_all();
920 task_available_cv.wait(tasks_lock,
923 return !BS_THREAD_POOL_PAUSED_OR_EMPTY || !workers_running;
925 if (!workers_running)
928#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
929 const std::function<void()> task = std::move(std::remove_const_t<pr_task&>(tasks.top()).task);
932 const std::function<void()> task = std::move(tasks.front());
941 this_thread::get_index.index = std::nullopt;
942 this_thread::get_pool.pool = std::nullopt;
954 template <
typename T>
965 blocks(
const T first_index_,
const T index_after_last_,
const size_t num_blocks_) : first_index(first_index_), index_after_last(index_after_last_), num_blocks(num_blocks_)
967 if (index_after_last > first_index)
969 const size_t total_size =
static_cast<size_t>(index_after_last - first_index);
970 if (num_blocks > total_size)
971 num_blocks = total_size;
972 block_size = total_size / num_blocks;
973 remainder = total_size % num_blocks;
977 num_blocks = (total_size > 1) ? total_size : 1;
992 [[nodiscard]] T
start(
const size_t block)
const
994 return first_index +
static_cast<T
>(block * block_size) +
static_cast<T
>(block < remainder ? block : remainder);
1003 [[nodiscard]] T
end(
const size_t block)
const
1005 return (block == num_blocks - 1) ? index_after_last : start(block + 1);
// Base size of each block: total_size / num_blocks, as computed by the constructor.
1022 size_t block_size = 0;
// The index after the last index of the whole range; end() returns it for the final block.
1032 T index_after_last = 0;
// Number of blocks actually used; the constructor reduces it to total_size when more blocks than indices were requested.
1037 size_t num_blocks = 0;
// total_size % num_blocks. start() adds min(block, remainder) to the offset, so the first `remainder` blocks are one index larger than block_size.
1042 size_t remainder = 0;
1045#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
1049 class [[nodiscard]] pr_task
/**
 * @brief Construct a prioritized task by copying the given callable.
 *
 * @param task_ The task to store (copied).
 * @param priority_ The priority to store with the task. Defaults to 0.
 */
1060 explicit pr_task(
const std::function<
void()>& task_,
const priority_t priority_ = 0) : task(task_), priority(priority_) {}
/**
 * @brief Construct a prioritized task by moving the given callable into it.
 *
 * @param task_ The task to store (moved from).
 * @param priority_ The priority to store with the task. Defaults to 0.
 */
1068 explicit pr_task(std::function<
void()>&& task_,
const priority_t priority_ = 0) : task(
std::move(task_)), priority(priority_) {}
1077 [[nodiscard]]
friend bool operator<(
const pr_task& lhs,
const pr_task& rhs)
1079 return lhs.priority < rhs.priority;
// The stored callable; the worker moves it out of the queue's top element before running it.
1086 std::function<void()> task = {};
// The task's priority; operator< compares this field, which determines the ordering inside the std::priority_queue of tasks.
1091 priority_t priority = 0;
1099#ifdef BS_THREAD_POOL_ENABLE_PAUSE
// Whether the pool is paused; read through BS_THREAD_POOL_PAUSED_OR_EMPTY and saved by reset() in `was_paused`.
1103 bool paused =
false;
// Condition variable the workers wait on; notified when a task is added to the queue and when the threads are being destroyed.
1109 std::condition_variable task_available_cv = {};
// Condition variable that wait(), wait_for() and wait_until() block on; workers notify it when `waiting` is set, no tasks are running, and the queue is paused or empty.
1114 std::condition_variable tasks_done_cv = {};
1119#ifdef BS_THREAD_POOL_ENABLE_PRIORITY
// Task queue when priorities are enabled: tasks are retrieved with top().
1120 std::priority_queue<pr_task> tasks = {};
// Task queue otherwise: tasks are retrieved with front(). NOTE(review): the #else/#endif around these two declarations are not visible in this listing.
1122 std::queue<std::function<void()>> tasks = {};
// Number of tasks currently running; create_threads() sets it to thread_count before launching the workers.
1128 size_t tasks_running = 0;
// Mutex guarding the task queue and the counters/flags above; `mutable` so const member functions (e.g. get_tasks_running()) can lock it.
1133 mutable std::mutex tasks_mutex = {};
// Array of the pool's threads, allocated with std::make_unique<std::thread[]>.
1143 std::unique_ptr<std::thread[]> threads =
nullptr;
// Flag checked by the workers before notifying tasks_done_cv. NOTE(review): presumably set while a wait*() call is in progress — the assignments are not visible in this listing.
1148 bool waiting =
false;
// Set to true by create_threads() and to false by destroy_threads(); the worker's wait predicate and loop test it to decide when to stop.
1153 bool workers_running =
false;
A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
Definition BS_thread_pool.hpp:169
void wait() const
Wait for all the futures stored in this multi_future.
Definition BS_thread_pool.hpp:237
size_t ready_count() const
Check how many of the futures stored in this multi_future are ready.
Definition BS_thread_pool.hpp:210
bool wait_for(const std::chrono::duration< R, P > &duration) const
Wait for all the futures stored in this multi_future, but stop waiting after the specified duration has passed.
Definition BS_thread_pool.hpp:252
bool valid() const
Check if all the futures stored in this multi_future are valid.
Definition BS_thread_pool.hpp:226
std::conditional_t< std::is_void_v< T >, void, std::vector< T > > get()
Get the results from all the futures stored in this multi_future, rethrowing any stored exceptions.
Definition BS_thread_pool.hpp:187
bool wait_until(const std::chrono::time_point< C, D > &timeout_time) const
Wait for all the futures stored in this multi_future, but stop waiting after the specified time point has been reached.
Definition BS_thread_pool.hpp:273
A helper class to store information about the index of the current thread.
Definition BS_thread_pool.hpp:105
optional_index operator()() const
Get the index of the current thread. If this thread belongs to a BS::thread_pool object,...
Definition BS_thread_pool.hpp:114
A helper class to store information about the thread pool that owns the current thread.
Definition BS_thread_pool.hpp:130
optional_pool operator()() const
Get the pointer to the thread pool that owns the current thread. If this thread belongs to a BS::thre...
Definition BS_thread_pool.hpp:139
A helper class to divide a range into blocks. Used by detach_blocks(), submit_blocks(), detach_loop(), and submit_loop().
Definition BS_thread_pool.hpp:956
T start(const size_t block) const
Get the first index of a block.
Definition BS_thread_pool.hpp:992
blocks(const T first_index_, const T index_after_last_, const size_t num_blocks_)
Construct a blocks object with the given specifications.
Definition BS_thread_pool.hpp:965
T end(const size_t block) const
Get the index after the last index of a block.
Definition BS_thread_pool.hpp:1003
size_t get_num_blocks() const
Get the number of blocks. Note that this may be different than the desired number of blocks that was passed to the constructor.
Definition BS_thread_pool.hpp:1013
A fast, lightweight, and easy-to-use C++17 thread pool class.
Definition BS_thread_pool.hpp:289
multi_future< R > submit_blocks(const T first_index, const T index_after_last, F &&block, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:655
void worker(const concurrency_t idx, const std::function< void()> &init_task)
A worker function to be assigned to each thread in the pool. Waits until it is notified by detach_tas...
Definition BS_thread_pool.hpp:907
size_t get_tasks_running() const
Get the number of tasks currently being executed by the threads.
Definition BS_thread_pool.hpp:377
static concurrency_t determine_thread_count(const concurrency_t num_threads)
Determine how many threads the pool should have, based on the parameter passed to the constructor or ...
Definition BS_thread_pool.hpp:892
multi_future< R > submit_sequence(const T first_index, const T index_after_last, F &&sequence BS_THREAD_POOL_PRIORITY_INPUT)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:718
thread_pool(const concurrency_t num_threads, const std::function< void()> &init_task)
Construct a new thread pool with the specified number of threads and initialization function.
Definition BS_thread_pool.hpp:320
void create_threads(const std::function< void()> &init_task)
Create the threads in the pool and assign a worker to each thread.
Definition BS_thread_pool.hpp:857
~thread_pool()
Destruct the thread pool. Waits for all tasks to complete, then destroys all threads....
Definition BS_thread_pool.hpp:334
void reset(const concurrency_t num_threads)
Reset the pool with a new number of threads. Waits for all currently running tasks to be completed,...
Definition BS_thread_pool.hpp:555
void reset(const concurrency_t num_threads, const std::function< void()> &init_task)
Reset the pool with a new number of threads and a new initialization function. Waits for all currentl...
Definition BS_thread_pool.hpp:576
bool wait_until(const std::chrono::time_point< C, D > &timeout_time)
Wait for tasks to be completed, but stop waiting after the specified time point has been reached.
Definition BS_thread_pool.hpp:816
void reset(const std::function< void()> &init_task)
Reset the pool with the total number of hardware threads available, as reported by the implementation...
Definition BS_thread_pool.hpp:565
thread_pool()
Construct a new thread pool. The number of threads will be the total number of hardware threads avail...
Definition BS_thread_pool.hpp:298
void detach_sequence(const T first_index, const T index_after_last, F &&sequence BS_THREAD_POOL_PRIORITY_INPUT)
Submit a sequence of tasks enumerated by indices to the queue, with the specified priority....
Definition BS_thread_pool.hpp:532
thread_pool(const concurrency_t num_threads)
Construct a new thread pool with the specified number of threads.
Definition BS_thread_pool.hpp:305
void reset()
Reset the pool with the total number of hardware threads available, as reported by the implementation...
Definition BS_thread_pool.hpp:545
void destroy_threads()
Destroy the threads in the pool.
Definition BS_thread_pool.hpp:873
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:761
size_t get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition BS_thread_pool.hpp:366
void purge()
Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected,...
Definition BS_thread_pool.hpp:444
void detach_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:459
void detach_blocks(const T first_index, const T index_after_last, F &&block, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:480
thread_pool(const std::function< void()> &init_task)
Construct a new thread pool with the specified initialization function.
Definition BS_thread_pool.hpp:312
void detach_loop(const T first_index, const T index_after_last, F &&loop, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:506
std::future< R > submit_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:605
std::vector< std::thread::id > get_thread_ids() const
Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::th...
Definition BS_thread_pool.hpp:409
bool wait_for(const std::chrono::duration< R, P > &duration)
Wait for tasks to be completed, but stop waiting after the specified duration has passed.
Definition BS_thread_pool.hpp:788
size_t get_tasks_total() const
Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread....
Definition BS_thread_pool.hpp:388
concurrency_t get_thread_count() const
Get the number of threads in the pool.
Definition BS_thread_pool.hpp:399
multi_future< void > submit_loop(const T first_index, const T index_after_last, F &&loop, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:686
softfloat & operator=(const softfloat &c)
__device__ __forceinline__ uchar1 operator<(const uchar1 &a, const uchar1 &b)
std::optional< thread_pool * > optional_pool
A type returned by BS::this_thread::get_pool() which can optionally contain the pointer to the pool t...
Definition BS_thread_pool.hpp:99
std::optional< size_t > optional_index
A type returned by BS::this_thread::get_index() which can optionally contain the index of a thread,...
Definition BS_thread_pool.hpp:94
thread_local thread_info_pool get_pool
A thread_local object used to obtain information about the thread pool that owns the current thread.
Definition BS_thread_pool.hpp:159
thread_local thread_info_index get_index
A thread_local object used to obtain information about the index of the current thread.
Definition BS_thread_pool.hpp:154
A namespace used by Barak Shoshany's projects.
Definition BS_thread_pool.hpp:44
std::invoke_result_t< decltype(std::thread::hardware_concurrency)> concurrency_t
A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsign...
Definition BS_thread_pool.hpp:60
std::size_t size_t
A type to represent the size of things.
Definition BS_thread_pool.hpp:55
GOpaque< Size > size(const GMat &src)