Vuo 2.4.4
Loading...
Searching...
No Matches
VuoThreadManager.cc
Go to the documentation of this file.
1
10#include "VuoThreadManager.hh"
11
13#include "VuoRuntimeState.hh"
15#include "VuoEventLoop.h"
16#include "VuoException.hh"
17
21VuoThreadManager::Worker::Worker(dispatch_queue_t queue, void *context, void (*function)(void *),
22 int minThreadsNeeded, int maxThreadsNeeded,
23 unsigned long eventId, unsigned long compositionHash, int chainCount)
24{
25 this->queue = queue;
26 this->context = context;
27 this->function = function;
28 this->isTrigger = true;
29 this->minThreadsNeeded = minThreadsNeeded;
30 this->maxThreadsNeeded = maxThreadsNeeded;
31 this->eventId = eventId;
32 this->compositionHash = compositionHash;
33 this->chainIndex = ULONG_MAX;
34 this->chainCount = chainCount;
35 this->upstreamChainIndices = NULL;
36 this->upstreamChainIndicesCount = 0;
37}
38
42VuoThreadManager::Worker::Worker(dispatch_queue_t queue, void *context, void (*function)(void *),
43 int minThreadsNeeded, int maxThreadsNeeded,
44 unsigned long eventId, unsigned long compositionHash, unsigned long chainIndex,
45 unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
46{
47 this->queue = queue;
48 this->context = context;
49 this->function = function;
50 this->isTrigger = false;
51 this->minThreadsNeeded = minThreadsNeeded;
52 this->maxThreadsNeeded = maxThreadsNeeded;
53 this->eventId = eventId;
54 this->compositionHash = compositionHash;
55 this->chainIndex = chainIndex;
56 this->chainCount = -1;
57 this->upstreamChainIndices = upstreamChainIndices;
58 this->upstreamChainIndicesCount = upstreamChainIndicesCount;
59}
60
64VuoThreadManager::WorkerQueue::WorkerQueue(void)
65{
66 first = NULL;
67 last = NULL;
68}
69
74void VuoThreadManager::WorkerQueue::enqueue(Worker *worker)
75{
76 Node *node = new Node;
77 node->worker = worker;
78 node->next = NULL;
79 node->prev = NULL;
80
81 if (! first)
82 {
83 last = node;
84 }
85 else
86 {
87 first->prev = node;
88 node->next = first;
89 }
90
91 first = node;
92}
93
97void VuoThreadManager::WorkerQueue::dequeue(Node *node)
98{
99 if (node == last)
100 last = node->prev;
101 else
102 node->next->prev = node->prev;
103
104 if (node == first)
105 first = node->next;
106 else
107 node->prev->next = node->next;
108
109 delete node;
110}
111
117VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getNextOldest(Node *newest, Node *node)
118{
119 if (node == newest)
120 return NULL;
121 else
122 return node->prev;
123}
124
128VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getOldest(void)
129{
130 return last;
131}
132
136VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getNewest(void)
137{
138 return first;
139}
140
144size_t VuoThreadManager::WorkerQueue::size(void)
145{
146 size_t s = 0;
147 for (Node *curr = first; curr != NULL; curr = curr->next)
148 ++s;
149 return s;
150}
151
155VuoThreadManager::ThreadPool::ThreadPool(void)
156{
157 totalThreads = 0;
158 threadsAvailable = 0;
159 totalWorkers = 0;
160}
161
165void VuoThreadManager::ThreadPool::setTotalThreads(int totalThreads)
166{
167 this->totalThreads = totalThreads;
168 this->threadsAvailable = totalThreads;
169}
170
175bool VuoThreadManager::ThreadPool::tryClaimThreads(int minThreadsNeeded, int maxThreadsNeeded, unsigned long workerId, int &threadsClaimed)
176{
177 if (minThreadsNeeded <= threadsAvailable)
178 {
179 threadsClaimed = min(threadsAvailable, maxThreadsNeeded);
180 threadsAvailable -= threadsClaimed;
181 workersClaimingThreads[workerId] = threadsClaimed;
182 return true;
183 }
184 else
185 {
186 threadsClaimed = 0;
187 return false;
188 }
189}
190
196int VuoThreadManager::ThreadPool::getThreadsClaimed(unsigned long workerId)
197{
198 map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
199 if (iter != workersClaimingThreads.end())
200 return iter->second;
201 else
202 throw VuoException("Couldn't find worker in thread pool to get its claimed threads.");
203}
204
210void VuoThreadManager::ThreadPool::returnThreads(unsigned long workerId)
211{
212 map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
213 if (iter != workersClaimingThreads.end())
214 {
215 threadsAvailable += iter->second;
216 if (totalWorkers > 0)
217 workersCompleted.insert(iter->first);
218 workersClaimingThreads.erase(iter);
219 }
220 else
221 throw VuoException("Couldn't find worker in thread pool to return its threads.");
222}
223
224
229{
230 mainThreadPool.setTotalThreads(60); // maximum number of worker threads in use simultaneously
231 workersWaitingSync = dispatch_queue_create("org.vuo.runtime.workersWaiting", VuoEventLoop_getDispatchInteractiveAttribute());
232 threadPoolSync = dispatch_queue_create("org.vuo.runtime.threadPool", VuoEventLoop_getDispatchInteractiveAttribute());
233 workersUpdated = dispatch_semaphore_create(0);
234 completed = dispatch_semaphore_create(0);
235}
236
241{
242 dispatch_release(workersWaitingSync);
243 dispatch_release(threadPoolSync);
244 dispatch_release(workersUpdated);
245}
246
251{
252 mayMoreWorkersBeEnqueued = true;
253 mayMoreWorkersBeDequeued = true;
254
255 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
256 dispatch_async(queue, ^{
257 while (mayMoreWorkersBeDequeued)
258 {
259 vector<Worker *> workers = dequeueWorkers();
260
261 for (vector<Worker *>::iterator i = workers.begin(); i != workers.end(); ++i)
262 {
263 Worker *w = *i;
264 dispatch_async_f(w->queue, w->context, w->function);
265 delete w;
266 }
267 }
268
269 dispatch_semaphore_signal(completed);
270 });
271}
272
277{
278 dispatch_sync(workersWaitingSync, ^{
279 mayMoreWorkersBeEnqueued = false;
280 });
281
282 dispatch_semaphore_signal(workersUpdated);
283 dispatch_semaphore_wait(completed, DISPATCH_TIME_FOREVER);
284
285 mainThreadPool.workersCompleted.clear();
286}
287
// Waits until at least one queued worker can be handed threads, removes the chosen workers
// from `workersWaitingForThreads`, and returns them.
//
// Each pass waits on the `workersUpdated` semaphore, snapshots the oldest and newest queue nodes,
// scans from oldest to newest while holding `threadPoolSync`, and then unlinks the selected nodes
// while holding `workersWaitingSync`. The loop also ends — possibly returning an empty vector —
// once `mayMoreWorkersBeDequeued` has been cleared.
301vector<VuoThreadManager::Worker *> VuoThreadManager::dequeueWorkers(void)
302{
303 workersDequeued.clear();
304 workersDequeuedNodes.clear();
305
306 while (workersDequeued.empty() && mayMoreWorkersBeDequeued)
307 {
// Sleep until `workersUpdated` is signaled (done elsewhere in this file after enqueuing a worker
// or after changing a thread pool).
308 dispatch_semaphore_wait(workersUpdated, DISPATCH_TIME_FOREVER);
309
310 __block WorkerQueue::Node *newest;
311 __block WorkerQueue::Node *oldest;
312 dispatch_sync(workersWaitingSync, ^{
313 newest = workersWaitingForThreads.getNewest();
314 oldest = workersWaitingForThreads.getOldest();
315 });
316
317 // During this block, additional workers may be enqueued in `workersWaitingForThreads`,
318 // but the section of the queue from `oldest` to `newest` remains unchanged.
319 dispatch_sync(threadPoolSync, ^{
// Set when a new-event trigger worker can't get threads from `mainThreadPool`; any newer
// new-event trigger workers are then left in the queue for this pass.
320 bool hasTriggerBeenSkipped = false;
321
322 for (WorkerQueue::Node *n = oldest; n != NULL; n = workersWaitingForThreads.getNextOldest(newest, n))
323 {
324 Worker *w = n->worker;
325
326 if (w->isTrigger)
327 {
328 if (w->minThreadsNeeded >= 0 && w->maxThreadsNeeded >= 0)
329 {
330 // Trigger worker for new event
331
332 if (! hasTriggerBeenSkipped)
333 {
334 int threadsClaimed;
335 bool gotThreads = mainThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->eventId, threadsClaimed);
336 if (gotThreads)
337 {
// Look up (creating if absent) the pool for this event and composition, and size it to
// the number of threads just claimed from `mainThreadPool`.
338 ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
339 triggerThreadPool.setTotalThreads(threadsClaimed);
340 triggerThreadPool.totalWorkers = w->chainCount;
341 workersDequeuedNodes.push_back(n);
342 }
343 else
344 {
345 hasTriggerBeenSkipped = true;
346 }
347 }
348 }
349 else
350 {
351 // Trigger worker for published input trigger of a subcomposition node
352
// No threads are claimed here; only the pool's expected worker count is recorded.
353 triggerThreadPools[w->eventId][w->compositionHash].totalWorkers = w->chainCount;
354 workersDequeuedNodes.push_back(n);
355 }
356 }
357 else
358 {
359 // Chain worker
360
361 ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
362
// The chain is eligible only if every one of its upstream chain indices is already present
// in this pool's `workersCompleted` set.
363 bool haveAllUpstreamChainsCompleted = true;
364 for (int i = 0; i < w->upstreamChainIndicesCount; ++i)
365 {
366 unsigned long index = w->upstreamChainIndices[i];
367
368 set<unsigned long>::iterator iter = triggerThreadPool.workersCompleted.find( index );
369 if (iter == triggerThreadPool.workersCompleted.end())
370 {
371 haveAllUpstreamChainsCompleted = false;
372 break;
373 }
374 }
375
376 if (haveAllUpstreamChainsCompleted)
377 {
378 int threadsClaimed;
379 bool gotThreads = triggerThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->chainIndex, threadsClaimed);
380 if (gotThreads)
381 {
382 workersDequeuedNodes.push_back(n);
383
// The upstream list is not consulted again once the worker has been selected, so it is
// released here with free().
384 free(w->upstreamChainIndices);
385 w->upstreamChainIndices = NULL;
386 }
387 }
388 }
389
// Stop at the end of the snapshot; nodes enqueued after the snapshot are left for a later pass.
390 if (n == newest) {
391 break;
392 }
393 }
394 });
395
396 dispatch_sync(workersWaitingSync, ^{
// Copy each selected worker into the result and unlink its node from the waiting queue.
397 for (vector<WorkerQueue::Node *>::iterator i = workersDequeuedNodes.begin(); i != workersDequeuedNodes.end(); ++i)
398 {
399 WorkerQueue::Node *n = *i;
400 workersDequeued.push_back(n->worker);
401 workersWaitingForThreads.dequeue(n);
402 }
403
// Once enqueuing has been turned off and nothing is left waiting, end the outer loop.
404 if (! mayMoreWorkersBeEnqueued && workersWaitingForThreads.first == NULL) {
405 mayMoreWorkersBeDequeued = false;
406 }
407 });
408 }
409
410 return workersDequeued;
411}
412
421void VuoThreadManager::scheduleTriggerWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
422 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId,
423 const char *compositionIdentifier, int chainCount)
424{
425 // Assumes mainThreadPool.totalThreads is constant, thus safe to access outside of threadPoolSync.
426 int adjustedMinThreadsNeeded = minThreadsNeeded;
427 if (adjustedMinThreadsNeeded > mainThreadPool.totalThreads) {
428 adjustedMinThreadsNeeded = mainThreadPool.totalThreads;
429 VUserLog("Warning: Couldn't allocate as many threads to a trigger worker as it requires.");
430 }
431
432 unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
433 Worker *worker = new Worker(queue, context, function, adjustedMinThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainCount);
434
435 dispatch_sync(workersWaitingSync, ^{
436 workersWaitingForThreads.enqueue(worker);
437 });
438
439 dispatch_semaphore_signal(workersUpdated);
440}
441
447void VuoThreadManager::scheduleChainWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
448 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
449 unsigned long chainIndex, unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
450{
451 unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
452 Worker *worker = new Worker(queue, context, function, minThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
453
454 dispatch_sync(workersWaitingSync, ^{
455 workersWaitingForThreads.enqueue(worker);
456 });
457
458 dispatch_semaphore_signal(workersUpdated);
459}
460
466void VuoThreadManager::grantThreadsToChain(int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
467 unsigned long chainIndex)
468{
469 unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
470
471 dispatch_sync(threadPoolSync, ^{
472 ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
473 int threadsClaimed;
474 bool gotThreads = triggerThreadPool.tryClaimThreads(minThreadsNeeded, maxThreadsNeeded, chainIndex, threadsClaimed);
475 if (! gotThreads) {
476 throw VuoException("Not enough threads available in the thread pool to execute the chain.");
477 }
478 });
479
480 dispatch_semaphore_signal(workersUpdated);
481}
482
488void VuoThreadManager::grantThreadsToSubcomposition(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex,
489 const char *subcompositionIdentifier)
490{
491 unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
492 unsigned long subcompositionHash = VuoRuntimeUtilities::hash(subcompositionIdentifier);
493
494 dispatch_sync(threadPoolSync, ^{
495 map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
496 int threadsClaimedForChain = eventThreadPools[compositionHash].getThreadsClaimed(chainIndex);
497 eventThreadPools[subcompositionHash].setTotalThreads(threadsClaimedForChain);
498 });
499}
500
505{
506 dispatch_sync(threadPoolSync, ^{
507 triggerThreadPools.erase(eventId);
508 mainThreadPool.returnThreads(eventId);
509 });
510
511 dispatch_semaphore_signal(workersUpdated);
512}
513
517void VuoThreadManager::returnThreadsForChainWorker(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex)
518{
519 unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
520
521 dispatch_sync(threadPoolSync, ^{
522 ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
523 triggerThreadPool.returnThreads(chainIndex);
524
525 // If this was the last chain for the event (for the current subcomposition or overall),
526 // erase the thread pool entries for the event.
527 set<unsigned long> &workersCompleted = triggerThreadPool.workersCompleted;
528 if (workersCompleted.size() == triggerThreadPool.totalWorkers)
529 {
530 map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
531 eventThreadPools.erase(compositionHash);
532 if (eventThreadPools.empty())
533 {
534 triggerThreadPools.erase(eventId);
535 mainThreadPool.returnThreads(eventId);
536 }
537 }
538 });
539
540 dispatch_semaphore_signal(workersUpdated);
541}
542
543extern "C"
544{
545
549void vuoScheduleTriggerWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
550 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, int chainCount)
551{
553 const char *compositionIdentifier = compositionState->compositionIdentifier;
555 minThreadsNeeded, maxThreadsNeeded, eventId,
556 compositionIdentifier, chainCount);
557}
558
562void vuoScheduleChainWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
563 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex,
564 unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
565{
567 const char *compositionIdentifier = compositionState->compositionIdentifier;
569 minThreadsNeeded, maxThreadsNeeded, eventId, compositionIdentifier,
570 chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
571}
572
577 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex)
578{
580 const char *compositionIdentifier = compositionState->compositionIdentifier;
581 runtimeState->persistentState->threadManager->grantThreadsToChain(minThreadsNeeded, maxThreadsNeeded, eventId, compositionIdentifier,
582 chainIndex);
583}
584
588void vuoGrantThreadsToSubcomposition(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex,
589 char *subcompositionIdentifier)
590{
592 const char *compositionIdentifier = compositionState->compositionIdentifier;
593 runtimeState->persistentState->threadManager->grantThreadsToSubcomposition(eventId, compositionIdentifier, chainIndex,
594 subcompositionIdentifier);
595}
596
605
609void vuoReturnThreadsForChainWorker(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex)
610{
612 const char *compositionIdentifier = compositionState->compositionIdentifier;
613 runtimeState->persistentState->threadManager->returnThreadsForChainWorker(eventId, compositionIdentifier, chainIndex);
614}
615
616}