Autonomy Software C++ 24.5.1
Welcome to the Autonomy Software repository of the Mars Rover Design Team (MRDT) at Missouri University of Science and Technology (Missouri S&T)! This API reference contains the source code and other resources for the development of the autonomy software for our Mars rover. The Autonomy Software project aims to compete in the University Rover Challenge (URC) by demonstrating advanced autonomous capabilities and robust navigation algorithms.
Loading...
Searching...
No Matches
AutonomyThread.hpp
1
14#ifndef AUTONOMYTHREAD_H
15#define AUTONOMYTHREAD_H
16
17#include "../util/IPS.hpp"
18
20#include "../../external/threadpool/include/BS_thread_pool.hpp"
21#include <atomic>
22#include <chrono>
23#include <condition_variable>
24#include <vector>
25
27
28
36template<class T>
38{
39 public:
41 // Define public enumerators specific to this class.
43
44 // Define an enum for storing this class's state.
// Lifecycle state of the main worker thread, as returned by GetThreadState().
45 enum class AutonomyThreadState
46 {
47 eStarting,    // Set by Start() right before the main-thread task is submitted.
48 eRunning,     // Set by RunThread() after an iteration of ThreadedContinuousCode() has completed.
49 eStopping,    // Set when a stop has been signalled (e.g. at the top of Start()) but the thread has not been joined yet.
50 eStopped      // Set by Join() once both pools have finished waiting.
51 };
52
54 // Declare and define public class methods.
56
64 {
65 // Initialize member variables.
66 m_bStopThreads = false;
67 m_eThreadState = AutonomyThreadState::eStopped;
68 m_nMainThreadMaxIterationPerSecond = 0;
69 }
70
71
82 {
83 // Tell all threads to stop executing user code.
84 m_bStopThreads = true;
85 // Update thread state.
86 m_eThreadState = AutonomyThreadState::eStopping;
87
88 // Pause and clear pool queues.
89 m_thPool.pause();
90 m_thPool.purge();
91 m_thMainThread.pause();
92 m_thMainThread.purge();
93
94 // Wait for all pools to finish.
95 m_thPool.wait();
96 m_thMainThread.wait();
97 // Update thread state.
98 m_eThreadState = AutonomyThreadState::eStopped;
99 }
100
101
117 void Start()
118 {
119 // Tell any open thread to stop.
120 m_bStopThreads = true;
121 // Update thread state.
122 m_eThreadState = AutonomyThreadState::eStopping;
123
124 // Pause queuing of new tasks to the threads, then purge them.
125 m_thPool.pause();
126 m_thPool.purge();
127 m_thMainThread.pause();
128 m_thMainThread.purge();
129
130 // Wait for loop, pool and main thread to join.
131 this->Join();
132
133 // Update thread state.
134 m_eThreadState = AutonomyThreadState::eStarting;
135 // Clear results vector.
136 m_vPoolReturns.clear();
137 // Reset thread stop toggle.
138 m_bStopThreads = false;
139
140 // Submit single task to pool queue and store resulting future. Still using pool, as it's scheduling is more efficient.
141 std::future<void> fuMainReturn = m_thMainThread.submit_task([this]() { this->RunThread(m_bStopThreads); });
142
143 // Unpause pool queues.
144 m_thPool.unpause();
145 m_thMainThread.unpause();
146
147 // Block until thread is started or currently stopping if thread start failed.
148 std::unique_lock<std::mutex> lkStartLock(m_muThreadRunningConditionMutex);
149 m_cdThreadRunningCondition.wait(lkStartLock,
150 [this]
151 { return this->m_eThreadState == AutonomyThreadState::eRunning || this->m_eThreadState == AutonomyThreadState::eStopping; });
152 }
153
154
165 {
166 // Signal for any open threads to stop executing,
167 m_bStopThreads = true;
168 // Update thread state.
169 m_eThreadState = AutonomyThreadState::eStopping;
170 }
171
172
180 void Join()
181 {
182 // Wait for pool to finish all tasks.
183 m_thPool.wait();
184 // Wait for main thread to finish.
185 m_thMainThread.wait();
186
187 // Update thread state.
188 m_eThreadState = AutonomyThreadState::eStopped;
189 }
190
191
201 bool Joinable() const
202 { // Check current number of running and queued tasks.
203 return (m_thMainThread.get_tasks_total() <= 0 && m_thPool.get_tasks_total() <= 0);
204 }
205
206
214 AutonomyThreadState GetThreadState() const { return m_eThreadState; }
215
216
224 IPS& GetIPS() { return m_IPS; }
225
226 protected:
228 // Declare protected objects.
230 IPS m_IPS = IPS();
231
233 // Declare and define protected class methods.
235
236
264 void RunPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads = 2, const bool bForceStopCurrentThreads = false)
265 {
266 // Check if the pools need to be resized.
267 if (m_thPool.get_thread_count() != nNumThreads)
268 {
269 // Pause queuing of new tasks to the threads, then purge them.
270 m_thPool.pause();
271 m_thPool.purge();
272 // Wait for open threads to terminate, then resize the pool.
273 m_thPool.reset(nNumThreads);
274 // Unpause queue.
275 m_thPool.unpause();
276
277 // Clear results vector.
278 m_vPoolReturns.clear();
279 }
280 // Check if the current pool tasks should be stopped before queueing more tasks.
281 else if (bForceStopCurrentThreads)
282 {
283 // Pause queuing of new tasks to the threads, then purge them.
284 m_thPool.pause();
285 m_thPool.purge();
286 // Wait for threadpool to join.
287 m_thPool.wait();
288 // Unpause queue.
289 m_thPool.unpause();
290 }
291
292 // Loop nNumThreads times and queue tasks.
293 for (unsigned int i = 0; i < nNumTasksToQueue; ++i)
294 {
295 // Submit single task to pool queue.
296 m_vPoolReturns.emplace_back(m_thPool.submit_task(
297 [this]()
298 {
299 // Run user pool code without lock.
300 this->PooledLinearCode();
301 }));
302 }
303 }
304
305
336 void RunDetachedPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads = 2, const bool bForceStopCurrentThreads = false)
337 {
338 // Check if the pools need to be resized.
339 if (m_thPool.get_thread_count() != nNumThreads)
340 {
341 // Pause queuing of new tasks to the threads, then purge them.
342 m_thPool.pause();
343 m_thPool.purge();
344 // Wait for open threads to terminate, then resize the pool.
345 m_thPool.reset(nNumThreads);
346 // Unpause queue.
347 m_thPool.unpause();
348
349 // Clear results vector.
350 m_vPoolReturns.clear();
351 }
352 // Check if the current pool tasks should be stopped before queueing more tasks.
353 else if (bForceStopCurrentThreads)
354 {
355 // Pause queuing of new tasks to the threads, then purge them.
356 m_thPool.pause();
357 m_thPool.purge();
358 // Wait for threadpool to join.
359 m_thPool.wait();
360 // Unpause queue.
361 m_thPool.unpause();
362 }
363
364 // Loop nNumThreads times and queue tasks.
365 for (unsigned int i = 0; i < nNumTasksToQueue; ++i)
366 {
367 // Push single task to pool queue. No return value no control.
368 m_thPool.detach_task(
369 [this]()
370 {
371 // Run user code without lock.
372 this->PooledLinearCode();
373 });
374 }
375 }
376
377
403 template<typename N, typename F>
404 void ParallelizeLoop(const int nNumThreads, const N tTotalIterations, F&& tLoopFunction)
405 {
406 // Create new thread pool.
407 BS::thread_pool m_thLoopPool = BS::thread_pool(nNumThreads);
408
409 m_thLoopPool.detach_blocks(0,
410 tTotalIterations,
411 [&tLoopFunction](const int nStart, const int nEnd)
412 {
413 // Call loop function without lock.
414 tLoopFunction(nStart, nEnd);
415 });
416
417 // Wait for loop to finish.
418 m_thLoopPool.wait();
419 }
420
421
429 void ClearPoolQueue() { m_thPool.purge(); }
430
431
439 void JoinPool() { m_thPool.wait(); }
440
441
451 bool PoolJoinable() const
452 {
453 // Check current number of running and queued tasks.
454 return (m_thPool.get_tasks_total() <= 0);
455 }
456
457
467 void SetMainThreadIPSLimit(int nMaxIterationsPerSecond = 0)
468 {
469 // Assign member variable.
470 m_nMainThreadMaxIterationPerSecond = nMaxIterationsPerSecond;
471 }
472
473
481 int GetPoolNumOfThreads() { return m_thPool.get_thread_count(); }
482
483
491 int GetPoolQueueLength() { return m_thPool.get_tasks_queued(); }
492
493
505 std::vector<T> GetPoolResults()
506 {
507 // Create instance variable.
508 std::vector<T> vResults;
509
510 // Loop the pool futures and get result.
511 for (std::future<T> fResult : m_vPoolReturns)
512 {
513 // Store returned value.
514 vResults.emplace_back(fResult.get());
515 }
516
517 // Clear pool returns member variable.
518 m_vPoolReturns.clear();
519
520 return vResults;
521 }
522
523
532 {
533 // Return member variable value.
534 return m_nMainThreadMaxIterationPerSecond;
535 }
536
537 private:
539 // Declare private class member variables.
541
542 BS::thread_pool m_thMainThread = BS::thread_pool(1);
543 BS::thread_pool m_thPool = BS::thread_pool(2);
544 std::vector<std::future<T>> m_vPoolReturns;
545 std::atomic_bool m_bStopThreads;
546 std::atomic<AutonomyThreadState> m_eThreadState;
547 std::mutex m_muThreadRunningConditionMutex;
548 std::condition_variable m_cdThreadRunningCondition;
549 int m_nMainThreadMaxIterationPerSecond;
550
552 // Declare and/or define private methods.
554
555 // Declare interface class pure virtual functions. (These must be overridden by inheritor.)
// Invoked once per loop iteration by RunThread() until the stop flag is set.
556 virtual void ThreadedContinuousCode() = 0; // This is where the user's main single-threaded and continuously looping code will go.
// Invoked by each task that RunPool() or RunDetachedPool() queues on the task pool.
557 virtual T PooledLinearCode() = 0; // This is where the user's offshoot, highly parallelizable code will go. Helpful for intensive short-lived tasks.
558 // Can be run from inside the ThreadedContinuousCode() method.
559
560 // Declare and define private interface methods.
561
571 void RunThread(std::atomic_bool& bStopThread)
572 {
573 // Declare instance variables.
574 std::chrono::_V2::system_clock::time_point tmStartTime;
575
576 // Loop until stop flag is set.
577 while (!bStopThread)
578 {
579 // Check if max IPS limit has been set.
580 if (m_nMainThreadMaxIterationPerSecond > 0)
581 {
582 // Get start execution time.
583 tmStartTime = std::chrono::high_resolution_clock::now();
584 }
585
586 // Call method containing user code.
587 this->ThreadedContinuousCode();
588
589 // Check if max IPS limit has been set.
590 if (m_nMainThreadMaxIterationPerSecond > 0)
591 {
592 // Get end execution time.
593 std::chrono::_V2::system_clock::time_point tmEndTime = std::chrono::high_resolution_clock::now();
594 // Get execution time of user code.
595 std::chrono::microseconds tmElapsedTime = std::chrono::duration_cast<std::chrono::microseconds>(tmEndTime - tmStartTime);
596 // Check if the elapsed time is slower than the max iterations per seconds.
597 if (tmElapsedTime.count() < (1.0 / m_nMainThreadMaxIterationPerSecond) * 1000000)
598 {
599 // Calculate the time to wait to stay under IPS cap.
600 int nSleepTime = ((1.0 / m_nMainThreadMaxIterationPerSecond) * 1000000) - tmElapsedTime.count();
601 // Make this thread sleep for the remaining time.
602 std::this_thread::sleep_for(std::chrono::microseconds(nSleepTime));
603 }
604 }
605
606 // Check if thread state needs to be updated.
607 if (m_eThreadState != AutonomyThreadState::eRunning && m_eThreadState != AutonomyThreadState::eStopping)
608 {
609 // Update thread state to running.
610 m_eThreadState = AutonomyThreadState::eRunning;
611 // Notify waiting start method that thread is now running.
612 m_cdThreadRunningCondition.notify_all();
613 }
614
615 // Call iteration per second tracking tick.
616 m_IPS.Tick();
617 }
618
619 // Notify waiting start method that thread is now stopping.
620 m_cdThreadRunningCondition.notify_all();
621 }
622};
623
624#endif
Interface class used to easily multithread a child class.
Definition AutonomyThread.hpp:38
void Join()
Waits for thread to finish executing and then closes thread. This method will block the calling code ...
Definition AutonomyThread.hpp:180
void ParallelizeLoop(const int nNumThreads, const N tTotalIterations, F &&tLoopFunction)
Given a ref-qualified looping function and an arbitrary number of iterations, this method will divide...
Definition AutonomyThread.hpp:404
void RunPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads=2, const bool bForceStopCurrentThreads=false)
When this method is called, it starts/adds tasks to a thread pool that runs nNumTasksToQueue copies o...
Definition AutonomyThread.hpp:264
int GetMainThreadMaxIPS() const
Accessor for the Main Thread Max IPS private member.
Definition AutonomyThread.hpp:531
void SetMainThreadIPSLimit(int nMaxIterationsPerSecond=0)
Mutator for the Main Thread Max IPS private member.
Definition AutonomyThread.hpp:467
AutonomyThread()
Construct a new Autonomy Thread object.
Definition AutonomyThread.hpp:63
int GetPoolNumOfThreads()
Accessor for the Pool Num Of Threads private member.
Definition AutonomyThread.hpp:481
std::vector< T > GetPoolResults()
Accessor for the Pool Results private member. The action of getting results will destroy and remove t...
Definition AutonomyThread.hpp:505
void RequestStop()
Signals threads to stop executing user code, terminate. DOES NOT JOIN. This method will not force the...
Definition AutonomyThread.hpp:164
bool Joinable() const
Check if the code within the thread and all pools created by it are finished executing and the thread...
Definition AutonomyThread.hpp:201
IPS & GetIPS()
Accessor for the Frame IPS private member.
Definition AutonomyThread.hpp:224
int GetPoolQueueLength()
Accessor for the Pool Queue Size private member.
Definition AutonomyThread.hpp:491
void ClearPoolQueue()
Clears any tasks waiting to be run in the queue, tasks currently running will remain running.
Definition AutonomyThread.hpp:429
void Start()
When this method is called, it starts a new thread that runs the code within the ThreadedContinuousCo...
Definition AutonomyThread.hpp:117
void RunThread(std::atomic_bool &bStopThread)
This method is run in a separate thread. It is a middleware between the class member thread and the u...
Definition AutonomyThread.hpp:571
virtual ~AutonomyThread()
Destroy the Autonomy Thread object. If the parent object or main thread is destroyed or exited while ...
Definition AutonomyThread.hpp:81
void RunDetachedPool(const unsigned int nNumTasksToQueue, const unsigned int nNumThreads=2, const bool bForceStopCurrentThreads=false)
When this method is called, it starts a thread pool full of threads that don't return std::futures (l...
Definition AutonomyThread.hpp:336
bool PoolJoinable() const
Check if the internal pool threads are done executing code and the queue is empty.
Definition AutonomyThread.hpp:451
void JoinPool()
Waits for pool to finish executing tasks. This method will block the calling code until thread is fin...
Definition AutonomyThread.hpp:439
AutonomyThreadState GetThreadState() const
Accessor for the Threads State private member.
Definition AutonomyThread.hpp:214
A fast, lightweight, and easy-to-use C++17 thread pool class.
Definition BS_thread_pool.hpp:289
void reset()
Reset the pool with the total number of hardware threads available, as reported by the implementation...
Definition BS_thread_pool.hpp:545
void wait()
Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are curr...
Definition BS_thread_pool.hpp:761
size_t get_tasks_queued() const
Get the number of tasks currently waiting in the queue to be executed by the threads.
Definition BS_thread_pool.hpp:366
void purge()
Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected,...
Definition BS_thread_pool.hpp:444
void detach_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments and no return value into the task queue, with the specified prior...
Definition BS_thread_pool.hpp:459
void detach_blocks(const T first_index, const T index_after_last, F &&block, const size_t num_blocks=0 BS_THREAD_POOL_PRIORITY_INPUT)
Parallelize a loop by automatically splitting it into blocks and submitting each block separately to ...
Definition BS_thread_pool.hpp:480
std::future< R > submit_task(F &&task BS_THREAD_POOL_PRIORITY_INPUT)
Submit a function with no arguments into the task queue, with the specified priority....
Definition BS_thread_pool.hpp:605
size_t get_tasks_total() const
Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread....
Definition BS_thread_pool.hpp:388
concurrency_t get_thread_count() const
Get the number of threads in the pool.
Definition BS_thread_pool.hpp:399
This util class provides an easy way to keep track of iterations per second for any body of code.
Definition IPS.hpp:30
void Tick()
This method is used to update the iterations per second counter and recalculate all of the IPS metric...
Definition IPS.hpp:138