Vuo  2.3.2
VuoThreadManager.cc
Go to the documentation of this file.
1 
10 #include "VuoThreadManager.hh"
11 
13 #include "VuoRuntimeState.hh"
14 #include "VuoRuntimeUtilities.hh"
15 #include "VuoEventLoop.h"
16 #include "VuoException.hh"
17 
21 VuoThreadManager::Worker::Worker(dispatch_queue_t queue, void *context, void (*function)(void *),
22  int minThreadsNeeded, int maxThreadsNeeded,
23  unsigned long eventId, unsigned long compositionHash, int chainCount)
24 {
25  this->queue = queue;
26  this->context = context;
27  this->function = function;
28  this->isTrigger = true;
29  this->minThreadsNeeded = minThreadsNeeded;
30  this->maxThreadsNeeded = maxThreadsNeeded;
31  this->eventId = eventId;
32  this->compositionHash = compositionHash;
33  this->chainIndex = ULONG_MAX;
34  this->chainCount = chainCount;
35  this->upstreamChainIndices = NULL;
36  this->upstreamChainIndicesCount = 0;
37 }
38 
42 VuoThreadManager::Worker::Worker(dispatch_queue_t queue, void *context, void (*function)(void *),
43  int minThreadsNeeded, int maxThreadsNeeded,
44  unsigned long eventId, unsigned long compositionHash, unsigned long chainIndex,
45  unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
46 {
47  this->queue = queue;
48  this->context = context;
49  this->function = function;
50  this->isTrigger = false;
51  this->minThreadsNeeded = minThreadsNeeded;
52  this->maxThreadsNeeded = maxThreadsNeeded;
53  this->eventId = eventId;
54  this->compositionHash = compositionHash;
55  this->chainIndex = chainIndex;
56  this->chainCount = -1;
57  this->upstreamChainIndices = upstreamChainIndices;
58  this->upstreamChainIndicesCount = upstreamChainIndicesCount;
59 }
60 
64 VuoThreadManager::WorkerQueue::WorkerQueue(void)
65 {
66  first = NULL;
67  last = NULL;
68 }
69 
74 void VuoThreadManager::WorkerQueue::enqueue(Worker *worker)
75 {
76  Node *node = new Node;
77  node->worker = worker;
78  node->next = NULL;
79  node->prev = NULL;
80 
81  if (! first)
82  {
83  last = node;
84  }
85  else
86  {
87  first->prev = node;
88  node->next = first;
89  }
90 
91  first = node;
92 }
93 
97 void VuoThreadManager::WorkerQueue::dequeue(Node *node)
98 {
99  if (node == last)
100  last = node->prev;
101  else
102  node->next->prev = node->prev;
103 
104  if (node == first)
105  first = node->next;
106  else
107  node->prev->next = node->next;
108 
109  delete node;
110 }
111 
117 VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getNextOldest(Node *newest, Node *node)
118 {
119  if (node == newest)
120  return NULL;
121  else
122  return node->prev;
123 }
124 
128 VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getOldest(void)
129 {
130  return last;
131 }
132 
136 VuoThreadManager::WorkerQueue::Node * VuoThreadManager::WorkerQueue::getNewest(void)
137 {
138  return first;
139 }
140 
144 size_t VuoThreadManager::WorkerQueue::size(void)
145 {
146  size_t s = 0;
147  for (Node *curr = first; curr != NULL; curr = curr->next)
148  ++s;
149  return s;
150 }
151 
155 VuoThreadManager::ThreadPool::ThreadPool(void)
156 {
157  totalThreads = 0;
158  threadsAvailable = 0;
159  totalWorkers = 0;
160 }
161 
165 void VuoThreadManager::ThreadPool::setTotalThreads(int totalThreads)
166 {
167  this->totalThreads = totalThreads;
168  this->threadsAvailable = totalThreads;
169 }
170 
175 bool VuoThreadManager::ThreadPool::tryClaimThreads(int minThreadsNeeded, int maxThreadsNeeded, unsigned long workerId, int &threadsClaimed)
176 {
177  if (minThreadsNeeded <= threadsAvailable)
178  {
179  threadsClaimed = min(threadsAvailable, maxThreadsNeeded);
180  threadsAvailable -= threadsClaimed;
181  workersClaimingThreads[workerId] = threadsClaimed;
182  return true;
183  }
184  else
185  {
186  threadsClaimed = 0;
187  return false;
188  }
189 }
190 
196 int VuoThreadManager::ThreadPool::getThreadsClaimed(unsigned long workerId)
197 {
198  map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
199  if (iter != workersClaimingThreads.end())
200  return iter->second;
201  else
202  throw VuoException("Couldn't find worker in thread pool to get its claimed threads.");
203 }
204 
210 void VuoThreadManager::ThreadPool::returnThreads(unsigned long workerId)
211 {
212  map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
213  if (iter != workersClaimingThreads.end())
214  {
215  threadsAvailable += iter->second;
216  if (totalWorkers > 0)
217  workersCompleted.insert(iter->first);
218  workersClaimingThreads.erase(iter);
219  }
220  else
221  throw VuoException("Couldn't find worker in thread pool to return its threads.");
222 }
223 
224 
229 {
230  mainThreadPool.setTotalThreads(60); // maximum number of worker threads in use simultaneously
231  workersWaitingSync = dispatch_queue_create("org.vuo.runtime.workersWaiting", VuoEventLoop_getDispatchInteractiveAttribute());
232  threadPoolSync = dispatch_queue_create("org.vuo.runtime.threadPool", VuoEventLoop_getDispatchInteractiveAttribute());
233  workersUpdated = dispatch_semaphore_create(0);
234  completed = dispatch_semaphore_create(0);
235 }
236 
241 {
242  dispatch_release(workersWaitingSync);
243  dispatch_release(threadPoolSync);
244  dispatch_release(workersUpdated);
245 }
246 
251 {
252  mayMoreWorkersBeEnqueued = true;
253  mayMoreWorkersBeDequeued = true;
254 
255  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
256  dispatch_async(queue, ^{
257  while (mayMoreWorkersBeDequeued)
258  {
259  vector<Worker *> workers = dequeueWorkers();
260 
261  for (vector<Worker *>::iterator i = workers.begin(); i != workers.end(); ++i)
262  {
263  Worker *w = *i;
264  dispatch_async_f(w->queue, w->context, w->function);
265  delete w;
266  }
267  }
268 
269  dispatch_semaphore_signal(completed);
270  });
271 }
272 
277 {
278  dispatch_sync(workersWaitingSync, ^{
279  mayMoreWorkersBeEnqueued = false;
280  });
281 
282  dispatch_semaphore_signal(workersUpdated);
283  dispatch_semaphore_wait(completed, DISPATCH_TIME_FOREVER);
284 
285  mainThreadPool.workersCompleted.clear();
286 }
287 
301 vector<VuoThreadManager::Worker *> VuoThreadManager::dequeueWorkers(void)
302 {
303  workersDequeued.clear();
304  workersDequeuedNodes.clear();
305 
306  while (workersDequeued.empty() && mayMoreWorkersBeDequeued)
307  {
308  dispatch_semaphore_wait(workersUpdated, DISPATCH_TIME_FOREVER);
309 
310  __block WorkerQueue::Node *newest;
311  __block WorkerQueue::Node *oldest;
312  dispatch_sync(workersWaitingSync, ^{
313  newest = workersWaitingForThreads.getNewest();
314  oldest = workersWaitingForThreads.getOldest();
315  });
316 
317  // During this block, additional workers may be enqueued in `workersWaitingForThreads`,
318  // but the section of the queue from `oldest` to `newest` remains unchanged.
319  dispatch_sync(threadPoolSync, ^{
320  bool hasTriggerBeenSkipped = false;
321 
322  for (WorkerQueue::Node *n = oldest; n != NULL; n = workersWaitingForThreads.getNextOldest(newest, n))
323  {
324  Worker *w = n->worker;
325 
326  if (w->isTrigger)
327  {
328  if (w->minThreadsNeeded >= 0 && w->maxThreadsNeeded >= 0)
329  {
330  // Trigger worker for new event
331 
332  if (! hasTriggerBeenSkipped)
333  {
334  int threadsClaimed;
335  bool gotThreads = mainThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->eventId, threadsClaimed);
336  if (gotThreads)
337  {
338  ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
339  triggerThreadPool.setTotalThreads(threadsClaimed);
340  triggerThreadPool.totalWorkers = w->chainCount;
341  workersDequeuedNodes.push_back(n);
342  }
343  else
344  {
345  hasTriggerBeenSkipped = true;
346  }
347  }
348  }
349  else
350  {
351  // Trigger worker for published input trigger of a subcomposition node
352 
353  triggerThreadPools[w->eventId][w->compositionHash].totalWorkers = w->chainCount;
354  workersDequeuedNodes.push_back(n);
355  }
356  }
357  else
358  {
359  // Chain worker
360 
361  ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
362 
363  bool haveAllUpstreamChainsCompleted = true;
364  for (int i = 0; i < w->upstreamChainIndicesCount; ++i)
365  {
366  unsigned long index = w->upstreamChainIndices[i];
367 
368  set<unsigned long>::iterator iter = triggerThreadPool.workersCompleted.find( index );
369  if (iter == triggerThreadPool.workersCompleted.end())
370  {
371  haveAllUpstreamChainsCompleted = false;
372  break;
373  }
374  }
375 
376  if (haveAllUpstreamChainsCompleted)
377  {
378  int threadsClaimed;
379  bool gotThreads = triggerThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->chainIndex, threadsClaimed);
380  if (gotThreads)
381  {
382  workersDequeuedNodes.push_back(n);
383 
384  free(w->upstreamChainIndices);
385  w->upstreamChainIndices = NULL;
386  }
387  }
388  }
389 
390  if (n == newest) {
391  break;
392  }
393  }
394  });
395 
396  dispatch_sync(workersWaitingSync, ^{
397  for (vector<WorkerQueue::Node *>::iterator i = workersDequeuedNodes.begin(); i != workersDequeuedNodes.end(); ++i)
398  {
399  WorkerQueue::Node *n = *i;
400  workersDequeued.push_back(n->worker);
401  workersWaitingForThreads.dequeue(n);
402  }
403 
404  if (! mayMoreWorkersBeEnqueued && workersWaitingForThreads.first == NULL) {
405  mayMoreWorkersBeDequeued = false;
406  }
407  });
408  }
409 
410  return workersDequeued;
411 }
412 
421 void VuoThreadManager::scheduleTriggerWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
422  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId,
423  const char *compositionIdentifier, int chainCount)
424 {
425  // Assumes mainThreadPool.totalThreads is constant, thus safe to access outside of threadPoolSync.
426  int adjustedMinThreadsNeeded = minThreadsNeeded;
427  if (adjustedMinThreadsNeeded > mainThreadPool.totalThreads) {
428  adjustedMinThreadsNeeded = mainThreadPool.totalThreads;
429  VUserLog("Warning: Couldn't allocate as many threads to a trigger worker as it requires.");
430  }
431 
432  unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
433  Worker *worker = new Worker(queue, context, function, adjustedMinThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainCount);
434 
435  dispatch_sync(workersWaitingSync, ^{
436  workersWaitingForThreads.enqueue(worker);
437  });
438 
439  dispatch_semaphore_signal(workersUpdated);
440 }
441 
447 void VuoThreadManager::scheduleChainWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
448  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
449  unsigned long chainIndex, unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
450 {
451  unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
452  Worker *worker = new Worker(queue, context, function, minThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
453 
454  dispatch_sync(workersWaitingSync, ^{
455  workersWaitingForThreads.enqueue(worker);
456  });
457 
458  dispatch_semaphore_signal(workersUpdated);
459 }
460 
466 void VuoThreadManager::grantThreadsToChain(int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
467  unsigned long chainIndex)
468 {
469  unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
470 
471  dispatch_sync(threadPoolSync, ^{
472  ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
473  int threadsClaimed;
474  bool gotThreads = triggerThreadPool.tryClaimThreads(minThreadsNeeded, maxThreadsNeeded, chainIndex, threadsClaimed);
475  if (! gotThreads) {
476  throw VuoException("Not enough threads available in the thread pool to execute the chain.");
477  }
478  });
479 
480  dispatch_semaphore_signal(workersUpdated);
481 }
482 
488 void VuoThreadManager::grantThreadsToSubcomposition(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex,
489  const char *subcompositionIdentifier)
490 {
491  unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
492  unsigned long subcompositionHash = VuoRuntimeUtilities::hash(subcompositionIdentifier);
493 
494  dispatch_sync(threadPoolSync, ^{
495  map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
496  int threadsClaimedForChain = eventThreadPools[compositionHash].getThreadsClaimed(chainIndex);
497  eventThreadPools[subcompositionHash].setTotalThreads(threadsClaimedForChain);
498  });
499 }
500 
505 {
506  dispatch_sync(threadPoolSync, ^{
507  triggerThreadPools.erase(eventId);
508  mainThreadPool.returnThreads(eventId);
509  });
510 
511  dispatch_semaphore_signal(workersUpdated);
512 }
513 
517 void VuoThreadManager::returnThreadsForChainWorker(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex)
518 {
519  unsigned long compositionHash = VuoRuntimeUtilities::hash(compositionIdentifier);
520 
521  dispatch_sync(threadPoolSync, ^{
522  ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
523  triggerThreadPool.returnThreads(chainIndex);
524 
525  // If this was the last chain for the event (for the current subcomposition or overall),
526  // erase the thread pool entries for the event.
527  set<unsigned long> &workersCompleted = triggerThreadPool.workersCompleted;
528  if (workersCompleted.size() == triggerThreadPool.totalWorkers)
529  {
530  map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
531  eventThreadPools.erase(compositionHash);
532  if (eventThreadPools.empty())
533  {
534  triggerThreadPools.erase(eventId);
535  mainThreadPool.returnThreads(eventId);
536  }
537  }
538  });
539 
540  dispatch_semaphore_signal(workersUpdated);
541 }
542 
543 extern "C"
544 {
545 
549 void vuoScheduleTriggerWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
550  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, int chainCount)
551 {
553  const char *compositionIdentifier = compositionState->compositionIdentifier;
555  minThreadsNeeded, maxThreadsNeeded, eventId,
556  compositionIdentifier, chainCount);
557 }
558 
562 void vuoScheduleChainWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
563  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex,
564  unsigned long *upstreamChainIndices, int upstreamChainIndicesCount)
565 {
567  const char *compositionIdentifier = compositionState->compositionIdentifier;
569  minThreadsNeeded, maxThreadsNeeded, eventId, compositionIdentifier,
570  chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
571 }
572 
577  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex)
578 {
580  const char *compositionIdentifier = compositionState->compositionIdentifier;
581  runtimeState->persistentState->threadManager->grantThreadsToChain(minThreadsNeeded, maxThreadsNeeded, eventId, compositionIdentifier,
582  chainIndex);
583 }
584 
588 void vuoGrantThreadsToSubcomposition(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex,
589  char *subcompositionIdentifier)
590 {
592  const char *compositionIdentifier = compositionState->compositionIdentifier;
593  runtimeState->persistentState->threadManager->grantThreadsToSubcomposition(eventId, compositionIdentifier, chainIndex,
594  subcompositionIdentifier);
595 }
596 
600 void vuoReturnThreadsForTriggerWorker(VuoCompositionState *compositionState, unsigned long eventId)
601 {
604 }
605 
609 void vuoReturnThreadsForChainWorker(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex)
610 {
612  const char *compositionIdentifier = compositionState->compositionIdentifier;
613  runtimeState->persistentState->threadManager->returnThreadsForChainWorker(eventId, compositionIdentifier, chainIndex);
614 }
615 
616 }