EntityComponentMetaSystem
An implementation of an EntityComponent System with template meta-programming.
Loading...
Searching...
No Matches
ThreadPool.hpp
1#ifndef EC_META_SYSTEM_THREADPOOL_HPP
2#define EC_META_SYSTEM_THREADPOOL_HPP
3
4#include <atomic>
5#include <chrono>
6#include <deque>
7#include <functional>
8#include <list>
9#include <memory>
10#include <mutex>
11#include <queue>
12#include <thread>
13#include <tuple>
14#include <vector>
15
16#ifndef NDEBUG
17#include <iostream>
18#endif
19
20namespace EC {
21
namespace Internal {

/// Callable invoked with a single opaque `void *` argument; the signature of
/// every function handed to ThreadPool::queueFn().
using TPFnType = std::function<void(void *)>;

/// A queued function together with the `void *` it is called with.
using TPTupleType = std::tuple<TPFnType, void *>;

/// FIFO of functions waiting to be executed (type of ThreadPool::fnQueue).
using TPQueueType = std::queue<TPTupleType>;

/// Owning handle to a worker thread.
using ThreadPtr = std::unique_ptr<std::thread>;

/// One "call stack" of workers: each entry holds the thread handle and the
/// id of the thread that pushed the entry.
using ThreadStackType = std::vector<std::tuple<ThreadPtr, std::thread::id>>;

/// All worker stacks; newStackEntry() appends one per call.
using ThreadStacksType = std::deque<ThreadStackType>;

/// One mutex per worker stack, kept in the same order as ThreadStacksType.
using ThreadStacksMutexesT = std::deque<std::mutex>;

/// One counter per worker stack; workers increment it after registering.
using ThreadCountersT = std::deque<std::atomic_uint>;

/// One flag per worker stack; while a flag is true checkStacks() will not
/// remove the corresponding entry.
using PtrsHoldT = std::deque<std::atomic_bool>;

/// Pointers to the stack, its mutex, its counter and its hold flag (in that
/// order) for a single entry created by newStackEntry().
using PointersT = std::tuple<ThreadStackType *,
                             std::mutex *,
                             std::atomic_uint *,
                             std::atomic_bool *>;

} // namespace Internal
35
42template <unsigned int MAXSIZE>
44 public:
46 : threadStacks{}, threadStackMutexes{}, fnQueue{}, queueMutex{} {}
47
48 ~ThreadPool() {
49 while (!isNotRunning()) {
50 std::this_thread::sleep_for(std::chrono::microseconds(30));
51 }
52 }
53
64 void queueFn(std::function<void(void *)> &&fn, void *ud = nullptr) {
65 std::lock_guard<std::mutex> lock(queueMutex);
66 fnQueue.emplace(std::make_tuple(fn, ud));
67 }
68
77 Internal::PointersT startThreads() {
78 if (MAXSIZE >= 2) {
79 checkStacks();
80 auto pointers = newStackEntry();
81 Internal::ThreadStackType *threadStack = std::get<0>(pointers);
82 std::mutex *threadStackMutex = std::get<1>(pointers);
83 std::atomic_uint *aCounter = std::get<2>(pointers);
84 for (unsigned int i = 0; i < MAXSIZE; ++i) {
85 std::thread *newThread = new std::thread(
86 [](Internal::ThreadStackType *threadStack,
87 std::mutex *threadStackMutex,
88 Internal::TPQueueType *fnQueue, std::mutex *queueMutex,
89 std::atomic_uint *initCount) {
90 // add id to idStack "call stack"
91 {
92 std::lock_guard<std::mutex> lock(*threadStackMutex);
93 threadStack->push_back(
94 {Internal::ThreadPtr(nullptr),
95 std::this_thread::get_id()});
96 }
97
98 ++(*initCount);
99
100 // fetch queued fns and execute them
101 // fnTuples must live until end of function
102 std::list<Internal::TPTupleType> fnTuples;
103 do {
104 bool fnFound = false;
105 {
106 std::lock_guard<std::mutex> lock(*queueMutex);
107 if (!fnQueue->empty()) {
108 fnTuples.emplace_back(
109 std::move(fnQueue->front()));
110 fnQueue->pop();
111 fnFound = true;
112 }
113 }
114 if (fnFound) {
115 std::get<0>(fnTuples.back())(
116 std::get<1>(fnTuples.back()));
117 } else {
118 break;
119 }
120 } while (true);
121
122 // pop id from idStack "call stack"
123 do {
124 std::this_thread::sleep_for(
125 std::chrono::microseconds(15));
126 if (initCount->load() != MAXSIZE) {
127 continue;
128 }
129 {
130 std::lock_guard<std::mutex> lock(
131 *threadStackMutex);
132 if (std::get<1>(threadStack->back()) ==
133 std::this_thread::get_id()) {
134 if (!std::get<0>(threadStack->back())) {
135 continue;
136 }
137 std::get<0>(threadStack->back())->detach();
138 threadStack->pop_back();
139 break;
140 }
141 }
142 } while (true);
143 },
144 threadStack, threadStackMutex, &fnQueue, &queueMutex,
145 aCounter);
146 // Wait until thread has pushed to threadStack before setting
147 // the handle to it
148 while (aCounter->load() != i + 1) {
149 std::this_thread::sleep_for(std::chrono::microseconds(15));
150 }
151 std::lock_guard<std::mutex> stackLock(*threadStackMutex);
152 std::get<0>(threadStack->at(i)).reset(newThread);
153 }
154 return pointers;
155 } else {
156 sequentiallyRunTasks();
157 }
158 return {nullptr, nullptr, nullptr, nullptr};
159 }
160
165 std::lock_guard<std::mutex> lock(queueMutex);
166 return fnQueue.empty();
167 }
168
172 constexpr unsigned int getMaxThreadCount() { return MAXSIZE; }
173
181 if (MAXSIZE >= 2) {
182 Internal::PointersT pointers = startThreads();
183 do {
184 std::this_thread::sleep_for(std::chrono::microseconds(30));
185
186 bool isQueueEmpty = false;
187 {
188 std::lock_guard<std::mutex> lock(queueMutex);
189 isQueueEmpty = fnQueue.empty();
190 }
191
192 if (isQueueEmpty) {
193 break;
194 }
195 } while (true);
196 if (std::get<0>(pointers)) {
197 do {
198 {
199 std::lock_guard<std::mutex> lock(
200 *std::get<1>(pointers));
201 if (std::get<0>(pointers)->empty()) {
202 std::get<3>(pointers)->store(false);
203 break;
204 }
205 }
206 std::this_thread::sleep_for(std::chrono::microseconds(15));
207 } while (true);
208 }
209 } else {
210 sequentiallyRunTasks();
211 }
212 }
213
219 std::lock_guard<std::mutex> lock(dequesMutex);
220 auto tIter = threadStacks.begin();
221 auto mIter = threadStackMutexes.begin();
222 while (tIter != threadStacks.end() &&
223 mIter != threadStackMutexes.end()) {
224 {
225 std::lock_guard<std::mutex> lock(*mIter);
226 if (!tIter->empty()) {
227 return false;
228 }
229 }
230 ++tIter;
231 ++mIter;
232 }
233 return true;
234 }
235
236 private:
237 Internal::ThreadStacksType threadStacks;
238 Internal::ThreadStacksMutexesT threadStackMutexes;
239 Internal::TPQueueType fnQueue;
240 std::mutex queueMutex;
241 Internal::ThreadCountersT threadCounters;
242 Internal::PtrsHoldT ptrsHoldBools;
243 std::mutex dequesMutex;
244
245 void sequentiallyRunTasks() {
246 // pull functions from queue and run them on current thread
247 Internal::TPTupleType fnTuple;
248 bool hasFn;
249 do {
250 {
251 std::lock_guard<std::mutex> lock(queueMutex);
252 if (!fnQueue.empty()) {
253 hasFn = true;
254 fnTuple = fnQueue.front();
255 fnQueue.pop();
256 } else {
257 hasFn = false;
258 }
259 }
260 if (hasFn) {
261 std::get<0>(fnTuple)(std::get<1>(fnTuple));
262 }
263 } while (hasFn);
264 }
265
266 void checkStacks() {
267 std::lock_guard<std::mutex> lock(dequesMutex);
268 if (threadStacks.empty()) {
269 return;
270 }
271
272 bool erased = false;
273 do {
274 erased = false;
275 {
276 std::lock_guard<std::mutex> lock(threadStackMutexes.front());
277 if (ptrsHoldBools.front().load()) {
278 break;
279 } else if (threadStacks.front().empty()) {
280 threadStacks.pop_front();
281 threadCounters.pop_front();
282 ptrsHoldBools.pop_front();
283 erased = true;
284 }
285 }
286 if (erased) {
287 threadStackMutexes.pop_front();
288 } else {
289 break;
290 }
291 } while (!threadStacks.empty() && !threadStackMutexes.empty() &&
292 !threadCounters.empty() && !ptrsHoldBools.empty());
293 }
294
295 Internal::PointersT newStackEntry() {
296 std::lock_guard<std::mutex> lock(dequesMutex);
297 threadStacks.emplace_back();
298 threadStackMutexes.emplace_back();
299 threadCounters.emplace_back();
300 threadCounters.back().store(0);
301 ptrsHoldBools.emplace_back();
302 ptrsHoldBools.back().store(true);
303
304 return {&threadStacks.back(), &threadStackMutexes.back(),
305 &threadCounters.back(), &ptrsHoldBools.back()};
306 }
307};
308
309} // namespace EC
310
311#endif
Implementation of a Thread Pool.
Definition ThreadPool.hpp:43
constexpr unsigned int getMaxThreadCount()
Returns the MAXSIZE count that this class was created with.
Definition ThreadPool.hpp:172
bool isNotRunning()
Checks if any threads are currently running, returning true if there are no threads running.
Definition ThreadPool.hpp:218
bool isQueueEmpty()
Returns true if the function queue is empty.
Definition ThreadPool.hpp:164
void easyStartAndWait()
Calls startThreads() and waits until all threads have finished.
Definition ThreadPool.hpp:180
Internal::PointersT startThreads()
Creates MAXSIZE threads that will process queueFn() functions.
Definition ThreadPool.hpp:77
void queueFn(std::function< void(void *)> &&fn, void *ud=nullptr)
Queues a function to be called (doesn't start calling yet).
Definition ThreadPool.hpp:64